This is an automated email from the ASF dual-hosted git repository. lcwik pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new c65e777d6c6 Optimize PGBK table to only update cache when there is a large enough size change. #21250 (#25219) c65e777d6c6 is described below commit c65e777d6c62d25184b91fa50d7281411c22f2ec Author: Luke Cwik <lc...@google.com> AuthorDate: Fri Feb 3 14:45:15 2023 -0800 Optimize PGBK table to only update cache when there is a large enough size change. #21250 (#25219) * Optimize PGBK table to only update cache when there is a large enough size change. #21250 This prevents an expensive scenario where a user is outputting lots of small values (e.g. ints) to be precombined; such values take little to no space to store, so updating the cache on every change provides little value. Note the 5-10x throughput improvement for all distributions except unique keys. Some early profiles show an issue with the G1 garbage collector when storing so many small values: the GC management overhead dominates 75% of the execution, which requires further investigation. 
Before: ``` Benchmark (distribution) (globallyWindowed) Mode Cnt Score Error Units PrecombineGroupingTableBenchmark.sumIntegerBinaryCombine uniform true thrpt 5 8.306 ± 1.255 ops/s PrecombineGroupingTableBenchmark.sumIntegerBinaryCombine uniform false thrpt 5 7.849 ± 0.476 ops/s PrecombineGroupingTableBenchmark.sumIntegerBinaryCombine normal true thrpt 5 10.575 ± 1.295 ops/s PrecombineGroupingTableBenchmark.sumIntegerBinaryCombine normal false thrpt 5 10.772 ± 0.141 ops/s PrecombineGroupingTableBenchmark.sumIntegerBinaryCombine hotKey true thrpt 5 9.131 ± 2.761 ops/s PrecombineGroupingTableBenchmark.sumIntegerBinaryCombine hotKey false thrpt 5 8.302 ± 1.078 ops/s PrecombineGroupingTableBenchmark.sumIntegerBinaryCombine uniqueKeys true thrpt 5 3.899 ± 1.737 ops/s PrecombineGroupingTableBenchmark.sumIntegerBinaryCombine uniqueKeys false thrpt 5 4.203 ± 2.170 ops/s ``` After: ``` Benchmark (distribution) (globallyWindowed) Mode Cnt Score Error Units PrecombineGroupingTableBenchmark.sumIntegerBinaryCombine uniform true thrpt 5 88.740 ± 8.925 ops/s PrecombineGroupingTableBenchmark.sumIntegerBinaryCombine uniform false thrpt 5 76.005 ± 5.150 ops/s PrecombineGroupingTableBenchmark.sumIntegerBinaryCombine normal true thrpt 5 43.388 ± 1.966 ops/s PrecombineGroupingTableBenchmark.sumIntegerBinaryCombine normal false thrpt 5 37.804 ± 7.177 ops/s PrecombineGroupingTableBenchmark.sumIntegerBinaryCombine hotKey true thrpt 5 84.881 ± 5.040 ops/s PrecombineGroupingTableBenchmark.sumIntegerBinaryCombine hotKey false thrpt 5 74.183 ± 2.063 ops/s PrecombineGroupingTableBenchmark.sumIntegerBinaryCombine uniqueKeys true thrpt 5 5.567 ± 4.068 ops/s PrecombineGroupingTableBenchmark.sumIntegerBinaryCombine uniqueKeys false thrpt 5 6.957 ± 1.508 ops/s ``` --- .../org/apache/beam/gradle/BeamModulePlugin.groovy | 2 +- .../jmh/PrecombineGroupingTableBenchmark.java | 87 +++++++++++++--------- .../java/org/apache/beam/fn/harness/Caches.java | 43 ++++++++++- 
.../beam/fn/harness/PrecombineGroupingTable.java | 10 +-- .../fn/harness/PrecombineGroupingTableTest.java | 33 ++++---- 5 files changed, 117 insertions(+), 58 deletions(-) diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy index 4d9c2109b75..414b3ccdc55 100644 --- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy +++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy @@ -1553,7 +1553,7 @@ class BeamModulePlugin implements Plugin<Project> { if (project.file("/opt/cprof/profiler_java_agent.so").exists()) { def gcpProject = project.findProperty('gcpProject') ?: 'apache-beam-testing' def userName = System.getProperty("user.name").toLowerCase().replaceAll(" ", "_") - jvmArgs '-agentpath:/opt/cprof/profiler_java_agent.so=-cprof_service=' + userName + "_" + project.getProperty("benchmark").toLowerCase() + '_' + System.currentTimeMillis() + ',-cprof_project_id=' + gcpProject + ',-cprof_zone_name=us-central1-a' + jvmArgs '-agentpath:/opt/cprof/profiler_java_agent.so=-cprof_service=' + userName + "_" + project.getProperty("benchmark").toLowerCase() + '_' + String.format('%1$tY%1$tm%1$td_%1$tH%1$tM%1$tS_%1$tL', System.currentTimeMillis()) + ',-cprof_project_id=' + gcpProject + ',-cprof_zone_name=us-central1-a' } } else { // We filter for only Apache Beam benchmarks to ensure that we aren't diff --git a/sdks/java/harness/jmh/src/main/java/org/apache/beam/fn/harness/jmh/PrecombineGroupingTableBenchmark.java b/sdks/java/harness/jmh/src/main/java/org/apache/beam/fn/harness/jmh/PrecombineGroupingTableBenchmark.java index 0deaf96f18f..ded8358a10d 100644 --- a/sdks/java/harness/jmh/src/main/java/org/apache/beam/fn/harness/jmh/PrecombineGroupingTableBenchmark.java +++ b/sdks/java/harness/jmh/src/main/java/org/apache/beam/fn/harness/jmh/PrecombineGroupingTableBenchmark.java @@ -21,7 +21,9 @@ import java.util.ArrayList; import 
java.util.Collections; import java.util.List; import java.util.Random; +import org.apache.beam.fn.harness.Cache; import org.apache.beam.fn.harness.Caches; +import org.apache.beam.fn.harness.Caches.ClearableCache; import org.apache.beam.fn.harness.PrecombineGroupingTable; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.options.PipelineOptions; @@ -36,15 +38,20 @@ import org.openjdk.jmh.annotations.Param; import org.openjdk.jmh.annotations.Scope; import org.openjdk.jmh.annotations.Setup; import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Threads; import org.openjdk.jmh.infra.Blackhole; public class PrecombineGroupingTableBenchmark { private static final int TOTAL_VALUES = 1_000_000; + private static final int KEY_SPACE = 1_000; @State(Scope.Benchmark) public static class SumIntegerBinaryCombine { final Combine.BinaryCombineIntegerFn sumInts = Sum.ofIntegers(); final PipelineOptions options = PipelineOptionsFactory.create(); + + final Cache<Object, Object> cache = Caches.fromOptions(options); + List<WindowedValue<KV<String, Integer>>> elements; @Param({"true", "false"}) @@ -55,51 +62,60 @@ public class PrecombineGroupingTableBenchmark { @Setup(Level.Trial) public void setUp() { - // Use a stable seed to ensure consistency across benchmark runs - Random random = new Random(-2134890234); - elements = new ArrayList<>(); - switch (distribution) { - case "uniform": - for (int i = 0; i < TOTAL_VALUES; ++i) { - int key = random.nextInt(1000); - elements.add(WindowedValue.valueInGlobalWindow(KV.of(Integer.toString(key), key))); - } - break; - case "normal": - for (int i = 0; i < TOTAL_VALUES; ++i) { - int key = (int) (random.nextGaussian() * 1000); - elements.add(WindowedValue.valueInGlobalWindow(KV.of(Integer.toString(key), key))); - } - break; - case "hotKey": - for (int i = 0; i < TOTAL_VALUES; ++i) { - int key; - if (random.nextBoolean()) { - key = 0; - } else { - key = random.nextInt(1000); - } - 
elements.add(WindowedValue.valueInGlobalWindow(KV.of(Integer.toString(key), key))); - } - break; - case "uniqueKeys": - for (int i = 0; i < TOTAL_VALUES; ++i) { - elements.add(WindowedValue.valueInGlobalWindow(KV.of(Integer.toString(i), i))); + this.elements = generateTestData(distribution); + } + } + + private static List<WindowedValue<KV<String, Integer>>> generateTestData(String distribution) { + // Use a stable seed to ensure consistency across benchmark runs + Random random = new Random(-2134890234); + List<WindowedValue<KV<String, Integer>>> elements = new ArrayList<>(); + switch (distribution) { + case "uniform": + for (int i = 0; i < TOTAL_VALUES; ++i) { + int key = random.nextInt(KEY_SPACE); + elements.add(WindowedValue.valueInGlobalWindow(KV.of(Integer.toString(key), key))); + } + break; + case "normal": + for (int i = 0; i < TOTAL_VALUES; ++i) { + int key = (int) (random.nextGaussian() * KEY_SPACE); + elements.add(WindowedValue.valueInGlobalWindow(KV.of(Integer.toString(key), key))); + } + break; + case "hotKey": + for (int i = 0; i < TOTAL_VALUES; ++i) { + int key; + if (random.nextBoolean()) { + key = -123814201; + } else { + key = random.nextInt(KEY_SPACE); } - Collections.shuffle(elements, random); - break; - default: - } + elements.add(WindowedValue.valueInGlobalWindow(KV.of(Integer.toString(key), key))); + } + break; + case "uniqueKeys": + for (int i = 0; i < TOTAL_VALUES; ++i) { + elements.add(WindowedValue.valueInGlobalWindow(KV.of(Integer.toString(i), i))); + } + Collections.shuffle(elements, random); + break; + default: + throw new IllegalArgumentException("Unknown distribution: " + distribution); } + return elements; } @Benchmark + @Threads(16) public void sumIntegerBinaryCombine(SumIntegerBinaryCombine table, Blackhole blackhole) throws Exception { + ClearableCache<Object, Object> cache = + new ClearableCache<>(Caches.subCache(table.cache, Thread.currentThread().getName())); PrecombineGroupingTable<String, Integer, int[]> groupingTable = 
PrecombineGroupingTable.combiningAndSampling( table.options, - Caches.eternal(), + cache, table.sumInts, StringUtf8Coder.of(), .001, @@ -108,5 +124,6 @@ public class PrecombineGroupingTableBenchmark { groupingTable.put(table.elements.get(i), blackhole::consume); } groupingTable.flush(blackhole::consume); + cache.clear(); } } diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/Caches.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/Caches.java index 514b21575b2..5b2330b72f6 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/Caches.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/Caches.java @@ -53,12 +53,25 @@ public final class Caches { */ @VisibleForTesting static final int WEIGHT_RATIO = 6; + /** All objects less than or equal to this size will account for 1. */ + private static final long MIN_OBJECT_SIZE = 1 << WEIGHT_RATIO; + + /** + * Objects which change in this amount should always update the cache. + * + * <p>The limit of 2^16 is chosen to be small enough such that objects will be close enough if + * they change frequently. Future work could scale these ratios based upon the configured cache + * size. + */ + private static final long CACHE_SIZE_CHANGE_LIMIT_BYTES = 1 << 16; + private static final MemoryMeter MEMORY_METER = MemoryMeter.builder().withGuessing(Guess.BEST).build(); /** The size of a reference. */ public static final long REFERENCE_SIZE = 8; + /** Returns the amount of memory in bytes the provided object consumes. */ public static long weigh(Object o) { if (o == null) { return REFERENCE_SIZE; @@ -73,6 +86,25 @@ public final class Caches { } } + /** + * Returns whether the cache should be updated in the case where the objects size has changed. 
+ * + * <p>Note that this should only be used in the case where the cache is being updated very often + * in a tight loop and is not a good fit for cases where the object being cached is the result of + * an expensive operation like a disk read or remote service call. + */ + public static boolean shouldUpdateOnSizeChange(long oldSize, long newSize) { + /* + Our strategy is three fold: + - tiny objects don't impact the cache accounting and count as a size of `1` in the cache. + - large changes (>= CACHE_SIZE_CHANGE_LIMIT_BYTES) should always update the size + - all others if the size changed by a factor of 2 + */ + return (oldSize > MIN_OBJECT_SIZE || newSize > MIN_OBJECT_SIZE) + && ((newSize - oldSize >= CACHE_SIZE_CHANGE_LIMIT_BYTES) + || Long.highestOneBit(oldSize) != Long.highestOneBit(newSize)); + } + /** An eviction listener that reduces the size of entries that are {@link Shrinkable}. */ @VisibleForTesting static class ShrinkOnEviction implements RemovalListener<CompositeKey, WeightedValue<Object>> { @@ -184,8 +216,15 @@ public final class Caches { // which is why we set the concurrency level to 1. See // https://github.com/google/guava/issues/3462 for further details. // - // The ProcessBundleBenchmark#testStateWithCaching shows no noticeable change - // when this parameter is left at the default. + // The PrecombineGroupingTable showed contention here since it was working in + // a tight loop. We were able to resolve the contention by reducing the + // frequency of updates. Reconsider this value if we could solve the maximum + // entry size issue. Note that using Runtime.getRuntime().availableProcessors() + // is subject to docker CPU shares issues + // (https://bugs.openjdk.org/browse/JDK-8281181). + // + // We could revisit the caffeine cache library based upon reinvestigating + // recursive computeIfAbsent calls since it doesn't have this limit. 
.concurrencyLevel(1) .recordStats(), weightInBytes) diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/PrecombineGroupingTable.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/PrecombineGroupingTable.java index e311322880c..438f6b9d668 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/PrecombineGroupingTable.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/PrecombineGroupingTable.java @@ -128,8 +128,7 @@ public class PrecombineGroupingTable<K, InputT, AccumT> private final AtomicLong maxWeight; private long weight; private final boolean isGloballyWindowed; - private long checkFlushCounter; - private long checkFlushLimit = -5; + private long lastWeightForFlush; private static final class Key implements Weighted { private static final Key INSTANCE = new Key(); @@ -407,12 +406,9 @@ public class PrecombineGroupingTable<K, InputT, AccumT> return tableEntry; }); - if (checkFlushCounter++ < checkFlushLimit) { - return; - } else { - checkFlushLimit = Math.min(checkFlushLimit + 1, 25); - checkFlushCounter = 0; + if (Caches.shouldUpdateOnSizeChange(lastWeightForFlush, weight)) { flushIfNeeded(receiver); + lastWeightForFlush = weight; } } diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/PrecombineGroupingTableTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/PrecombineGroupingTableTest.java index 28def2ef6f5..d9fd19c651a 100644 --- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/PrecombineGroupingTableTest.java +++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/PrecombineGroupingTableTest.java @@ -25,6 +25,7 @@ import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.in; +import static org.hamcrest.Matchers.lessThan; import static org.hamcrest.core.Is.is; import static org.junit.Assert.assertEquals; @@ 
-156,19 +157,19 @@ public class PrecombineGroupingTableTest { assertThat(receiver.outputElems, empty()); // Putting in other large keys should cause eviction. - table.put(valueInGlobalWindow(KV.of("BBB", 9)), receiver); + table.put(valueInGlobalWindow(KV.of("BB", 509)), receiver); table.put(valueInGlobalWindow(KV.of("CCC", 11)), receiver); assertThat( receiver.outputElems, containsInAnyOrder( - valueInGlobalWindow(KV.of("AAA", 1L + 2 + 4)), valueInGlobalWindow(KV.of("BBB", 9L)))); + valueInGlobalWindow(KV.of("AAA", 1L + 2 + 4)), valueInGlobalWindow(KV.of("BB", 509L)))); table.flush(receiver); assertThat( receiver.outputElems, containsInAnyOrder( valueInGlobalWindow(KV.of("AAA", 1L + 2 + 4)), - valueInGlobalWindow(KV.of("BBB", 9L)), + valueInGlobalWindow(KV.of("BB", 509L)), valueInGlobalWindow(KV.of("CCC", 11L)))); } @@ -225,17 +226,20 @@ public class PrecombineGroupingTableTest { // Insert three compactable values which shouldn't lead to eviction even though we are over // the maximum size. - table.put(valueInGlobalWindow(KV.of("A", 1004)), receiver); - table.put(valueInGlobalWindow(KV.of("B", 1004)), receiver); + table.put(valueInGlobalWindow(KV.of("A", 804)), receiver); + table.put(valueInGlobalWindow(KV.of("B", 904)), receiver); table.put(valueInGlobalWindow(KV.of("C", 1004)), receiver); assertThat(receiver.outputElems, empty()); + // Ensure that compaction occurred during the insertion of the above elements before flushing. 
+ assertThat(table.getWeight(), lessThan(804L + 904L + 1004L)); + table.flush(receiver); assertThat( receiver.outputElems, containsInAnyOrder( - valueInGlobalWindow(KV.of("A", 1004L / 4)), - valueInGlobalWindow(KV.of("B", 1004L / 4)), + valueInGlobalWindow(KV.of("A", 804L / 4)), + valueInGlobalWindow(KV.of("B", 904L / 4)), valueInGlobalWindow(KV.of("C", 1004L / 4)))); } @@ -254,20 +258,20 @@ public class PrecombineGroupingTableTest { // Insert three values which even with compaction isn't enough so we evict A & B to get // under the max weight. - table.put(valueInGlobalWindow(KV.of("A", 1001)), receiver); - table.put(valueInGlobalWindow(KV.of("B", 1001)), receiver); + table.put(valueInGlobalWindow(KV.of("A", 801)), receiver); + table.put(valueInGlobalWindow(KV.of("B", 901)), receiver); table.put(valueInGlobalWindow(KV.of("C", 1001)), receiver); assertThat( receiver.outputElems, containsInAnyOrder( - valueInGlobalWindow(KV.of("A", 1001L)), valueInGlobalWindow(KV.of("B", 1001L)))); + valueInGlobalWindow(KV.of("A", 801L)), valueInGlobalWindow(KV.of("B", 901L)))); table.flush(receiver); assertThat( receiver.outputElems, containsInAnyOrder( - valueInGlobalWindow(KV.of("A", 1001L)), - valueInGlobalWindow(KV.of("B", 1001L)), + valueInGlobalWindow(KV.of("A", 801L)), + valueInGlobalWindow(KV.of("B", 901L)), valueInGlobalWindow(KV.of("C", 1001L)))); } @@ -460,7 +464,10 @@ public class PrecombineGroupingTableTest { }; } - /** "Estimate" the size of strings by taking the tenth power of their length. */ + /** + * Used to simulate very specific compaction/eviction tests under certain scenarios instead of + * relying on JAMM for size estimation. Strings are 10^length and longs are their value. + */ private static class TestSizeEstimator implements SizeEstimator { int calls = 0;