This is an automated email from the ASF dual-hosted git repository.
scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 233ebd8e2bc [Dataflow Streaming] Add a job setting to limit value size
in windmill state cache (#38458)
233ebd8e2bc is described below
commit 233ebd8e2bc3c7041a3325fd1a6018c88631a940
Author: Arun Pandian <[email protected]>
AuthorDate: Mon May 18 01:52:33 2026 -0700
[Dataflow Streaming] Add a job setting to limit value size in windmill
state cache (#38458)
* Add key, entry, and value size distributions to state cache stats
* Add the max entry size limit to the status page
---
.../options/DataflowStreamingPipelineOptions.java | 10 ++
.../dataflow/worker/StreamingDataflowWorker.java | 17 ++-
.../dataflow/worker/util/SimpleByteHistogram.java | 43 ++++++++
.../worker/windmill/state/WindmillStateCache.java | 122 +++++++++++++++++----
.../worker/util/SimpleByteHistogramTest.java | 51 +++++++++
.../windmill/state/WindmillStateCacheTest.java | 62 +++++++++++
.../worker/windmill/src/main/proto/windmill.proto | 2 +
7 files changed, 285 insertions(+), 22 deletions(-)
diff --git
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowStreamingPipelineOptions.java
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowStreamingPipelineOptions.java
index 9727048e47a..90375ad445a 100644
---
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowStreamingPipelineOptions.java
+++
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowStreamingPipelineOptions.java
@@ -249,6 +249,16 @@ public interface DataflowStreamingPipelineOptions extends
PipelineOptions {
void setIsWindmillServiceDirectPathEnabled(boolean
isWindmillServiceDirectPathEnabled);
+ /**
+ * The maximum weight, in bytes, of a single windmill state cache entry (e.g., a value or a bag).
+ * Entries whose weight exceeds this limit are not cached by the windmill state cache, and any
+ * previously cached copy of such an entry is invalidated when the per-key cache is persisted.
+ * The weight is the one computed by the cache itself, which includes per-entry overhead.
+ *
+ * <p>Defaults to {@link Long#MAX_VALUE} (no limit). A non-negative {@code max_cached_entry_bytes}
+ * value delivered through the user worker job settings takes precedence over this option.
+ */
+ @Description("The maximum size of cached entries in bytes.")
+ @Default.Long(Long.MAX_VALUE)
+ Long getMaxWindmillStateCacheEntryBytes();
+
+ void setMaxWindmillStateCacheEntryBytes(Long value);
+
/**
* Factory for creating local Windmill address. Reads from system propery
'windmill.hostport' for
* backwards compatibility.
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
index f5e5adab155..4d070da995b 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
@@ -50,6 +50,7 @@ import
org.apache.beam.runners.dataflow.worker.streaming.StageInfo;
import org.apache.beam.runners.dataflow.worker.streaming.WeightedSemaphore;
import
org.apache.beam.runners.dataflow.worker.streaming.WorkHeartbeatResponseProcessor;
import
org.apache.beam.runners.dataflow.worker.streaming.config.ComputationConfig;
+import
org.apache.beam.runners.dataflow.worker.streaming.config.ComputationConfig.Fetcher;
import
org.apache.beam.runners.dataflow.worker.streaming.config.FixedGlobalConfigHandle;
import
org.apache.beam.runners.dataflow.worker.streaming.config.StreamingApplianceComputationConfigFetcher;
import
org.apache.beam.runners.dataflow.worker.streaming.config.StreamingEngineComputationConfigFetcher;
@@ -113,6 +114,7 @@ import org.apache.beam.sdk.fn.JvmInitializers;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.gcp.bigquery.BigQuerySinkMetrics;
import org.apache.beam.sdk.metrics.MetricsEnvironment;
+import org.apache.beam.sdk.options.ExperimentalOptions;
import org.apache.beam.sdk.util.construction.CoderTranslation;
import org.apache.beam.sdk.values.WindowedValues;
import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.auth.MoreCallCredentials;
@@ -633,6 +635,10 @@ public final class StreamingDataflowWorker {
WindmillStateCache.builder()
.setSizeMb(options.getWorkerCacheMb())
.setSupportMapViaMultimap(options.isEnableStreamingEngine())
+
.setMaxCachedEntryBytes(options.getMaxWindmillStateCacheEntryBytes())
+ .setEnableHistogram(
+ !ExperimentalOptions.hasExperiment(
+ options, "disable_windmill_user_state_cache_histogram"))
.build();
GrpcWindmillStreamFactory.Builder windmillStreamFactoryBuilder =
@@ -651,6 +657,15 @@ public final class StreamingDataflowWorker {
windmillStateCache::forComputation,
ID_GENERATOR));
+ Fetcher configFetcher =
configFetcherComputationStateCacheAndWindmillClient.configFetcher();
+ configFetcher
+ .getGlobalConfigHandle()
+ .registerConfigObserver(
+ config -> {
+ windmillStateCache.setMaxCachedEntryBytesOverride(
+ config.userWorkerJobSettings().getMaxCachedEntryBytes());
+ });
+
ComputationStateCache computationStateCache =
configFetcherComputationStateCacheAndWindmillClient.computationStateCache();
WindmillServerStub windmillServer =
@@ -689,7 +704,7 @@ public final class StreamingDataflowWorker {
return new StreamingDataflowWorker(
windmillServer,
clientId,
- configFetcherComputationStateCacheAndWindmillClient.configFetcher(),
+ configFetcher,
computationStateCache,
windmillStateCache,
workExecutor,
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/SimpleByteHistogram.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/SimpleByteHistogram.java
new file mode 100644
index 00000000000..6b90ca8df9e
--- /dev/null
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/SimpleByteHistogram.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.util;
+
+/**
+ * A small fixed-bucket histogram of byte sizes, used to summarize size distributions on status
+ * pages.
+ *
+ * <p>Buckets are {@code <128B, <256B, <512B, <1KB, <10KB, <1MB, >=1MB}. Each observation is counted
+ * in the first bucket whose exclusive upper bound exceeds it, so zero and negative values land in
+ * the first bucket.
+ *
+ * <p>Instances are not thread-safe: counts are plain {@code long}s updated without synchronization.
+ */
+public class SimpleByteHistogram {
+  /** Exclusive upper bound of each bucket except the last, which is unbounded. */
+  private static final long[] UPPER_BOUNDS = {128, 256, 512, 1024, 10 * 1024, 1024 * 1024};
+
+  /** Display label of each bucket, in order; one entry longer than {@link #UPPER_BOUNDS}. */
+  private static final String[] LABELS = {
+    "<128B", "<256B", "<512B", "<1KB", "<10KB", "<1MB", ">=1MB"
+  };
+
+  private final long[] buckets = new long[LABELS.length];
+
+  /** Records one observation of {@code weight} bytes. */
+  public void add(long weight) {
+    buckets[getBucket(weight)]++;
+  }
+
+  /** Returns the index of the bucket that {@code weight} falls into. */
+  private static int getBucket(long weight) {
+    for (int i = 0; i < UPPER_BOUNDS.length; i++) {
+      if (weight < UPPER_BOUNDS[i]) {
+        return i;
+      }
+    }
+    return UPPER_BOUNDS.length;
+  }
+
+  /**
+   * Renders the counts as {@code [<128B:n0, <256B:n1, ..., >=1MB:n6]}.
+   *
+   * <p>Built with {@link StringBuilder} rather than {@code String.format("%d")} so that the digits
+   * do not depend on the JVM's default locale.
+   */
+  public String format() {
+    StringBuilder out = new StringBuilder("[");
+    for (int i = 0; i < buckets.length; i++) {
+      if (i > 0) {
+        out.append(", ");
+      }
+      out.append(LABELS[i]).append(':').append(buckets[i]);
+    }
+    return out.append(']').toString();
+  }
+}
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateCache.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateCache.java
index 07c9599c866..7515db00085 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateCache.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateCache.java
@@ -32,6 +32,7 @@ import org.apache.beam.runners.dataflow.worker.*;
import org.apache.beam.runners.dataflow.worker.status.BaseStatusServlet;
import org.apache.beam.runners.dataflow.worker.status.StatusDataProvider;
import org.apache.beam.runners.dataflow.worker.streaming.ShardedKey;
+import org.apache.beam.runners.dataflow.worker.util.SimpleByteHistogram;
import
org.apache.beam.runners.dataflow.worker.util.common.worker.InternedByteString;
import org.apache.beam.sdk.state.State;
import org.apache.beam.sdk.util.Weighted;
@@ -75,9 +76,18 @@ public class WindmillStateCache implements
StatusDataProvider {
private final ConcurrentMap<WindmillComputationKey, ForKey> keyIndex;
private final long workerCacheBytes; // Copy workerCacheMb and convert to
bytes.
private final boolean supportMapViaMultimap;
-
- WindmillStateCache(long sizeMb, boolean supportMapViaMultimap) {
+ // Entry size limit supplied through the builder; used whenever no non-negative override is set.
+ private final long defaultMaxCachedEntryBytes;
+ // Whether calculateEntryStats collects key/entry/value weight histograms for the status page.
+ private final boolean enableHistogram;
+ // Runtime override of the entry size limit (set from job settings through
+ // setMaxCachedEntryBytesOverride); a negative value means "no override". Volatile presumably
+ // because it is written by a config-observer thread while other threads read it.
+ private volatile long maxCachedEntryBytesOverride = -1L;
+
+ WindmillStateCache(
+ long sizeMb,
+ boolean supportMapViaMultimap,
+ long maxCachedEntryBytes,
+ boolean enableHistogram) {
this.workerCacheBytes = sizeMb * MEGABYTES;
+ this.defaultMaxCachedEntryBytes = maxCachedEntryBytes;
+ this.enableHistogram = enableHistogram;
int stateCacheConcurrencyLevel =
Math.max(STATE_CACHE_CONCURRENCY_LEVEL,
Runtime.getRuntime().availableProcessors());
this.stateCache =
@@ -99,11 +109,27 @@ public class WindmillStateCache implements
StatusDataProvider {
Builder setSupportMapViaMultimap(boolean supportMapViaMultimap);
+ /**
+ * Sets the default maximum weight, in bytes, of an entry written to the cache on persist; heavier
+ * entries are not cached. Defaults to {@link Long#MAX_VALUE} (no limit) and may be overridden at
+ * runtime through {@link WindmillStateCache#setMaxCachedEntryBytesOverride}.
+ */
+ Builder setMaxCachedEntryBytes(long maxCachedEntryBytes);
+
+ /** Sets whether status stats include key/entry/value weight histograms. Defaults to true. */
+ Builder setEnableHistogram(boolean enableHistogram);
+
WindmillStateCache build();
}
public static Builder builder() {
- return new
AutoBuilder_WindmillStateCache_Builder().setSupportMapViaMultimap(false);
+ // Defaults: no entry size limit and histograms enabled; callers may override before build().
+ return new AutoBuilder_WindmillStateCache_Builder()
+ .setSupportMapViaMultimap(false)
+ .setMaxCachedEntryBytes(Long.MAX_VALUE)
+ .setEnableHistogram(true);
+ }
+
+ /**
+ * Overrides the entry size limit configured on the builder. A negative {@code limit} removes the
+ * override so that the builder-provided default applies again. The limit is read at the start of
+ * each persist, so a change affects subsequent persists.
+ */
+ public void setMaxCachedEntryBytesOverride(long limit) {
+ this.maxCachedEntryBytesOverride = limit;
+ }
+
+ /** Returns the effective entry size limit: the override if non-negative, else the default. */
+ private long getMaxCachedEntryBytesLimit() {
+ // Read the volatile field once so the comparison and the returned value agree.
+ long override = maxCachedEntryBytesOverride;
+ return override >= 0 ? override : defaultMaxCachedEntryBytes;
}
private EntryStats calculateEntryStats() {
@@ -111,10 +137,20 @@ public class WindmillStateCache implements
StatusDataProvider {
BiConsumer<StateId, StateCacheEntry> consumer =
(stateId, stateCacheEntry) -> {
stats.entries++;
- stats.idWeight += stateId.getWeight();
- stats.entryWeight += stateCacheEntry.getWeight();
+ long idWeight = stateId.getWeight();
+ stats.idWeight += idWeight;
+ long entryWeight = stateCacheEntry.getWeight();
+ stats.entryWeight += entryWeight;
stats.entryValues += stateCacheEntry.values.size();
stats.maxEntryValues = Math.max(stats.maxEntryValues,
stateCacheEntry.values.size());
+ if (enableHistogram) {
+ stats.addKeyWeight(idWeight);
+ stats.addEntryWeight(entryWeight);
+ stateCacheEntry.values.forEach(
+ (encodedAddress, weightedValue) -> {
+ stats.addValueWeight(weightedValue.weight);
+ });
+ }
};
stateCache.asMap().forEach(consumer);
return stats;
@@ -142,23 +178,44 @@ public class WindmillStateCache implements
StatusDataProvider {
+ /** Renders the cache statistics as a two-column HTML table, one row per statistic. */
@Override
public void appendSummaryHtml(PrintWriter response) {
response.println("Cache Stats: <br><table>");
- response.println(
- "<tr><th>Hit Ratio</th><th>Evictions</th><th>Entries</th>"
- + "<th>Entry Values</th><th>Max Entry Values</th>"
- + "<th>Id Weight</th><th>Entry Weight</th><th>Max
Weight</th><th>Keys</th>"
- + "</tr><tr>");
CacheStats cacheStats = stateCache.stats();
EntryStats entryStats = calculateEntryStats();
- response.println("<td>" + cacheStats.hitRate() + "</td>");
- response.println("<td>" + cacheStats.evictionCount() + "</td>");
- response.println("<td>" + entryStats.entries + "(" + stateCache.size() + "
inc. weak) </td>");
- response.println("<td>" + entryStats.entryValues + "</td>");
- response.println("<td>" + entryStats.maxEntryValues + "</td>");
- response.println("<td>" + entryStats.idWeight / MEGABYTES + "MB</td>");
- response.println("<td>" + entryStats.entryWeight / MEGABYTES + "MB</td>");
- response.println("<td>" + getMaxWeight() / MEGABYTES + "MB</td>");
- response.println("<td>" + keyIndex.size() + "</td>");
- response.println("</tr></table><br>");
+
+ response.println("<tr><th>Hit Ratio</th><td>" + cacheStats.hitRate() +
"</td></tr>");
+ response.println("<tr><th>Evictions</th><td>" + cacheStats.evictionCount()
+ "</td></tr>");
+ response.println(
+ "<tr><th>Entries</th><td>"
+ + entryStats.entries
+ + " ("
+ + stateCache.size()
+ + " inc. weak)</td></tr>");
+ response.println("<tr><th>Entry Values</th><td>" + entryStats.entryValues
+ "</td></tr>");
+ response.println(
+ "<tr><th>Max Entry Values</th><td>" + entryStats.maxEntryValues +
"</td></tr>");
+ response.println(
+ "<tr><th>Id Weight</th><td>" + entryStats.idWeight / MEGABYTES +
"MB</td></tr>");
+ response.println(
+ "<tr><th>Entry Weight</th><td>" + entryStats.entryWeight / MEGABYTES +
"MB</td></tr>");
+ response.println("<tr><th>Max Weight</th><td>" + getMaxWeight() /
MEGABYTES + "MB</td></tr>");
+ response.println("<tr><th>Keys</th><td>" + keyIndex.size() + "</td></tr>");
+ response.println(
+ "<tr><th>Entry Size Limit</th><td>" + getMaxCachedEntryBytesLimit() +
" bytes</td></tr>");
+ // calculateEntryStats only fills the histograms when they are enabled, so skip their rows.
+ if (enableHistogram) {
+ response.println(
+ "<tr><th>Entry Weight Dist</th><td>"
+ + entryStats.entryWeightHistogram.format()
+ + "</td></tr>");
+ response.println(
+ "<tr><th>Value Weight Dist</th><td>"
+ + entryStats.valueWeightHistogram.format()
+ + "</td></tr>");
+ response.println(
+ "<tr><th>Key Weight Dist</th><td>"
+ + entryStats.keyWeightHistogram.format()
+ + "</td></tr>");
+ }
+
+ response.println("</table><br>");
}
public BaseStatusServlet statusServlet() {
@@ -180,6 +237,21 @@ public class WindmillStateCache implements
StatusDataProvider {
long entryWeight;
long entryValues;
long maxEntryValues;
+ // Weight distributions, filled in by calculateEntryStats only when histograms are enabled.
+ // Whole-entry weights (StateCacheEntry#getWeight).
+ SimpleByteHistogram entryWeightHistogram = new SimpleByteHistogram();
+ // Weights of the individual values held inside each entry.
+ SimpleByteHistogram valueWeightHistogram = new SimpleByteHistogram();
+ // Weights of the StateIds; shown as "Key Weight Dist" on the status page.
+ SimpleByteHistogram keyWeightHistogram = new SimpleByteHistogram();
+
+ void addEntryWeight(long weight) {
+ entryWeightHistogram.add(weight);
+ }
+
+ void addValueWeight(long weight) {
+ valueWeightHistogram.add(weight);
+ }
+
+ void addKeyWeight(long weight) {
+ keyWeightHistogram.add(weight);
+ }
}
/**
@@ -413,7 +485,15 @@ public class WindmillStateCache implements
StatusDataProvider {
}
+ /** Copies the locally cached entries into the global state cache, honoring the size limit. */
public void persist() {
- localCache.forEach(stateCache::put);
+ // Read the limit once so every entry in this batch is judged against the same value.
+ long limit = WindmillStateCache.this.getMaxCachedEntryBytesLimit();
+ localCache.forEach(
+ (id, entry) -> {
+ if (entry.getWeight() <= limit) {
+ stateCache.put(id, entry);
+ } else {
+ // Too large to cache: also drop any previously cached copy so it is not read stale.
+ stateCache.invalidate(id);
+ }
+ });
}
}
}
diff --git
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/SimpleByteHistogramTest.java
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/SimpleByteHistogramTest.java
new file mode 100644
index 00000000000..252300a1955
--- /dev/null
+++
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/SimpleByteHistogramTest.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.util;
+
+import static org.junit.Assert.assertEquals;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link SimpleByteHistogram}. */
+@RunWith(JUnit4.class)
+public class SimpleByteHistogramTest {
+
+  @Test
+  public void testHistogram() {
+    SimpleByteHistogram histogram = new SimpleByteHistogram();
+    // Values on either side of every bucket threshold.
+    histogram.add(10); // <128B
+    histogram.add(127); // <128B
+    histogram.add(128); // <256B
+    histogram.add(255); // <256B
+    histogram.add(256); // <512B
+    histogram.add(511); // <512B
+    histogram.add(512); // <1KB
+    histogram.add(1023); // <1KB
+    histogram.add(1024); // <10KB
+    histogram.add(10240 - 1); // <10KB
+    histogram.add(10240); // <1MB
+    histogram.add(1048576 - 1); // <1MB
+    histogram.add(1048576); // >=1MB
+    histogram.add(2000000); // >=1MB
+
+    String expected = "[<128B:2, <256B:2, <512B:2, <1KB:2, <10KB:2, <1MB:2, >=1MB:2]";
+    assertEquals(expected, histogram.format());
+  }
+
+  @Test
+  public void testEmptyHistogram() {
+    assertEquals(
+        "[<128B:0, <256B:0, <512B:0, <1KB:0, <10KB:0, <1MB:0, >=1MB:0]",
+        new SimpleByteHistogram().format());
+  }
+
+  @Test
+  public void testExtremeWeights() {
+    SimpleByteHistogram histogram = new SimpleByteHistogram();
+    histogram.add(-1); // negative weights fall into the smallest bucket
+    histogram.add(0);
+    histogram.add(Long.MAX_VALUE);
+    assertEquals(
+        "[<128B:2, <256B:0, <512B:0, <1KB:0, <10KB:0, <1MB:0, >=1MB:1]", histogram.format());
+  }
+}
diff --git
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateCacheTest.java
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateCacheTest.java
index bbb8e4c93c0..2d3d9b5ccff 100644
---
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateCacheTest.java
+++
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateCacheTest.java
@@ -270,6 +270,68 @@ public class WindmillStateCacheTest {
assertEquals(400 * MEGABYTES, cache.getMaxWeight());
}
+ /**
+ * Verifies that persist() skips entries heavier than the override limit, caches them once the
+ * limit is raised, and evicts a cached entry that later grows past the limit.
+ */
+ @Test
+ public void testMaxCachedEntryBytes() throws Exception {
+ cache.setMaxCachedEntryBytesOverride(
+ 100); // Set limit to 100 bytes, per cache entry overhead is 136.
+
+ WindmillStateCache.ForKeyAndFamily keyCache =
+ cache.forComputation(COMPUTATION).forKey(COMPUTATION_KEY, 0L,
1L).forFamily(STATE_FAMILY);
+
+ TestStateTag tag1 = new TestStateTag("tag1");
+ TestStateTag tag2 = new TestStateTag("tag2");
+
+ putInCache(keyCache, StateNamespaces.global(), tag1, new TestState("g1"),
10);
+ keyCache.persist();
+
+ // It should not be in global cache because it's too large.
+ keyCache =
+ cache.forComputation(COMPUTATION).forKey(COMPUTATION_KEY, 0L,
2L).forFamily(STATE_FAMILY);
+ assertEquals(Optional.empty(), getFromCache(keyCache,
StateNamespaces.global(), tag1));
+
+ // Now set limit larger.
+ cache.setMaxCachedEntryBytesOverride(1000);
+
+ putInCache(keyCache, StateNamespaces.global(), tag2, new TestState("g2"),
10);
+ keyCache.persist();
+
+ // It should be in global cache.
+ keyCache =
+ cache.forComputation(COMPUTATION).forKey(COMPUTATION_KEY, 0L,
3L).forFamily(STATE_FAMILY);
+ assertEquals(
+ Optional.of(new TestState("g2")), getFromCache(keyCache,
StateNamespaces.global(), tag2));
+
+ // Now update it to be larger than limit.
+ putInCache(keyCache, StateNamespaces.global(), tag2, new
TestState("g2_large"), 2000);
+ keyCache.persist();
+
+ // It should be removed from global cache.
+ keyCache =
+ cache.forComputation(COMPUTATION).forKey(COMPUTATION_KEY, 0L,
4L).forFamily(STATE_FAMILY);
+ assertEquals(Optional.empty(), getFromCache(keyCache,
StateNamespaces.global(), tag2));
+ }
+
+  /** With histograms disabled, the status summary omits every weight-distribution row. */
+  @Test
+  public void testDisableHistogram() throws Exception {
+    WindmillStateCache noHistogramCache =
+        WindmillStateCache.builder().setSizeMb(400).setEnableHistogram(false).build();
+    WindmillStateCache.ForKeyAndFamily keyCache =
+        noHistogramCache
+            .forComputation(COMPUTATION)
+            .forKey(COMPUTATION_KEY, 0L, 1L)
+            .forFamily(STATE_FAMILY);
+
+    putInCache(
+        keyCache, StateNamespaces.global(), new TestStateTag("tag1"), new TestState("g1"), 2);
+    keyCache.persist();
+
+    java.io.StringWriter writer = new java.io.StringWriter();
+    noHistogramCache.appendSummaryHtml(new java.io.PrintWriter(writer));
+    String summary = writer.toString();
+
+    // Confirm the table was actually rendered so the negative checks below cannot pass vacuously.
+    org.junit.Assert.assertTrue(summary.contains("Entry Size Limit"));
+    org.junit.Assert.assertFalse(summary.contains("Entry Weight Dist"));
+    org.junit.Assert.assertFalse(summary.contains("Value Weight Dist"));
+    org.junit.Assert.assertFalse(summary.contains("Key Weight Dist"));
+  }
+
/** Verifies that values are cached in the appropriate namespaces. */
@Test
public void testInvalidation() throws Exception {
diff --git
a/runners/google-cloud-dataflow-java/worker/windmill/src/main/proto/windmill.proto
b/runners/google-cloud-dataflow-java/worker/windmill/src/main/proto/windmill.proto
index b7579cbacb8..58e4f7df3c3 100644
---
a/runners/google-cloud-dataflow-java/worker/windmill/src/main/proto/windmill.proto
+++
b/runners/google-cloud-dataflow-java/worker/windmill/src/main/proto/windmill.proto
@@ -986,6 +986,8 @@ message UserWorkerRunnerV1Settings {
optional ConnectivityType connectivity_type = 4
[default = CONNECTIVITY_TYPE_DEFAULT];
+ // Maximum weight, in bytes, of an entry kept in the worker's windmill state cache.
+ // A negative value (the default) leaves the worker's own configured limit in effect.
+ optional int64 max_cached_entry_bytes = 5 [default = -1];
+
reserved 1, 2;
}