This is an automated email from the ASF dual-hosted git repository.

aokolnychyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 62331b6f9e API: Remove overflow checks in DefaultCounter causing performance issues (#8297)
62331b6f9e is described below

commit 62331b6f9edea5bd42752882f980f6768bb69d16
Author: Anton Okolnychyi <[email protected]>
AuthorDate: Mon Aug 14 11:37:44 2023 -0700

    API: Remove overflow checks in DefaultCounter causing performance issues (#8297)
---
 .../java/org/apache/iceberg/metrics/Counter.java   |  4 ++
 .../org/apache/iceberg/metrics/DefaultCounter.java |  8 ++-
 .../apache/iceberg/metrics/TestDefaultCounter.java | 10 ---
 .../iceberg/metrics/TestDefaultMetricsContext.java | 16 +----
 .../apache/iceberg/metrics/CountersBenchmark.java  | 82 ++++++++++++++++++++++
 5 files changed, 93 insertions(+), 27 deletions(-)

diff --git a/api/src/main/java/org/apache/iceberg/metrics/Counter.java b/api/src/main/java/org/apache/iceberg/metrics/Counter.java
index e8d1a899fe..f984c608de 100644
--- a/api/src/main/java/org/apache/iceberg/metrics/Counter.java
+++ b/api/src/main/java/org/apache/iceberg/metrics/Counter.java
@@ -29,6 +29,8 @@ public interface Counter {
   /**
    * Increment the counter by the provided amount.
    *
+   * <p>Implementations may skip the overflow check for better write throughput.
+   *
    * @param amount to be incremented.
    */
   default void increment(int amount) {
@@ -38,6 +40,8 @@ public interface Counter {
   /**
    * Increment the counter by the provided amount.
    *
+   * <p>Implementations may skip the overflow check for better write throughput.
+   *
    * @param amount to be incremented.
    */
   void increment(long amount);
diff --git a/api/src/main/java/org/apache/iceberg/metrics/DefaultCounter.java b/api/src/main/java/org/apache/iceberg/metrics/DefaultCounter.java
index 9bb23092a8..fad9bf8c57 100644
--- a/api/src/main/java/org/apache/iceberg/metrics/DefaultCounter.java
+++ b/api/src/main/java/org/apache/iceberg/metrics/DefaultCounter.java
@@ -63,7 +63,6 @@ public class DefaultCounter implements Counter {
 
   @Override
   public void increment(long amount) {
-    Math.addExact(counter.longValue(), amount);
     counter.add(amount);
   }
 
@@ -107,7 +106,6 @@ public class DefaultCounter implements Counter {
 
     @Override
     public void increment(Integer amount) {
-      Math.addExact(counter.intValue(), amount);
       DefaultCounter.this.increment(amount);
     }
 
@@ -118,7 +116,11 @@ public class DefaultCounter implements Counter {
 
     @Override
     public Integer value() {
-      return counter.intValue();
+      long value = counter.longValue();
+      if (value > Integer.MAX_VALUE) {
+        throw new ArithmeticException("integer overflow");
+      }
+      return (int) value;
     }
 
     @Override
diff --git a/api/src/test/java/org/apache/iceberg/metrics/TestDefaultCounter.java b/api/src/test/java/org/apache/iceberg/metrics/TestDefaultCounter.java
index 5f62e20ab4..f77da34dca 100644
--- a/api/src/test/java/org/apache/iceberg/metrics/TestDefaultCounter.java
+++ b/api/src/test/java/org/apache/iceberg/metrics/TestDefaultCounter.java
@@ -49,14 +49,4 @@ public class TestDefaultCounter {
     Assertions.assertThat(counter.unit()).isEqualTo(MetricsContext.Unit.BYTES);
     Assertions.assertThat(counter.isNoop()).isFalse();
   }
-
-  @Test
-  public void counterOverflow() {
-    Counter counter = new DefaultCounter(MetricsContext.Unit.COUNT);
-    counter.increment(Long.MAX_VALUE);
-    Assertions.assertThatThrownBy(counter::increment)
-        .isInstanceOf(ArithmeticException.class)
-        .hasMessage("long overflow");
-    Assertions.assertThat(counter.value()).isEqualTo(Long.MAX_VALUE);
-  }
 }
diff --git a/api/src/test/java/org/apache/iceberg/metrics/TestDefaultMetricsContext.java b/api/src/test/java/org/apache/iceberg/metrics/TestDefaultMetricsContext.java
index 86c216119e..3819485040 100644
--- a/api/src/test/java/org/apache/iceberg/metrics/TestDefaultMetricsContext.java
+++ b/api/src/test/java/org/apache/iceberg/metrics/TestDefaultMetricsContext.java
@@ -60,10 +60,10 @@ public class TestDefaultMetricsContext {
     MetricsContext.Counter<Integer> counter =
         metricsContext.counter("test", Integer.class, MetricsContext.Unit.COUNT);
     counter.increment(Integer.MAX_VALUE);
-    Assertions.assertThatThrownBy(counter::increment)
+    counter.increment();
+    Assertions.assertThatThrownBy(counter::value)
         .isInstanceOf(ArithmeticException.class)
         .hasMessage("integer overflow");
-    Assertions.assertThat(counter.value()).isEqualTo(Integer.MAX_VALUE);
   }
 
   @Test
@@ -84,18 +84,6 @@ public class TestDefaultMetricsContext {
     Assertions.assertThat(counter.unit()).isEqualTo(MetricsContext.Unit.COUNT);
   }
 
-  @Test
-  public void longCounterOverflow() {
-    MetricsContext metricsContext = new DefaultMetricsContext();
-    MetricsContext.Counter<Long> counter =
-        metricsContext.counter("test", Long.class, MetricsContext.Unit.COUNT);
-    counter.increment(Long.MAX_VALUE);
-    Assertions.assertThatThrownBy(counter::increment)
-        .isInstanceOf(ArithmeticException.class)
-        .hasMessage("long overflow");
-    Assertions.assertThat(counter.value()).isEqualTo(Long.MAX_VALUE);
-  }
-
   @Test
   public void timer() {
     MetricsContext metricsContext = new DefaultMetricsContext();
diff --git a/core/src/jmh/java/org/apache/iceberg/metrics/CountersBenchmark.java b/core/src/jmh/java/org/apache/iceberg/metrics/CountersBenchmark.java
new file mode 100644
index 0000000000..f9a5a46cf3
--- /dev/null
+++ b/core/src/jmh/java/org/apache/iceberg/metrics/CountersBenchmark.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.metrics;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.iceberg.metrics.MetricsContext.Unit;
+import org.apache.iceberg.util.Tasks;
+import org.apache.iceberg.util.ThreadPools;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Timeout;
+import org.openjdk.jmh.infra.Blackhole;
+
+@Fork(1)
+@State(Scope.Benchmark)
+@Measurement(iterations = 25)
+@BenchmarkMode(Mode.SingleShotTime)
+@Timeout(time = 10, timeUnit = TimeUnit.MINUTES)
+public class CountersBenchmark {
+
+  private static final int NUM_OPERATIONS = 10_000_000;
+  private static final int WORKER_POOL_SIZE = 16;
+  private static final int INCREMENT_AMOUNT = 10_000;
+
+  @Benchmark
+  @Threads(1)
+  public void defaultCounterMultipleThreads(Blackhole blackhole) {
+    Counter counter = new DefaultCounter(Unit.BYTES);
+
+    ExecutorService workerPool = ThreadPools.newWorkerPool("bench-pool", WORKER_POOL_SIZE);
+
+    try {
+      Tasks.range(WORKER_POOL_SIZE)
+          .executeWith(workerPool)
+          .run(
+              (id) -> {
+                for (int operation = 0; operation < NUM_OPERATIONS; operation++) {
+                  counter.increment(INCREMENT_AMOUNT);
+                }
+              });
+    } finally {
+      workerPool.shutdown();
+    }
+
+    blackhole.consume(counter);
+  }
+
+  @Benchmark
+  @Threads(1)
+  public void defaultCounterSingleThread(Blackhole blackhole) {
+    Counter counter = new DefaultCounter(Unit.BYTES);
+
+    for (int operation = 0; operation < WORKER_POOL_SIZE * NUM_OPERATIONS; operation++) {
+      counter.increment(INCREMENT_AMOUNT);
+    }
+
+    blackhole.consume(counter);
+  }
+}

Reply via email to