This is an automated email from the ASF dual-hosted git repository. yhu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new c3b3fa62c3a Include byte size of stateKey in estimated weight of WindmillBag, WindmillValue, and WindmillWatermarkHold (#30654) c3b3fa62c3a is described below commit c3b3fa62c3a323e8da15a18aeba4a43b628efd24 Author: dmitryor <34167644+dmitr...@users.noreply.github.com> AuthorDate: Mon Mar 18 11:43:27 2024 -0700 Include byte size of stateKey in estimated weight of WindmillBag, WindmillValue, and WindmillWatermarkHold (#30654) * Update WindmillBag.java Include byte size of the stateKey on the BagState weight used to estimate and limit the total state cache size * Update WindmillValue.java Include stateKey size in the byte size of a WindmillValue * Update WindmillWatermarkHold.java Include stateKey size in the WatermarkHold estimated byte size * Fix formatting issue * Fix expected cache item weights in WindmillStateInternalsTest --- .../dataflow/worker/windmill/state/WindmillBag.java | 2 +- .../dataflow/worker/windmill/state/WindmillValue.java | 2 +- .../worker/windmill/state/WindmillWatermarkHold.java | 3 ++- .../windmill/state/WindmillStateInternalsTest.java | 18 +++++++++--------- 4 files changed, 13 insertions(+), 12 deletions(-) diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillBag.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillBag.java index b3719cc666d..702be70f411 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillBag.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillBag.java @@ -193,7 +193,7 @@ public class WindmillBag<T> extends SimpleWindmillState implements BagState<T> { } // We now know the complete bag contents, and any read on it will yield a // cached value, so 
cache it for future reads. - cache.put(namespace, address, this, encodedSize); + cache.put(namespace, address, this, encodedSize + stateKey.size()); } // Don't reuse the localAdditions object; we don't want future changes to it to diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillValue.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillValue.java index 923d166c823..bc3e0906f99 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillValue.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillValue.java @@ -124,7 +124,7 @@ public class WindmillValue<T> extends SimpleWindmillState implements ValueState< coder.encode(value, stream, Coder.Context.OUTER); } encoded = stream.toByteString(); - cachedSize = encoded.size(); + cachedSize = (long) encoded.size() + stateKey.size(); } // Place in cache to avoid a future read. 
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillWatermarkHold.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillWatermarkHold.java index 9d939c759d2..e8b2290c3c7 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillWatermarkHold.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillWatermarkHold.java @@ -175,13 +175,14 @@ public class WindmillWatermarkHold extends WindmillState implements WatermarkHol throw new IllegalStateException("Unreachable condition"); } + final int estimatedByteSize = ENCODED_SIZE + stateKey.size(); return Futures.lazyTransform( result, result1 -> { cleared = false; localAdditions = null; if (cachedValue != null) { - cache.put(namespace, address, WindmillWatermarkHold.this, ENCODED_SIZE); + cache.put(namespace, address, WindmillWatermarkHold.this, estimatedByteSize); } return result1; }); diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateInternalsTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateInternalsTest.java index d55a20e5517..d53b1d8c3e8 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateInternalsTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateInternalsTest.java @@ -3043,7 +3043,7 @@ public class WindmillStateInternalsTest { value.write("Hi"); underTest.persist(Windmill.WorkItemCommitRequest.newBuilder()); - assertEquals(132, cache.getWeight()); + assertEquals(141, 
cache.getWeight()); resetUnderTest(); value = underTest.state(NAMESPACE, addr); @@ -3051,7 +3051,7 @@ public class WindmillStateInternalsTest { value.clear(); underTest.persist(Windmill.WorkItemCommitRequest.newBuilder()); - assertEquals(130, cache.getWeight()); + assertEquals(139, cache.getWeight()); resetUnderTest(); value = underTest.state(NAMESPACE, addr); @@ -3083,7 +3083,7 @@ public class WindmillStateInternalsTest { underTest.persist(Windmill.WorkItemCommitRequest.newBuilder()); - assertEquals(140, cache.getWeight()); + assertEquals(147, cache.getWeight()); resetUnderTest(); bag = underTest.state(NAMESPACE, addr); @@ -3103,7 +3103,7 @@ public class WindmillStateInternalsTest { underTest.persist(Windmill.WorkItemCommitRequest.newBuilder()); - assertEquals(133, cache.getWeight()); + assertEquals(140, cache.getWeight()); resetUnderTest(); bag = underTest.state(NAMESPACE, addr); @@ -3114,7 +3114,7 @@ public class WindmillStateInternalsTest { underTest.persist(Windmill.WorkItemCommitRequest.newBuilder()); - assertEquals(134, cache.getWeight()); + assertEquals(141, cache.getWeight()); resetUnderTest(); bag = underTest.state(NAMESPACE, addr); @@ -3145,7 +3145,7 @@ public class WindmillStateInternalsTest { underTest.persist(Windmill.WorkItemCommitRequest.newBuilder()); - assertEquals(138, cache.getWeight()); + assertEquals(151, cache.getWeight()); resetUnderTest(); hold = underTest.state(NAMESPACE, addr); @@ -3154,7 +3154,7 @@ public class WindmillStateInternalsTest { underTest.persist(Windmill.WorkItemCommitRequest.newBuilder()); - assertEquals(138, cache.getWeight()); + assertEquals(151, cache.getWeight()); resetUnderTest(); hold = underTest.state(NAMESPACE, addr); @@ -3185,7 +3185,7 @@ public class WindmillStateInternalsTest { underTest.persist(Windmill.WorkItemCommitRequest.newBuilder()); - assertEquals(131, cache.getWeight()); + assertEquals(144, cache.getWeight()); resetUnderTest(); value = underTest.state(NAMESPACE, COMBINING_ADDR); @@ -3196,7 +3196,7 @@ 
public class WindmillStateInternalsTest { underTest.persist(Windmill.WorkItemCommitRequest.newBuilder()); - assertEquals(130, cache.getWeight()); + assertEquals(143, cache.getWeight()); resetUnderTest(); value = underTest.state(NAMESPACE, COMBINING_ADDR);