This is an automated email from the ASF dual-hosted git repository.
showuon pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 2b26db0d38f Switch to SplittableRandom in ProducerPerformance utility
(#13482)
2b26db0d38f is described below
commit 2b26db0d38f7245505812e4cb3fa622fc07ba6c8
Author: Robert Young <[email protected]>
AuthorDate: Fri Mar 31 19:52:10 2023 +1300
Switch to SplittableRandom in ProducerPerformance utility (#13482)
Why:
Using java.util.Random to generate every byte sent by ProducerPerformance
appears to be a limiting factor: throughput of the ProducerPerformance script
is higher with a file of records than with randomly generated records.
On my machine a single thread can generate ~100MB/sec of uppercase letters
using java.util.Random, but ~300MB/sec using java.util.SplittableRandom. This
generation rate is a limit on throughput.
Note: you can optimise further by expanding the generated alphabet from 26
letters to 32, because generating a uniformly distributed int is more
efficient when the bound is a power of two.
Reviewers: Luke Chen <[email protected]>
---
.../src/main/java/org/apache/kafka/tools/ProducerPerformance.java | 7 ++++---
.../test/java/org/apache/kafka/tools/ProducerPerformanceTest.java | 8 ++++----
2 files changed, 8 insertions(+), 7 deletions(-)
diff --git
a/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java
b/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java
index d8a0f260691..fd15ddd1b6b 100644
--- a/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java
+++ b/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java
@@ -27,8 +27,8 @@ import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
-import java.util.Random;
import java.util.Arrays;
+import java.util.SplittableRandom;
import net.sourceforge.argparse4j.inf.MutuallyExclusiveGroup;
import org.apache.kafka.clients.producer.Callback;
@@ -92,7 +92,8 @@ public class ProducerPerformance {
if (recordSize != null) {
payload = new byte[recordSize];
}
- Random random = new Random(0);
+ // not threadsafe, do not share with other threads
+ SplittableRandom random = new SplittableRandom(0);
ProducerRecord<byte[], byte[]> record;
stats = new Stats(numRecords, 5000);
long startMs = System.currentTimeMillis();
@@ -169,7 +170,7 @@ public class ProducerPerformance {
Stats stats;
static byte[] generateRandomPayload(Integer recordSize, List<byte[]>
payloadByteList, byte[] payload,
- Random random) {
+ SplittableRandom random) {
if (!payloadByteList.isEmpty()) {
payload =
payloadByteList.get(random.nextInt(payloadByteList.size()));
} else if (recordSize != null) {
diff --git
a/tools/src/test/java/org/apache/kafka/tools/ProducerPerformanceTest.java
b/tools/src/test/java/org/apache/kafka/tools/ProducerPerformanceTest.java
index 70717938845..f97e34dda9c 100644
--- a/tools/src/test/java/org/apache/kafka/tools/ProducerPerformanceTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/ProducerPerformanceTest.java
@@ -36,7 +36,7 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
-import java.util.Random;
+import java.util.SplittableRandom;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -177,7 +177,7 @@ public class ProducerPerformanceTest {
List<byte[]> payloadByteList = new ArrayList<>();
payloadByteList.add(byteArray);
byte[] payload = null;
- Random random = new Random(0);
+ SplittableRandom random = new SplittableRandom(0);
payload = ProducerPerformance.generateRandomPayload(recordSize,
payloadByteList, payload, random);
assertEquals(inputString, new String(payload));
@@ -188,7 +188,7 @@ public class ProducerPerformanceTest {
Integer recordSize = 100;
byte[] payload = new byte[recordSize];
List<byte[]> payloadByteList = new ArrayList<>();
- Random random = new Random(0);
+ SplittableRandom random = new SplittableRandom(0);
payload = ProducerPerformance.generateRandomPayload(recordSize,
payloadByteList, payload, random);
for (byte b : payload) {
@@ -201,7 +201,7 @@ public class ProducerPerformanceTest {
Integer recordSize = null;
byte[] payload = null;
List<byte[]> payloadByteList = new ArrayList<>();
- Random random = new Random(0);
+ SplittableRandom random = new SplittableRandom(0);
IllegalArgumentException thrown =
assertThrows(IllegalArgumentException.class, () ->
ProducerPerformance.generateRandomPayload(recordSize, payloadByteList, payload,
random));
assertEquals("no payload File Path or record Size provided",
thrown.getMessage());