This is an automated email from the ASF dual-hosted git repository.
mdedetrich pushed a commit to branch 1.5.x
in repository https://gitbox.apache.org/repos/asf/pekko.git
The following commit(s) were added to refs/heads/1.5.x by this push:
new 28d86d86d5 chore: Use array list for better performance in BroadcastHub (#2262)
28d86d86d5 is described below
commit 28d86d86d5a5fb7c4020f953eb4c395b2b3a7e37
Author: He-Pin(kerr) <[email protected]>
AuthorDate: Thu Sep 25 08:50:27 2025 +0800
chore: Use array list for better performance in BroadcastHub (#2262)
* chore: Use array list for better performance in BroadcastHub
* chore: add benchmark
(cherry picked from commit 3cfe37f016ae2397c375fcfd267323f9ef85fa55)
---
.../pekko/stream/BroadcastHubBenchmark.scala | 94 ++++++++++++++++++++++
.../org/apache/pekko/stream/scaladsl/Hub.scala | 29 +++----
2 files changed, 109 insertions(+), 14 deletions(-)
diff --git a/bench-jmh/src/main/scala/org/apache/pekko/stream/BroadcastHubBenchmark.scala b/bench-jmh/src/main/scala/org/apache/pekko/stream/BroadcastHubBenchmark.scala
new file mode 100644
index 0000000000..ede811c798
--- /dev/null
+++ b/bench-jmh/src/main/scala/org/apache/pekko/stream/BroadcastHubBenchmark.scala
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.stream
+
+import com.typesafe.config.ConfigFactory
+import org.apache.pekko.NotUsed
+import org.apache.pekko.actor.ActorSystem
+import org.apache.pekko.remote.artery.{ BenchTestSource, LatchSink }
+import org.apache.pekko.stream.scaladsl._
+import org.apache.pekko.stream.testkit.scaladsl.StreamTestKit
+import org.openjdk.jmh.annotations._
+
+import java.util.concurrent.{ CountDownLatch, TimeUnit }
+import scala.concurrent.Await
+import scala.concurrent.duration._
+
+object BroadcastHubBenchmark {
+ final val OperationsPerInvocation = 100000
+}
+
+@State(Scope.Benchmark)
+@OutputTimeUnit(TimeUnit.SECONDS)
+@BenchmarkMode(Array(Mode.Throughput))
+class BroadcastHubBenchmark {
+ import BroadcastHubBenchmark._
+
+ val config = ConfigFactory.parseString("""
+ pekko.actor.default-dispatcher {
+ executor = "fork-join-executor"
+ fork-join-executor {
+ parallelism-factor = 1
+ }
+ }
+ """)
+
+ implicit val system: ActorSystem = ActorSystem("BroadcastHubBenchmark", config)
+ import system.dispatcher
+
+ var testSource: Source[java.lang.Integer, NotUsed] = _
+
+ @Param(Array("64", "256"))
+ var parallelism = 0
+
+ @Setup
+ def setup(): Unit = {
+ // eager init of materializer
+ SystemMaterializer(system).materializer
+ testSource = Source.fromGraph(new BenchTestSource(OperationsPerInvocation))
+ }
+
+ @TearDown
+ def shutdown(): Unit = {
+ Await.result(system.terminate(), 5.seconds)
+ }
+
+ @Benchmark
+ @OperationsPerInvocation(OperationsPerInvocation)
+ def broadcast(): Unit = {
+ val latch = new CountDownLatch(parallelism)
+ val broadcastSink =
+ BroadcastHub.sink[java.lang.Integer](bufferSize = parallelism, startAfterNrOfConsumers = parallelism)
+ val sink = new LatchSink(OperationsPerInvocation, latch)
+ val source = testSource.runWith(broadcastSink)
+ var idx = 0
+ while (idx < parallelism) {
+ source.runWith(sink)
+ idx += 1
+ }
+ awaitLatch(latch)
+ }
+
+ private def awaitLatch(latch: CountDownLatch): Unit = {
+ if (!latch.await(30, TimeUnit.SECONDS)) {
+ StreamTestKit.printDebugDump(SystemMaterializer(system).materializer.supervisor)
+ throw new RuntimeException("Latch didn't complete in time")
+ }
+ }
+
+}
diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Hub.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Hub.scala
index 218c605c23..b997ba74a2 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Hub.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Hub.scala
@@ -543,7 +543,8 @@ private[pekko] class BroadcastHub[T](startAfterNrOfConsumers: Int, bufferSize: I
* a wakeup and update their position at the same time.
*
*/
- private[this] val consumerWheel = Array.fill[List[Consumer]](bufferSize * 2)(Nil)
+ private[this] val consumerWheel =
+ Array.fill[java.util.ArrayList[Consumer]](bufferSize * 2)(new util.ArrayList[Consumer]())
private[this] var activeConsumers = 0
override def preStart(): Unit = {
@@ -651,8 +652,10 @@ private[pekko] class BroadcastHub[T](startAfterNrOfConsumers: Int, bufferSize: I
}
// Notify registered consumers
- consumerWheel.iterator.flatMap(_.iterator).foreach { consumer =>
- consumer.callback.invoke(failMessage)
+ var idx = 0
+ while (idx < consumerWheel.length) {
+ consumerWheel(idx).forEach(_.callback.invoke(failMessage))
+ idx += 1
}
failStage(ex)
}
@@ -666,18 +669,16 @@ private[pekko] class BroadcastHub[T](startAfterNrOfConsumers: Int, bufferSize: I
private def findAndRemoveConsumer(id: Long, offset: Int): Consumer = {
// TODO: Try to eliminate modulo division somehow...
val wheelSlot = offset & WheelMask
- var consumersInSlot = consumerWheel(wheelSlot)
- // debug(s"consumers before removal $consumersInSlot")
- var remainingConsumersInSlot: List[Consumer] = Nil
+ val consumersInSlot = consumerWheel(wheelSlot)
var removedConsumer: Consumer = null
-
- while (consumersInSlot.nonEmpty) {
- val consumer = consumersInSlot.head
- if (consumer.id != id) remainingConsumersInSlot = consumer :: remainingConsumersInSlot
- else removedConsumer = consumer
- consumersInSlot = consumersInSlot.tail
+ if (consumersInSlot.size() > 0) {
+ consumersInSlot.removeIf(consumer => {
+ if (consumer.id == id) {
+ removedConsumer = consumer
+ true
+ } else false
+ })
}
- consumerWheel(wheelSlot) = remainingConsumersInSlot
removedConsumer
}
@@ -708,7 +709,7 @@ private[pekko] class BroadcastHub[T](startAfterNrOfConsumers: Int, bufferSize: I
private def addConsumer(consumer: Consumer, offset: Int): Unit = {
val slot = offset & WheelMask
- consumerWheel(slot) = consumer :: consumerWheel(slot)
+ consumerWheel(slot).add(consumer)
}
/*
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]