This is an automated email from the ASF dual-hosted git repository.

yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new c3b3fa62c3a Include byte size of stateKey in estimated weight of WindmillBag, WindmillValue, and WindmillWatermarkHold (#30654)
c3b3fa62c3a is described below

commit c3b3fa62c3a323e8da15a18aeba4a43b628efd24
Author: dmitryor <34167644+dmitr...@users.noreply.github.com>
AuthorDate: Mon Mar 18 11:43:27 2024 -0700

    Include byte size of stateKey in estimated weight of WindmillBag,
    WindmillValue, and WindmillWatermarkHold (#30654)
    
    * Update WindmillBag.java
    
    Include the byte size of the stateKey in the BagState weight used to
    estimate and limit the total state cache size
    
    * Update WindmillValue.java
    
    Include stateKey size in the byte size of a WindmillValue
    
    * Update WindmillWatermarkHold.java
    
    Include stateKey size in the WatermarkHold estimated byte size
    
    * Fix formatting issue
    
    * Fix expected cache item weights in WindmillStateInternalsTest
---
 .../dataflow/worker/windmill/state/WindmillBag.java    |  2 +-
 .../dataflow/worker/windmill/state/WindmillValue.java  |  2 +-
 .../worker/windmill/state/WindmillWatermarkHold.java   |  3 ++-
 .../windmill/state/WindmillStateInternalsTest.java     | 18 +++++++++---------
 4 files changed, 13 insertions(+), 12 deletions(-)

diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillBag.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillBag.java
index b3719cc666d..702be70f411 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillBag.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillBag.java
@@ -193,7 +193,7 @@ public class WindmillBag<T> extends SimpleWindmillState 
implements BagState<T> {
       }
       // We now know the complete bag contents, and any read on it will yield a
       // cached value, so cache it for future reads.
-      cache.put(namespace, address, this, encodedSize);
+      cache.put(namespace, address, this, encodedSize + stateKey.size());
     }
 
     // Don't reuse the localAdditions object; we don't want future changes to 
it to
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillValue.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillValue.java
index 923d166c823..bc3e0906f99 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillValue.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillValue.java
@@ -124,7 +124,7 @@ public class WindmillValue<T> extends SimpleWindmillState 
implements ValueState<
         coder.encode(value, stream, Coder.Context.OUTER);
       }
       encoded = stream.toByteString();
-      cachedSize = encoded.size();
+      cachedSize = (long) encoded.size() + stateKey.size();
     }
 
     // Place in cache to avoid a future read.
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillWatermarkHold.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillWatermarkHold.java
index 9d939c759d2..e8b2290c3c7 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillWatermarkHold.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillWatermarkHold.java
@@ -175,13 +175,14 @@ public class WindmillWatermarkHold extends WindmillState 
implements WatermarkHol
       throw new IllegalStateException("Unreachable condition");
     }
 
+    final int estimatedByteSize = ENCODED_SIZE + stateKey.size();
     return Futures.lazyTransform(
         result,
         result1 -> {
           cleared = false;
           localAdditions = null;
           if (cachedValue != null) {
-            cache.put(namespace, address, WindmillWatermarkHold.this, 
ENCODED_SIZE);
+            cache.put(namespace, address, WindmillWatermarkHold.this, 
estimatedByteSize);
           }
           return result1;
         });
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateInternalsTest.java
 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateInternalsTest.java
index d55a20e5517..d53b1d8c3e8 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateInternalsTest.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateInternalsTest.java
@@ -3043,7 +3043,7 @@ public class WindmillStateInternalsTest {
     value.write("Hi");
     underTest.persist(Windmill.WorkItemCommitRequest.newBuilder());
 
-    assertEquals(132, cache.getWeight());
+    assertEquals(141, cache.getWeight());
 
     resetUnderTest();
     value = underTest.state(NAMESPACE, addr);
@@ -3051,7 +3051,7 @@ public class WindmillStateInternalsTest {
     value.clear();
     underTest.persist(Windmill.WorkItemCommitRequest.newBuilder());
 
-    assertEquals(130, cache.getWeight());
+    assertEquals(139, cache.getWeight());
 
     resetUnderTest();
     value = underTest.state(NAMESPACE, addr);
@@ -3083,7 +3083,7 @@ public class WindmillStateInternalsTest {
 
     underTest.persist(Windmill.WorkItemCommitRequest.newBuilder());
 
-    assertEquals(140, cache.getWeight());
+    assertEquals(147, cache.getWeight());
 
     resetUnderTest();
     bag = underTest.state(NAMESPACE, addr);
@@ -3103,7 +3103,7 @@ public class WindmillStateInternalsTest {
 
     underTest.persist(Windmill.WorkItemCommitRequest.newBuilder());
 
-    assertEquals(133, cache.getWeight());
+    assertEquals(140, cache.getWeight());
 
     resetUnderTest();
     bag = underTest.state(NAMESPACE, addr);
@@ -3114,7 +3114,7 @@ public class WindmillStateInternalsTest {
 
     underTest.persist(Windmill.WorkItemCommitRequest.newBuilder());
 
-    assertEquals(134, cache.getWeight());
+    assertEquals(141, cache.getWeight());
 
     resetUnderTest();
     bag = underTest.state(NAMESPACE, addr);
@@ -3145,7 +3145,7 @@ public class WindmillStateInternalsTest {
 
     underTest.persist(Windmill.WorkItemCommitRequest.newBuilder());
 
-    assertEquals(138, cache.getWeight());
+    assertEquals(151, cache.getWeight());
 
     resetUnderTest();
     hold = underTest.state(NAMESPACE, addr);
@@ -3154,7 +3154,7 @@ public class WindmillStateInternalsTest {
 
     underTest.persist(Windmill.WorkItemCommitRequest.newBuilder());
 
-    assertEquals(138, cache.getWeight());
+    assertEquals(151, cache.getWeight());
 
     resetUnderTest();
     hold = underTest.state(NAMESPACE, addr);
@@ -3185,7 +3185,7 @@ public class WindmillStateInternalsTest {
 
     underTest.persist(Windmill.WorkItemCommitRequest.newBuilder());
 
-    assertEquals(131, cache.getWeight());
+    assertEquals(144, cache.getWeight());
 
     resetUnderTest();
     value = underTest.state(NAMESPACE, COMBINING_ADDR);
@@ -3196,7 +3196,7 @@ public class WindmillStateInternalsTest {
 
     underTest.persist(Windmill.WorkItemCommitRequest.newBuilder());
 
-    assertEquals(130, cache.getWeight());
+    assertEquals(143, cache.getWeight());
 
     resetUnderTest();
     value = underTest.state(NAMESPACE, COMBINING_ADDR);

Reply via email to