This is an automated email from the ASF dual-hosted git repository.

scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 89aeb187f2c Correct per-entry HashMap overhead in WindmillStateCache (#30672)
89aeb187f2c is described below

commit 89aeb187f2c9350fa51fc2b2f690c93a57e523b9
Author: dmitryor <34167644+dmitr...@users.noreply.github.com>
AuthorDate: Wed Apr 10 01:28:23 2024 -0700

    Correct per-entry HashMap overhead in WindmillStateCache (#30672)
---
 .../worker/windmill/state/WindmillStateCache.java      |  3 ++-
 .../worker/windmill/state/WindmillStateCacheTest.java  | 12 ++++++------
 .../windmill/state/WindmillStateInternalsTest.java     | 18 +++++++++---------
 3 files changed, 17 insertions(+), 16 deletions(-)

diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateCache.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateCache.java
index 85c74fe8591..c6c49134bcb 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateCache.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateCache.java
@@ -64,7 +64,8 @@ public class WindmillStateCache implements StatusDataProvider {
   // Initial size of hash tables per entry.
   private static final int INITIAL_HASH_MAP_CAPACITY = 4;
   // Overhead of each hash map entry.
-  private static final int HASH_MAP_ENTRY_OVERHEAD = 16;
+  // https://appsintheopen.com/posts/52-the-memory-overhead-of-java-ojects
+  private static final int HASH_MAP_ENTRY_OVERHEAD = 32;
   // Overhead of each StateCacheEntry.  One long, plus a hash table.
   private static final int PER_CACHE_ENTRY_OVERHEAD =
       8 + HASH_MAP_ENTRY_OVERHEAD * INITIAL_HASH_MAP_CAPACITY;
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateCacheTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateCacheTest.java
index 1f4355b156b..446a34f73de 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateCacheTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateCacheTest.java
@@ -168,15 +168,15 @@ public class WindmillStateCacheTest {
 
     assertEquals(0, cache.getWeight());
     keyCache.persist();
-    assertEquals(254, cache.getWeight());
+    assertEquals(414, cache.getWeight());
 
     keyCache.put(triggerNamespace(0, 0), new TestStateTag("tag3"), new TestState("t3"), 2);
     keyCache.put(triggerNamespace(0, 0), new TestStateTag("tag2"), new TestState("t2"), 2);
 
     // Observes updated weight in entries, though cache will not know about it.
-    assertEquals(290, cache.getWeight());
+    assertEquals(482, cache.getWeight());
     keyCache.persist();
-    assertEquals(290, cache.getWeight());
+    assertEquals(482, cache.getWeight());
 
     keyCache =
         cache.forComputation(COMPUTATION).forKey(COMPUTATION_KEY, 0L, 2L).forFamily(STATE_FAMILY);
@@ -212,7 +212,7 @@ public class WindmillStateCacheTest {
 
     keyCache =
         cache.forComputation(COMPUTATION).forKey(COMPUTATION_KEY, 0L, 2L).forFamily(STATE_FAMILY);
-    assertEquals(127, cache.getWeight());
+    assertEquals(207, cache.getWeight());
     assertEquals(
         Optional.of(new TestState("g1")),
         keyCache.get(StateNamespaces.global(), new TestStateTag("tag1")));
@@ -221,7 +221,7 @@ public class WindmillStateCacheTest {
         cache.forComputation(COMPUTATION).forKey(COMPUTATION_KEY, 1L, 3L).forFamily(STATE_FAMILY);
     assertEquals(
         Optional.empty(), keyCache.get(StateNamespaces.global(), new TestStateTag("tag1")));
-    assertEquals(127, cache.getWeight());
+    assertEquals(207, cache.getWeight());
   }
 
   /** Verifies that the cache is invalidated when the cache token changes. */
@@ -254,7 +254,7 @@ public class WindmillStateCacheTest {
     assertEquals(Optional.of(new TestState("w2")), keyCache.get(windowNamespace(0), tag));
     assertEquals(0, cache.getWeight());
     keyCache.persist();
-    assertEquals(127, cache.getWeight());
+    assertEquals(207, cache.getWeight());
     assertEquals(Optional.of(new TestState("w2")), keyCache.get(windowNamespace(0), tag));
 
     // Previous work token.
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateInternalsTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateInternalsTest.java
index d53b1d8c3e8..a53240d6453 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateInternalsTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateInternalsTest.java
@@ -3043,7 +3043,7 @@ public class WindmillStateInternalsTest {
     value.write("Hi");
     underTest.persist(Windmill.WorkItemCommitRequest.newBuilder());
 
-    assertEquals(141, cache.getWeight());
+    assertEquals(221, cache.getWeight());
 
     resetUnderTest();
     value = underTest.state(NAMESPACE, addr);
@@ -3051,7 +3051,7 @@ public class WindmillStateInternalsTest {
     value.clear();
     underTest.persist(Windmill.WorkItemCommitRequest.newBuilder());
 
-    assertEquals(139, cache.getWeight());
+    assertEquals(219, cache.getWeight());
 
     resetUnderTest();
     value = underTest.state(NAMESPACE, addr);
@@ -3083,7 +3083,7 @@ public class WindmillStateInternalsTest {
 
     underTest.persist(Windmill.WorkItemCommitRequest.newBuilder());
 
-    assertEquals(147, cache.getWeight());
+    assertEquals(227, cache.getWeight());
 
     resetUnderTest();
     bag = underTest.state(NAMESPACE, addr);
@@ -3103,7 +3103,7 @@ public class WindmillStateInternalsTest {
 
     underTest.persist(Windmill.WorkItemCommitRequest.newBuilder());
 
-    assertEquals(140, cache.getWeight());
+    assertEquals(220, cache.getWeight());
 
     resetUnderTest();
     bag = underTest.state(NAMESPACE, addr);
@@ -3114,7 +3114,7 @@ public class WindmillStateInternalsTest {
 
     underTest.persist(Windmill.WorkItemCommitRequest.newBuilder());
 
-    assertEquals(141, cache.getWeight());
+    assertEquals(221, cache.getWeight());
 
     resetUnderTest();
     bag = underTest.state(NAMESPACE, addr);
@@ -3145,7 +3145,7 @@ public class WindmillStateInternalsTest {
 
     underTest.persist(Windmill.WorkItemCommitRequest.newBuilder());
 
-    assertEquals(151, cache.getWeight());
+    assertEquals(231, cache.getWeight());
 
     resetUnderTest();
     hold = underTest.state(NAMESPACE, addr);
@@ -3154,7 +3154,7 @@ public class WindmillStateInternalsTest {
 
     underTest.persist(Windmill.WorkItemCommitRequest.newBuilder());
 
-    assertEquals(151, cache.getWeight());
+    assertEquals(231, cache.getWeight());
 
     resetUnderTest();
     hold = underTest.state(NAMESPACE, addr);
@@ -3185,7 +3185,7 @@ public class WindmillStateInternalsTest {
 
     underTest.persist(Windmill.WorkItemCommitRequest.newBuilder());
 
-    assertEquals(144, cache.getWeight());
+    assertEquals(224, cache.getWeight());
 
     resetUnderTest();
     value = underTest.state(NAMESPACE, COMBINING_ADDR);
@@ -3196,7 +3196,7 @@ public class WindmillStateInternalsTest {
 
     underTest.persist(Windmill.WorkItemCommitRequest.newBuilder());
 
-    assertEquals(143, cache.getWeight());
+    assertEquals(223, cache.getWeight());
 
     resetUnderTest();
     value = underTest.state(NAMESPACE, COMBINING_ADDR);

Reply via email to