This is an automated email from the ASF dual-hosted git repository. lcwik pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new 135007ec6b4 Update precombine benchmark to better represent varied workloads (#24343) 135007ec6b4 is described below commit 135007ec6b4281d2cbcb24f271bac8e442fc4316 Author: Luke Cwik <lc...@google.com> AuthorDate: Mon Nov 28 16:27:12 2022 -0800 Update precombine benchmark to better represent varied workloads (#24343) 1. Represent more data distributions (hot key, uniform, normal, unique) 2. Run longer allowing the JIT to function 3. Have a random ordering of data 4. Use a blackhole to prevent the JIT from optimizing away the data --- .../fn/harness/jmh/CombinerTableBenchmark.java | 84 ---------------- .../jmh/PrecombineGroupingTableBenchmark.java | 112 +++++++++++++++++++++ 2 files changed, 112 insertions(+), 84 deletions(-) diff --git a/sdks/java/harness/jmh/src/main/java/org/apache/beam/fn/harness/jmh/CombinerTableBenchmark.java b/sdks/java/harness/jmh/src/main/java/org/apache/beam/fn/harness/jmh/CombinerTableBenchmark.java deleted file mode 100644 index 13d164c15b3..00000000000 --- a/sdks/java/harness/jmh/src/main/java/org/apache/beam/fn/harness/jmh/CombinerTableBenchmark.java +++ /dev/null @@ -1,84 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.fn.harness.jmh; - -import static java.util.concurrent.TimeUnit.SECONDS; - -import java.util.ArrayList; -import java.util.List; -import org.apache.beam.fn.harness.Caches; -import org.apache.beam.fn.harness.PrecombineGroupingTable; -import org.apache.beam.sdk.coders.StringUtf8Coder; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.options.PipelineOptionsFactory; -import org.apache.beam.sdk.transforms.Combine; -import org.apache.beam.sdk.transforms.Sum; -import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.values.KV; -import org.openjdk.jmh.annotations.Benchmark; -import org.openjdk.jmh.annotations.Fork; -import org.openjdk.jmh.annotations.Level; -import org.openjdk.jmh.annotations.Measurement; -import org.openjdk.jmh.annotations.Param; -import org.openjdk.jmh.annotations.Scope; -import org.openjdk.jmh.annotations.Setup; -import org.openjdk.jmh.annotations.State; -import org.openjdk.jmh.annotations.Warmup; - -public class CombinerTableBenchmark { - @State(Scope.Benchmark) - public static class CombinerTable { - final int numKeys = 1000; - final int numPerKey = 1000; - final Combine.BinaryCombineIntegerFn sumInts = Sum.ofIntegers(); - final PipelineOptions options = PipelineOptionsFactory.create(); - PrecombineGroupingTable<String, Integer, int[]> groupingTable; - List<WindowedValue<KV<String, Integer>>> elements; - - @Param({"true", "false"}) - public String globallyWindowed; - - @Setup(Level.Invocation) - public void setUp() { - groupingTable = - PrecombineGroupingTable.combiningAndSampling( - options, - Caches.eternal(), - sumInts, - StringUtf8Coder.of(), - .001, - Boolean.valueOf(globallyWindowed)); - elements = new ArrayList<>(); - for (int i = 0; i < numKeys; i++) { - elements.add(WindowedValue.valueInGlobalWindow(KV.of(Integer.toString(i), i))); - } - } - } - - @Benchmark - 
@Fork(value = 1) - @Warmup(time = 1, timeUnit = SECONDS) - @Measurement(time = 1, timeUnit = SECONDS) - public void uniformDistribution(CombinerTable table) throws Exception { - for (int i = 0; i < table.numPerKey; i++) { - for (WindowedValue<KV<String, Integer>> element : table.elements) { - table.groupingTable.put(element, null); - } - } - } -} diff --git a/sdks/java/harness/jmh/src/main/java/org/apache/beam/fn/harness/jmh/PrecombineGroupingTableBenchmark.java b/sdks/java/harness/jmh/src/main/java/org/apache/beam/fn/harness/jmh/PrecombineGroupingTableBenchmark.java new file mode 100644 index 00000000000..0deaf96f18f --- /dev/null +++ b/sdks/java/harness/jmh/src/main/java/org/apache/beam/fn/harness/jmh/PrecombineGroupingTableBenchmark.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.fn.harness.jmh; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Random; +import org.apache.beam.fn.harness.Caches; +import org.apache.beam.fn.harness.PrecombineGroupingTable; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.transforms.Combine; +import org.apache.beam.sdk.transforms.Sum; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.values.KV; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.infra.Blackhole; + +public class PrecombineGroupingTableBenchmark { + private static final int TOTAL_VALUES = 1_000_000; + + @State(Scope.Benchmark) + public static class SumIntegerBinaryCombine { + final Combine.BinaryCombineIntegerFn sumInts = Sum.ofIntegers(); + final PipelineOptions options = PipelineOptionsFactory.create(); + List<WindowedValue<KV<String, Integer>>> elements; + + @Param({"true", "false"}) + public String globallyWindowed; + + @Param({"uniform", "normal", "hotKey", "uniqueKeys"}) + public String distribution; + + @Setup(Level.Trial) + public void setUp() { + // Use a stable seed to ensure consistency across benchmark runs + Random random = new Random(-2134890234); + elements = new ArrayList<>(); + switch (distribution) { + case "uniform": + for (int i = 0; i < TOTAL_VALUES; ++i) { + int key = random.nextInt(1000); + elements.add(WindowedValue.valueInGlobalWindow(KV.of(Integer.toString(key), key))); + } + break; + case "normal": + for (int i = 0; i < TOTAL_VALUES; ++i) { + int key = (int) (random.nextGaussian() * 1000); + 
elements.add(WindowedValue.valueInGlobalWindow(KV.of(Integer.toString(key), key))); + } + break; + case "hotKey": + for (int i = 0; i < TOTAL_VALUES; ++i) { + int key; + if (random.nextBoolean()) { + key = 0; + } else { + key = random.nextInt(1000); + } + elements.add(WindowedValue.valueInGlobalWindow(KV.of(Integer.toString(key), key))); + } + break; + case "uniqueKeys": + for (int i = 0; i < TOTAL_VALUES; ++i) { + elements.add(WindowedValue.valueInGlobalWindow(KV.of(Integer.toString(i), i))); + } + Collections.shuffle(elements, random); + break; + default: + } + } + } + + @Benchmark + public void sumIntegerBinaryCombine(SumIntegerBinaryCombine table, Blackhole blackhole) + throws Exception { + PrecombineGroupingTable<String, Integer, int[]> groupingTable = + PrecombineGroupingTable.combiningAndSampling( + table.options, + Caches.eternal(), + table.sumInts, + StringUtf8Coder.of(), + .001, + Boolean.valueOf(table.globallyWindowed)); + for (int i = 0, size = table.elements.size(); i < size; ++i) { + groupingTable.put(table.elements.get(i), blackhole::consume); + } + groupingTable.flush(blackhole::consume); + } +}