This is an automated email from the ASF dual-hosted git repository.

lcwik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 135007ec6b4 Update precombine benchmark to better represent varied workloads (#24343)
135007ec6b4 is described below

commit 135007ec6b4281d2cbcb24f271bac8e442fc4316
Author: Luke Cwik <lc...@google.com>
AuthorDate: Mon Nov 28 16:27:12 2022 -0800

    Update precombine benchmark to better represent varied workloads (#24343)
    
    1. Represent more data distributions (hot key, uniform, normal, unique)
    2. Run longer so the JIT has time to warm up
    3. Use a random ordering of the data
    4. Use a blackhole to prevent the JIT from optimizing away the data
---
 .../fn/harness/jmh/CombinerTableBenchmark.java     |  84 ----------------
 .../jmh/PrecombineGroupingTableBenchmark.java      | 112 +++++++++++++++++++++
 2 files changed, 112 insertions(+), 84 deletions(-)

diff --git a/sdks/java/harness/jmh/src/main/java/org/apache/beam/fn/harness/jmh/CombinerTableBenchmark.java b/sdks/java/harness/jmh/src/main/java/org/apache/beam/fn/harness/jmh/CombinerTableBenchmark.java
deleted file mode 100644
index 13d164c15b3..00000000000
--- a/sdks/java/harness/jmh/src/main/java/org/apache/beam/fn/harness/jmh/CombinerTableBenchmark.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.fn.harness.jmh;
-
-import static java.util.concurrent.TimeUnit.SECONDS;
-
-import java.util.ArrayList;
-import java.util.List;
-import org.apache.beam.fn.harness.Caches;
-import org.apache.beam.fn.harness.PrecombineGroupingTable;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.transforms.Combine;
-import org.apache.beam.sdk.transforms.Sum;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.KV;
-import org.openjdk.jmh.annotations.Benchmark;
-import org.openjdk.jmh.annotations.Fork;
-import org.openjdk.jmh.annotations.Level;
-import org.openjdk.jmh.annotations.Measurement;
-import org.openjdk.jmh.annotations.Param;
-import org.openjdk.jmh.annotations.Scope;
-import org.openjdk.jmh.annotations.Setup;
-import org.openjdk.jmh.annotations.State;
-import org.openjdk.jmh.annotations.Warmup;
-
-public class CombinerTableBenchmark {
-  @State(Scope.Benchmark)
-  public static class CombinerTable {
-    final int numKeys = 1000;
-    final int numPerKey = 1000;
-    final Combine.BinaryCombineIntegerFn sumInts = Sum.ofIntegers();
-    final PipelineOptions options = PipelineOptionsFactory.create();
-    PrecombineGroupingTable<String, Integer, int[]> groupingTable;
-    List<WindowedValue<KV<String, Integer>>> elements;
-
-    @Param({"true", "false"})
-    public String globallyWindowed;
-
-    @Setup(Level.Invocation)
-    public void setUp() {
-      groupingTable =
-          PrecombineGroupingTable.combiningAndSampling(
-              options,
-              Caches.eternal(),
-              sumInts,
-              StringUtf8Coder.of(),
-              .001,
-              Boolean.valueOf(globallyWindowed));
-      elements = new ArrayList<>();
-      for (int i = 0; i < numKeys; i++) {
-        elements.add(WindowedValue.valueInGlobalWindow(KV.of(Integer.toString(i), i)));
-      }
-    }
-  }
-
-  @Benchmark
-  @Fork(value = 1)
-  @Warmup(time = 1, timeUnit = SECONDS)
-  @Measurement(time = 1, timeUnit = SECONDS)
-  public void uniformDistribution(CombinerTable table) throws Exception {
-    for (int i = 0; i < table.numPerKey; i++) {
-      for (WindowedValue<KV<String, Integer>> element : table.elements) {
-        table.groupingTable.put(element, null);
-      }
-    }
-  }
-}
diff --git a/sdks/java/harness/jmh/src/main/java/org/apache/beam/fn/harness/jmh/PrecombineGroupingTableBenchmark.java b/sdks/java/harness/jmh/src/main/java/org/apache/beam/fn/harness/jmh/PrecombineGroupingTableBenchmark.java
new file mode 100644
index 00000000000..0deaf96f18f
--- /dev/null
+++ b/sdks/java/harness/jmh/src/main/java/org/apache/beam/fn/harness/jmh/PrecombineGroupingTableBenchmark.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness.jmh;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+import org.apache.beam.fn.harness.Caches;
+import org.apache.beam.fn.harness.PrecombineGroupingTable;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.Sum;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.infra.Blackhole;
+
+/** JMH benchmarks for {@link PrecombineGroupingTable} under several key distributions. */
+public class PrecombineGroupingTableBenchmark {
+  private static final int TOTAL_VALUES = 1_000_000;
+
+  /** Input of {@code TOTAL_VALUES} elements whose keys come from a seeded distribution. */
+  @State(Scope.Benchmark)
+  public static class SumIntegerBinaryCombine {
+    final Combine.BinaryCombineIntegerFn sumInts = Sum.ofIntegers();
+    final PipelineOptions options = PipelineOptionsFactory.create();
+    List<WindowedValue<KV<String, Integer>>> elements;
+
+    @Param({"true", "false"})
+    public String globallyWindowed;
+
+    @Param({"uniform", "normal", "hotKey", "uniqueKeys"})
+    public String distribution;
+
+    @Setup(Level.Trial)
+    public void setUp() {
+      // Use a stable seed to ensure consistency across benchmark runs
+      Random random = new Random(-2134890234);
+      elements = new ArrayList<>(TOTAL_VALUES);
+      for (int i = 0; i < TOTAL_VALUES; ++i) {
+        int key = nextKey(random, i);
+        elements.add(WindowedValue.valueInGlobalWindow(KV.of(Integer.toString(key), key)));
+      }
+      if ("uniqueKeys".equals(distribution)) {
+        // Unique keys are generated in ascending order; shuffle so they arrive randomly.
+        Collections.shuffle(elements, random);
+      }
+    }
+
+    /**
+     * Returns the key of the {@code i}-th element for the configured {@code distribution}.
+     *
+     * @throws IllegalArgumentException if {@code distribution} is not recognized, so a mistyped
+     *     {@code -p distribution=...} fails fast instead of silently benchmarking empty input
+     */
+    private int nextKey(Random random, int i) {
+      switch (distribution) {
+        case "uniform": // keys 0..999, each equally likely
+          return random.nextInt(1000);
+        case "normal": // approximately normal around 0 with standard deviation ~1000
+          return (int) (random.nextGaussian() * 1000);
+        case "hotKey": // key 0 for about half of the elements, otherwise uniform over 0..999
+          return random.nextBoolean() ? 0 : random.nextInt(1000);
+        case "uniqueKeys": // every element gets its own key
+          return i;
+        default:
+          throw new IllegalArgumentException("Unknown distribution: " + distribution);
+      }
+    }
+  }
+
+  /** Puts all elements into a fresh table, then flushes; all output goes to the blackhole. */
+  @Benchmark
+  public void sumIntegerBinaryCombine(SumIntegerBinaryCombine table, Blackhole blackhole)
+      throws Exception {
+    PrecombineGroupingTable<String, Integer, int[]> groupingTable =
+        PrecombineGroupingTable.combiningAndSampling(
+            table.options,
+            Caches.eternal(),
+            table.sumInts,
+            StringUtf8Coder.of(),
+            .001,
+            Boolean.valueOf(table.globallyWindowed));
+    for (int i = 0, size = table.elements.size(); i < size; ++i) {
+      groupingTable.put(table.elements.get(i), blackhole::consume);
+    }
+    groupingTable.flush(blackhole::consume);
+  }
+}

Reply via email to