scwhittle commented on code in PR #38458:
URL: https://github.com/apache/beam/pull/38458#discussion_r3233345938
##########
runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowStreamingPipelineOptions.java:
##########
@@ -249,6 +249,16 @@ public interface DataflowStreamingPipelineOptions extends
PipelineOptions {
void setIsWindmillServiceDirectPathEnabled(boolean isWindmillServiceDirectPathEnabled);
+ /**
+ * The maximum size of cached values in bytes. Values larger than this limit will not be cached by
Review Comment:
"values" is a little confusing alongside ValueState. I believe this limit applies to other state types as well. Would it be better to have separate limits for different state types instead? We could, for example, choose to cache large value states but not large bags.
I think side inputs use a separate cache, but if not we should likely exclude them from this limit.
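For illustration, a per-state-type split could be as simple as a small holder like the one below. This is purely a hypothetical sketch of the suggestion; the class and method names are invented and do not exist in the PR:

```java
// Hypothetical sketch only: per-state-type cache size limits, so large
// ValueStates could still be cached while large bags are not. All names
// here are invented for illustration and are not part of the PR.
final class StateCacheLimits {
  private final long maxValueStateBytes;
  private final long maxBagStateBytes;

  StateCacheLimits(long maxValueStateBytes, long maxBagStateBytes) {
    this.maxValueStateBytes = maxValueStateBytes;
    this.maxBagStateBytes = maxBagStateBytes;
  }

  // A large value state can be cached as long as it fits its own budget.
  boolean shouldCacheValueState(long weightBytes) {
    return weightBytes <= maxValueStateBytes;
  }

  // Bags get a separate (possibly much smaller) budget.
  boolean shouldCacheBag(long weightBytes) {
    return weightBytes <= maxBagStateBytes;
  }
}
```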
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateCache.java:
##########
@@ -99,22 +108,48 @@ public interface Builder {
Builder setSupportMapViaMultimap(boolean supportMapViaMultimap);
+ Builder setMaxCachedValueBytes(long maxCachedValueBytes);
+
+ Builder setEnableHistogram(boolean enableHistogram);
+
WindmillStateCache build();
}
public static Builder builder() {
- return new AutoBuilder_WindmillStateCache_Builder().setSupportMapViaMultimap(false);
+ return new AutoBuilder_WindmillStateCache_Builder()
+ .setSupportMapViaMultimap(false)
+ .setMaxCachedValueBytes(Long.MAX_VALUE)
+ .setEnableHistogram(true);
+ }
+
+ public void setMaxCachedValueBytesOverride(long limit) {
+ this.maxCachedValueBytesOverride = limit;
+ }
+
+ private long getMaxCachedValueBytesLimit() {
+ long override = maxCachedValueBytesOverride;
+ return override >= 0 ? override : defaultMaxCachedValueBytes;
}
private EntryStats calculateEntryStats() {
EntryStats stats = new EntryStats();
BiConsumer<StateId, StateCacheEntry> consumer =
(stateId, stateCacheEntry) -> {
stats.entries++;
- stats.idWeight += stateId.getWeight();
- stats.entryWeight += stateCacheEntry.getWeight();
+ long idWeight = stateId.getWeight();
+ stats.idWeight += idWeight;
+ long entryWeight = stateCacheEntry.getWeight();
+ stats.entryWeight += entryWeight;
stats.entryValues += stateCacheEntry.values.size();
stats.maxEntryValues = Math.max(stats.maxEntryValues, stateCacheEntry.values.size());
+ if (enableHistogram) {
+ stats.addKeyWeight(idWeight);
+ stats.addEntryWeight(entryWeight);
+ stateCacheEntry.values.forEach(
+ (encodedAddress, weightedValue) -> {
Review Comment:
This is on the order of tags within a key, correct? It seems like that there
will more likely be more stateCacheEntry to iterate over than
stateCacheEntry.values so I don't think it is worth precalculating since we
arlready iterate over stateCacheEntrys
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateCache.java:
##########
@@ -180,6 +248,31 @@ private static class EntryStats {
long entryWeight;
long entryValues;
long maxEntryValues;
+ long[] entryWeightHistogram = new long[7];
+ long[] valueWeightHistogram = new long[7];
+ long[] keyWeightHistogram = new long[7];
+
+ void addEntryWeight(long weight) {
+ entryWeightHistogram[getBucket(weight)]++;
+ }
+
+ void addValueWeight(long weight) {
+ valueWeightHistogram[getBucket(weight)]++;
+ }
+
+ void addKeyWeight(long weight) {
+ keyWeightHistogram[getBucket(weight)]++;
+ }
+
+ private int getBucket(long weight) {
Review Comment:
create a SimpleByteHistogram class? The buckets here and the status
rendering woudl be better grouped in a class.
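Something like the sketch below is what I have in mind. This is a rough, hypothetical illustration of the suggested class, not code from the PR; the bucket boundaries, labels, and rendering format are all assumptions (the only constraint carried over is the seven buckets implied by the `long[7]` arrays above):

```java
// Hypothetical sketch of the suggested SimpleByteHistogram: groups the
// bucket boundaries and the status-page rendering in one place. Bucket
// boundaries and labels are assumptions chosen for illustration.
final class SimpleByteHistogram {
  // Upper bounds for the first six buckets; the seventh catches the rest.
  private static final long[] BOUNDS = {
    1 << 10, 1 << 14, 1 << 18, 1 << 20, 1 << 24, 1 << 28
  };
  private static final String[] LABELS = {
    "<1KB", "<16KB", "<256KB", "<1MB", "<16MB", "<256MB", ">=256MB"
  };
  private final long[] counts = new long[BOUNDS.length + 1];

  // Increments the bucket that the given byte weight falls into.
  void add(long weightBytes) {
    counts[bucketFor(weightBytes)]++;
  }

  private static int bucketFor(long weightBytes) {
    for (int i = 0; i < BOUNDS.length; i++) {
      if (weightBytes < BOUNDS[i]) {
        return i;
      }
    }
    return BOUNDS.length;
  }

  // Renders non-empty buckets for the status page, e.g. "[<1KB: 3] [<16KB: 1]".
  String render() {
    StringBuilder sb = new StringBuilder();
    for (int i = 0; i < counts.length; i++) {
      if (counts[i] > 0) {
        sb.append('[').append(LABELS[i]).append(": ").append(counts[i]).append("] ");
      }
    }
    return sb.toString().trim();
  }
}
```

Then EntryStats would just hold three SimpleByteHistogram fields instead of three raw arrays plus duplicated bucketing logic.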
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]