szetszwo commented on code in PR #10438: URL: https://github.com/apache/ozone/pull/10438#discussion_r3383994448
########## hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/common/ChunkBufferPutBenchmark.java: ########## @@ -0,0 +1,338 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.common; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.text.DecimalFormat; +import java.util.concurrent.ThreadLocalRandom; +import org.apache.hadoop.ozone.common.JfrByteBufferAllocations.AllocationStats; + +/** + * Microbenchmark for ChunkBuffer.put(byte[]) direct copy vs ByteBuffer.wrap path. + * + * <p>Focused on the scenarios where HDDS-15485 shows the clearest benefit: + * <ul> + * <li>Throughput: 4KB stream fill with incremental buffer (64KB increment)</li> + * <li>Allocations: same 4KB / 64KB-increment incremental buffer path (JFR + wrap calls)</li> + * </ul> + * + * <p>Run from the repo root: + * <pre> + * mvn -pl hadoop-hdds/common -q test-compile exec:java \ + * -Dexec.mainClass=org.apache.hadoop.ozone.common.ChunkBufferPutBenchmark \ + * -Dexec.classpathScope=test \ + * -Dexec.args="--add-opens jdk.jfr/jdk.jfr=ALL-UNNAMED --add-opens jdk.jfr/jdk.jfr.consumer=ALL-UNNAMED" + * </pre> + * JFR ByteBuffer counts are sampled; put-op count reports exact wrap calls. 
+ */ +public final class ChunkBufferPutBenchmark { + + private static final int WARMUP_SECONDS = 10; + private static final int BENCHMARK_SECONDS = 20; + private static final int ALLOCATION_BENCHMARK_SECONDS = 5; + private static final int THROUGHPUT_ROUNDS = 3; + private static final DecimalFormat MBPS = new DecimalFormat("#,##0.0"); + private static final DecimalFormat NS = new DecimalFormat("#,##0"); + private static final DecimalFormat PCT = new DecimalFormat("#,##0.0"); + private static final DecimalFormat RATIO = new DecimalFormat("0.00"); + private static final DecimalFormat COUNT = new DecimalFormat("#,##0"); + + /** ozone.client.stream.buffer.size default. */ + private static final int DEFAULT_CHUNK_SIZE = 4 * 1024 * 1024; + + /** Showcase: ozone.client.stream.buffer.increment for incremental buffer. */ + private static final int INCREMENTAL_BUFFER_INCREMENT = 64 * 1024; + + /** Hadoop io.file.buffer.size / FSDataOutputStream default. */ + private static final int HADOOP_FS_BUFFER_SIZE = 4 * 1024; + + private static final ThreadLocal<byte[]> SOURCE = ThreadLocal.withInitial(() -> { + byte[] bytes = new byte[DEFAULT_CHUNK_SIZE]; + ThreadLocalRandom.current().nextBytes(bytes); + return bytes; + }); + + private ChunkBufferPutBenchmark() { + } + + public static void main(String[] args) throws IOException { + System.out.println("ChunkBuffer.put(byte[]) microbenchmark (pre-allocated buffer, put-only)"); + System.out.println("JVM: " + System.getProperty("java.version") + + " on " + System.getProperty("os.arch")); + System.out.println(); + + final Scenario showcase = new Scenario( + "Incremental buffer showcase", + "ozone.client.stream.buffer.size=4MB, " + + "ozone.client.stream.buffer.increment=64KB, io.file.buffer.size=4KB", + "4KB stream fill into IncrementalChunkBuffer (64KB steps)", Review Comment: Do not hard code config and pattern. It should be built using other parameters. 
########## hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBufferImplWithByteBufferList.java: ########## @@ -157,6 +157,39 @@ public ChunkBuffer put(ByteBuffer that) { return this; } + @Override + public ChunkBuffer put(byte[] b, int offset, int length) { + Objects.requireNonNull(b, "b == null"); + Preconditions.checkArgument(offset >= 0 && length >= 0, + "offset = %s, length = %s", offset, length); + Preconditions.checkArgument(length <= b.length - offset, + "length = %s out of range for array.length = %s, offset = %s", + length, b.length, offset); + if (length > remaining()) { + final BufferOverflowException boe = new BufferOverflowException(); + boe.initCause(new IllegalArgumentException( + "Failed to put since length = " + length + + " > this.remaining() = " + remaining())); + throw boe; + } Review Comment: Add a checkArgument(..) method: ```diff diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBuffer.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBuffer.java index 600a7b5679..ab36cd636d 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBuffer.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBuffer.java @@ -18,10 +18,13 @@ package org.apache.hadoop.ozone.common; import java.io.IOException; +import java.nio.BufferOverflowException; import java.nio.ByteBuffer; import java.nio.channels.GatheringByteChannel; import java.util.List; import java.util.Objects; + +import com.google.common.base.Preconditions; import org.apache.hadoop.hdds.utils.db.CodecBuffer; import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; import org.apache.ratis.util.UncheckedAutoCloseable; @@ -64,6 +67,22 @@ static ChunkBuffer wrap(List<ByteBuffer> buffers) { return new ChunkBufferImplWithByteBufferList(buffers); } + default void checkArgument(byte[] b, int offset, int length) { + Objects.requireNonNull(b, "b == null"); + 
Preconditions.checkArgument(offset >= 0 && length >= 0, + "offset = %s, length = %s", offset, length); + Preconditions.checkArgument(length <= b.length - offset, + "length = %s out of range for array.length = %s, offset = %s", + length, b.length, offset); + if (length > remaining()) { + final BufferOverflowException boe = new BufferOverflowException(); + boe.initCause(new IllegalArgumentException( + "Failed to put since length = " + length + + " > this.remaining() = " + remaining())); + throw boe; + } + } + /** Similar to {@link ByteBuffer#position()}. */ int position(); diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBufferImplWithByteBuffer.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBufferImplWithByteBuffer.java index 5fe6cd7b1a..5bc9300e90 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBufferImplWithByteBuffer.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBufferImplWithByteBuffer.java @@ -120,6 +120,7 @@ public ChunkBuffer put(ByteBuffer b) { @Override public ChunkBuffer put(byte[] b, int offset, int length) { + checkArgument(b, offset, length); buffer.put(b, offset, length); return this; } diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBufferImplWithByteBufferList.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBufferImplWithByteBufferList.java index d9f8e41a51..c4c8c23cf3 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBufferImplWithByteBufferList.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBufferImplWithByteBufferList.java @@ -159,19 +159,7 @@ public ChunkBuffer put(ByteBuffer that) { @Override public ChunkBuffer put(byte[] b, int offset, int length) { - Objects.requireNonNull(b, "b == null"); - Preconditions.checkArgument(offset >= 0 && length >= 0, - "offset = %s, length = %s", offset, 
length); - Preconditions.checkArgument(length <= b.length - offset, - "length = %s out of range for array.length = %s, offset = %s", - length, b.length, offset); - if (length > remaining()) { - final BufferOverflowException boe = new BufferOverflowException(); - boe.initCause(new IllegalArgumentException( - "Failed to put since length = " + length - + " > this.remaining() = " + remaining())); - throw boe; - } + checkArgument(b, offset, length); final int end = offset + length; int off = offset; diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/IncrementalChunkBuffer.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/IncrementalChunkBuffer.java index 39d6281373..8e9b982aff 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/IncrementalChunkBuffer.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/IncrementalChunkBuffer.java @@ -233,19 +233,7 @@ public ChunkBuffer put(ByteBuffer that) { @Override public ChunkBuffer put(byte[] b, int offset, int length) { - Objects.requireNonNull(b, "b == null"); - Preconditions.checkArgument(offset >= 0 && length >= 0, - "offset = %s, length = %s", offset, length); - Preconditions.checkArgument(length <= b.length - offset, - "length = %s out of range for array.length = %s, offset = %s", - length, b.length, offset); - if (length > remaining()) { - final BufferOverflowException boe = new BufferOverflowException(); - boe.initCause(new IllegalArgumentException( - "Failed to put since length = " + length - + " > this.remaining() = " + remaining())); - throw boe; - } + checkArgument(b, offset, length); final int end = offset + length; for (int p = position(); offset < end; ) { ``` ########## hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/common/ChunkBufferPutBenchmark.java: ########## @@ -0,0 +1,338 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.common; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.text.DecimalFormat; +import java.util.concurrent.ThreadLocalRandom; +import org.apache.hadoop.ozone.common.JfrByteBufferAllocations.AllocationStats; + +/** + * Microbenchmark for ChunkBuffer.put(byte[]) direct copy vs ByteBuffer.wrap path. + * + * <p>Focused on the scenarios where HDDS-15485 shows the clearest benefit: + * <ul> + * <li>Throughput: 4KB stream fill with incremental buffer (64KB increment)</li> + * <li>Allocations: same 4KB / 64KB-increment incremental buffer path (JFR + wrap calls)</li> + * </ul> + * + * <p>Run from the repo root: + * <pre> + * mvn -pl hadoop-hdds/common -q test-compile exec:java \ + * -Dexec.mainClass=org.apache.hadoop.ozone.common.ChunkBufferPutBenchmark \ + * -Dexec.classpathScope=test \ + * -Dexec.args="--add-opens jdk.jfr/jdk.jfr=ALL-UNNAMED --add-opens jdk.jfr/jdk.jfr.consumer=ALL-UNNAMED" + * </pre> + * JFR ByteBuffer counts are sampled; put-op count reports exact wrap calls. 
+ */ +public final class ChunkBufferPutBenchmark { + + private static final int WARMUP_SECONDS = 10; + private static final int BENCHMARK_SECONDS = 20; + private static final int ALLOCATION_BENCHMARK_SECONDS = 5; + private static final int THROUGHPUT_ROUNDS = 3; + private static final DecimalFormat MBPS = new DecimalFormat("#,##0.0"); + private static final DecimalFormat NS = new DecimalFormat("#,##0"); + private static final DecimalFormat PCT = new DecimalFormat("#,##0.0"); + private static final DecimalFormat RATIO = new DecimalFormat("0.00"); + private static final DecimalFormat COUNT = new DecimalFormat("#,##0"); + + /** ozone.client.stream.buffer.size default. */ + private static final int DEFAULT_CHUNK_SIZE = 4 * 1024 * 1024; + + /** Showcase: ozone.client.stream.buffer.increment for incremental buffer. */ + private static final int INCREMENTAL_BUFFER_INCREMENT = 64 * 1024; + + /** Hadoop io.file.buffer.size / FSDataOutputStream default. */ + private static final int HADOOP_FS_BUFFER_SIZE = 4 * 1024; + + private static final ThreadLocal<byte[]> SOURCE = ThreadLocal.withInitial(() -> { + byte[] bytes = new byte[DEFAULT_CHUNK_SIZE]; + ThreadLocalRandom.current().nextBytes(bytes); + return bytes; + }); + + private ChunkBufferPutBenchmark() { + } + + public static void main(String[] args) throws IOException { + System.out.println("ChunkBuffer.put(byte[]) microbenchmark (pre-allocated buffer, put-only)"); + System.out.println("JVM: " + System.getProperty("java.version") + + " on " + System.getProperty("os.arch")); + System.out.println(); + + final Scenario showcase = new Scenario( + "Incremental buffer showcase", + "ozone.client.stream.buffer.size=4MB, " + + "ozone.client.stream.buffer.increment=64KB, io.file.buffer.size=4KB", + "4KB stream fill into IncrementalChunkBuffer (64KB steps)", + DEFAULT_CHUNK_SIZE, + INCREMENTAL_BUFFER_INCREMENT, + HADOOP_FS_BUFFER_SIZE); + + System.out.println("=== Throughput showcase ==="); + runThroughputComparison(showcase); + 
System.out.println(); + + System.out.println("=== Allocation showcase ==="); + runAllocationComparison(showcase); + } + + private static void runThroughputComparison(Scenario scenario) { + printScenarioHeader(scenario); + warmupBothPaths(scenario); + final double[] improvements = new double[THROUGHPUT_ROUNDS]; + for (int round = 0; round < THROUGHPUT_ROUNDS; round++) { + final boolean directFirst = round % 2 == 0; + final Result direct; + final Result wrap; + if (directFirst) { + direct = benchmarkThroughput(scenario, ChunkBufferPutBenchmark::loopStreamFillDirect); + wrap = benchmarkThroughput(scenario, ChunkBufferPutBenchmark::loopStreamFillWrap); + } else { + wrap = benchmarkThroughput(scenario, ChunkBufferPutBenchmark::loopStreamFillWrap); + direct = benchmarkThroughput(scenario, ChunkBufferPutBenchmark::loopStreamFillDirect); + } + improvements[round] = wrap.nsPerOp / direct.nsPerOp - 1.0; + System.out.printf(" round %d:%n", round + 1); + printThroughputComparison(scenario.writeSize, direct, wrap, " "); + } + final double medianImprovement = median(improvements) * 100.0; + System.out.printf(" median improvement over %d rounds: %s%%%n", + THROUGHPUT_ROUNDS, PCT.format(medianImprovement)); + } + + private static void warmupBothPaths(Scenario scenario) { + final byte[] source = SOURCE.get(); + final int writesPerChunk = scenario.chunkSize / scenario.writeSize; + try (ChunkBuffer buffer = ChunkBuffer.allocate(scenario.chunkSize, + scenario.bufferIncrement)) { + for (int i = 0; i < 2; i++) { + loopStreamFillDirect(buffer, source, scenario.writeSize, writesPerChunk, + WARMUP_SECONDS); + loopStreamFillWrap(buffer, source, scenario.writeSize, writesPerChunk, + WARMUP_SECONDS); + } + } + } + + private static double median(double[] values) { + final double[] sorted = values.clone(); + java.util.Arrays.sort(sorted); + return sorted[sorted.length / 2]; + } + + private static void runAllocationComparison(Scenario scenario) throws IOException { + 
printScenarioHeader(scenario); + if (!JfrByteBufferAllocations.isAvailable()) { + System.out.println(" JFR unavailable; reporting wrap-call count from put ops only."); + } + final AllocationResult direct = benchmarkAllocations(scenario, + ChunkBufferPutBenchmark::loopStreamFillDirect); + final AllocationResult wrap = benchmarkAllocations(scenario, + ChunkBufferPutBenchmark::loopStreamFillWrap); + printAllocationComparison(direct, wrap); + } + + private static void printScenarioHeader(Scenario scenario) { + System.out.println("--- " + scenario.name + " ---"); + System.out.println("Config: " + scenario.config); + System.out.println("Pattern: " + scenario.pattern); + System.out.printf("Chunk=%dKB increment=%dKB write=%dKB%n", + scenario.chunkSize / 1024, + scenario.bufferIncrement / 1024, + scenario.writeSize / 1024); + } + + private static void printThroughputComparison(int writeSize, Result direct, Result wrap, + String prefix) { + final double speedup = wrap.nsPerOp / direct.nsPerOp; + final double pctFaster = (speedup - 1.0) * 100.0; + final double throughputSpeedup = direct.mbPerSec / wrap.mbPerSec; + final double throughputPct = (throughputSpeedup - 1.0) * 100.0; + System.out.printf("%sdirect put(byte[]): %s MB/s | %s ns/op | %.2fs | %s ops%n", + prefix, MBPS.format(direct.mbPerSec), NS.format(direct.nsPerOp), + direct.elapsedSeconds, COUNT.format(direct.ops)); + System.out.printf("%swrap put(ByteBuffer): %s MB/s | %s ns/op | %.2fs | %s ops%n", + prefix, MBPS.format(wrap.mbPerSec), NS.format(wrap.nsPerOp), + wrap.elapsedSeconds, COUNT.format(wrap.ops)); + System.out.printf("%simprovement: %s%% faster (%sx) per %dKB write; " + + "throughput %s%% (%sx)%n", + prefix, PCT.format(pctFaster), RATIO.format(speedup), writeSize / 1024, + PCT.format(throughputPct), RATIO.format(throughputSpeedup)); + } + + private static void printAllocationComparison(AllocationResult direct, + AllocationResult wrap) { + System.out.printf(" direct put(byte[]): %s put ops | %s ByteBuffer 
TLAB allocs | %s alloc bytes%n", + COUNT.format(direct.putOps), COUNT.format(direct.byteBufferAllocCount), + COUNT.format(direct.byteBufferAllocBytes)); + System.out.printf(" wrap put(ByteBuffer): %s put ops | %s ByteBuffer TLAB allocs | %s alloc bytes%n", + COUNT.format(wrap.putOps), COUNT.format(wrap.byteBufferAllocCount), + COUNT.format(wrap.byteBufferAllocBytes)); + System.out.printf(" ByteBuffer.wrap calls on wrap path (1 per put): %s%n", + COUNT.format(wrap.putOps)); + System.out.printf(" direct path avoids %s ByteBuffer.wrap calls per run%n", + COUNT.format(wrap.putOps)); + if (wrap.byteBufferAllocCount > 0 && direct.byteBufferAllocCount == 0) { + System.out.printf(" JFR confirms zero ByteBuffer TLAB allocations on direct path%n"); + } + if (wrap.byteBufferAllocCount > direct.byteBufferAllocCount) { + final long saved = wrap.byteBufferAllocCount - direct.byteBufferAllocCount; + System.out.printf(" JFR sampled ByteBuffer TLAB allocations avoided on direct path: %s%n", + COUNT.format(saved)); + System.out.println(" (JFR samples TLAB events; put-op count is the exact wrap-call metric)"); + } + } + + private static Result benchmarkThroughput(Scenario scenario, TimedLoop loop) { + final byte[] source = SOURCE.get(); + final int writesPerChunk = scenario.chunkSize / scenario.writeSize; + try (ChunkBuffer buffer = ChunkBuffer.allocate(scenario.chunkSize, + scenario.bufferIncrement)) { + loop.run(buffer, source, scenario.writeSize, writesPerChunk, WARMUP_SECONDS); + final LoopResult benchmark = loop.run(buffer, source, scenario.writeSize, + writesPerChunk, BENCHMARK_SECONDS); + return toResult(benchmark, scenario.writeSize); + } + } + + private static AllocationResult benchmarkAllocations(Scenario scenario, TimedLoop loop) + throws IOException { + final byte[] source = SOURCE.get(); + final int writesPerChunk = scenario.chunkSize / scenario.writeSize; + try (ChunkBuffer buffer = ChunkBuffer.allocate(scenario.chunkSize, + scenario.bufferIncrement)) { + 
loop.run(buffer, source, scenario.writeSize, writesPerChunk, WARMUP_SECONDS); + final LoopResult[] benchmark = new LoopResult[1]; + final AllocationStats stats = JfrByteBufferAllocations.measure( + () -> benchmark[0] = loop.run(buffer, source, scenario.writeSize, + writesPerChunk, ALLOCATION_BENCHMARK_SECONDS)); + final long putOps = benchmark[0].totalBytes / scenario.writeSize; + return new AllocationResult(putOps, stats.getByteBufferAllocCount(), + stats.getByteBufferAllocBytes()); + } + } + + private static Result toResult(LoopResult benchmark, int writeSize) { + final long elapsedNanos = benchmark.elapsedNanos; + final long benchmarkBytes = benchmark.totalBytes; + final double seconds = elapsedNanos / 1_000_000_000.0; + final double mbPerSec = benchmarkBytes / seconds / (1024.0 * 1024.0); + final long ops = benchmarkBytes / writeSize; + final double nsPerOp = (double) elapsedNanos / ops; + return new Result(mbPerSec, nsPerOp, seconds, ops); + } + + @FunctionalInterface + private interface TimedLoop { + LoopResult run(ChunkBuffer buffer, byte[] source, int writeSize, + int writesPerChunk, int seconds); Review Comment: - Add a `verify` parameter and check for correctness. - Set it to false in benchmark. - Add unit tests by setting it to true. 
```diff @@ -246,8 +249,9 @@ LoopResult run(ChunkBuffer buffer, byte[] source, int writeSize, private static LoopResult loopStreamFillDirect(ChunkBuffer buffer, byte[] source, - int writeSize, int writesPerChunk, int seconds) { + int writeSize, int writesPerChunk, int seconds, boolean verify) { long totalBytes = 0; final long start = System.nanoTime(); final long deadline = start + seconds * 1_000_000_000L; @@ -259,10 +263,17 @@ private static LoopResult loopStreamFillDirect(ChunkBuffer buffer, byte[] source off += writeSize; totalBytes += writeSize; } + if (verify) { + assertBuffer(source, buffer); + } } return new LoopResult(totalBytes, System.nanoTime() - start); } + static void assertBuffer(byte[] expected, ChunkBuffer computed) { + assertEquals(ByteBuffer.wrap(expected), computed.toByteString().asReadOnlyByteBuffer()); + } + ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
