This is an automated email from the ASF dual-hosted git repository.

fanningpj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko.git


The following commit(s) were added to refs/heads/main by this push:
     new 195cba7187 RememberEntity throttling per region instead of per shard (#2786)
195cba7187 is described below

commit 195cba71876eca63f93b042ccb5cb01a37ca5c28
Author: PJ Fanning <[email protected]>
AuthorDate: Fri Mar 27 19:30:36 2026 +0100

    RememberEntity throttling per region instead of per shard (#2786)
    
    * Initial plan
    
    * Port akka-core PR #31836: RememberEntityStarter throttling per region instead of per shard
    
    Co-authored-by: pjfanning <[email protected]>
    Agent-Logs-Url: https://github.com/pjfanning/incubator-pekko/sessions/ef8aaa16-c225-425c-b177-45f57c22b143
    
    * scalafmt
    
    * scalafmt
    
    * Update reference.conf
    
    * Port akka-core#31836: fix three structural gaps in RememberEntity per-region throttling (#27)
    
    * Initial plan
    
    * Fix three gaps vs akka-core v2.8.0: Shard constructor, ShardRegion manager, and test improvement
    
    Co-authored-by: pjfanning <[email protected]>
    Agent-Logs-Url: https://github.com/pjfanning/incubator-pekko/sessions/47b8e4cd-f03b-409d-9a54-d0e49bed96ae
    
    * Update ShardRegion.scala
    
    ---------
    
    Co-authored-by: copilot-swe-agent[bot] <[email protected]>
    Co-authored-by: pjfanning <[email protected]>
    Co-authored-by: PJ Fanning <[email protected]>
    
    ---------
    
    Co-authored-by: copilot-swe-agent[bot] <[email protected]>
    Co-authored-by: pjfanning <[email protected]>
---
 cluster-sharding/src/main/resources/reference.conf |   3 +-
 .../org/apache/pekko/cluster/sharding/Shard.scala  |  16 +--
 .../pekko/cluster/sharding/ShardRegion.scala       |  12 +-
 .../sharding/internal/RememberEntityStarter.scala  |  95 +++++++++++++---
 .../internal/RememberEntitiesStarterSpec.scala     | 124 ++++++++++++++++++++-
 5 files changed, 221 insertions(+), 29 deletions(-)

diff --git a/cluster-sharding/src/main/resources/reference.conf 
b/cluster-sharding/src/main/resources/reference.conf
index b21f7f2fe6..d079825d10 100644
--- a/cluster-sharding/src/main/resources/reference.conf
+++ b/cluster-sharding/src/main/resources/reference.conf
@@ -343,9 +343,10 @@ pekko.cluster.sharding {
   # entity actors at a fix rate. The default strategy "all".
   entity-recovery-strategy = "all"
 
-  # Default settings for the constant rate entity recovery strategy
+  # Default settings for the constant rate entity recovery strategy.
   entity-recovery-constant-rate-strategy {
     # Sets the frequency at which a batch of entity actors is started.
+    # The frequency is per sharding region (entity type).
     frequency = 100 ms
     # Sets the number of entity actors to be restart at a particular interval
     number-of-entities = 5
diff --git 
a/cluster-sharding/src/main/scala/org/apache/pekko/cluster/sharding/Shard.scala 
b/cluster-sharding/src/main/scala/org/apache/pekko/cluster/sharding/Shard.scala
index 7c018326b9..2d4b1165db 100644
--- 
a/cluster-sharding/src/main/scala/org/apache/pekko/cluster/sharding/Shard.scala
+++ 
b/cluster-sharding/src/main/scala/org/apache/pekko/cluster/sharding/Shard.scala
@@ -17,7 +17,6 @@ import java.net.URLEncoder
 import java.util
 
 import scala.annotation.nowarn
-import scala.collection.immutable.Set
 import scala.concurrent.duration._
 
 import org.apache.pekko
@@ -43,7 +42,7 @@ import 
pekko.cluster.sharding.internal.EntityPassivationStrategy
 import pekko.cluster.sharding.internal.RememberEntitiesProvider
 import pekko.cluster.sharding.internal.RememberEntitiesShardStore
 import pekko.cluster.sharding.internal.RememberEntitiesShardStore.GetEntities
-import pekko.cluster.sharding.internal.RememberEntityStarter
+import pekko.cluster.sharding.internal.RememberEntityStarterManager
 import pekko.coordination.lease.scaladsl.Lease
 import pekko.coordination.lease.scaladsl.LeaseProvider
 import pekko.event.LoggingAdapter
@@ -106,7 +105,8 @@ private[pekko] object Shard {
       extractEntityId: ShardRegion.ExtractEntityId,
       extractShardId: ShardRegion.ExtractShardId,
       handOffStopMessage: Any,
-      rememberEntitiesProvider: Option[RememberEntitiesProvider]): Props =
+      rememberEntitiesProvider: Option[RememberEntitiesProvider],
+      rememberEntityStarterManager: ActorRef): Props =
     Props(
       new Shard(
         typeName,
@@ -116,7 +116,8 @@ private[pekko] object Shard {
         extractEntityId,
         extractShardId,
         handOffStopMessage,
-        rememberEntitiesProvider)).withDeploy(Deploy.local)
+        rememberEntitiesProvider,
+        rememberEntityStarterManager)).withDeploy(Deploy.local)
 
   case object PassivateIntervalTick extends NoSerializationVerificationNeeded
 
@@ -428,7 +429,8 @@ private[pekko] class Shard(
     extractEntityId: ShardRegion.ExtractEntityId,
     @nowarn("msg=never used") extractShardId: ShardRegion.ExtractShardId,
     handOffStopMessage: Any,
-    rememberEntitiesProvider: Option[RememberEntitiesProvider])
+    rememberEntitiesProvider: Option[RememberEntitiesProvider],
+    rememberEntityStarterManager: ActorRef)
     extends Actor
     with ActorLogging
     with Stash
@@ -604,9 +606,7 @@ private[pekko] class Shard(
     if (ids.nonEmpty) {
       entities.alreadyRemembered(ids)
       log.debug("{}: Restarting set of [{}] entities", typeName, ids.size)
-      context.actorOf(
-        RememberEntityStarter.props(context.parent, self, shardId, ids, 
settings),
-        "RememberEntitiesStarter")
+      rememberEntityStarterManager ! 
RememberEntityStarterManager.StartEntities(self, shardId, ids)
     }
     shardInitialized()
   }
diff --git 
a/cluster-sharding/src/main/scala/org/apache/pekko/cluster/sharding/ShardRegion.scala
 
b/cluster-sharding/src/main/scala/org/apache/pekko/cluster/sharding/ShardRegion.scala
index b3a5cce4de..a6aca98d88 100644
--- 
a/cluster-sharding/src/main/scala/org/apache/pekko/cluster/sharding/ShardRegion.scala
+++ 
b/cluster-sharding/src/main/scala/org/apache/pekko/cluster/sharding/ShardRegion.scala
@@ -37,6 +37,7 @@ import pekko.cluster.MemberStatus
 import pekko.cluster.sharding.ClusterShardingSettings.PassivationStrategy
 import pekko.cluster.sharding.Shard.ShardStats
 import pekko.cluster.sharding.internal.RememberEntitiesProvider
+import pekko.cluster.sharding.internal.RememberEntityStarterManager
 import pekko.event.Logging
 import pekko.pattern.ask
 import pekko.pattern.pipe
@@ -673,6 +674,14 @@ private[pekko] class ShardRegion(
       }
     }
 
+  // When rememberEntities is enabled, create a manager to throttle entity 
starting across
+  // all shards in this region (per entity type) rather than per shard
+  private val rememberEntityStarterManager: ActorRef =
+    if (rememberEntitiesProvider.isDefined)
+      context.actorOf(RememberEntityStarterManager.props(context.self, 
settings), "RememberEntityStarter")
+    else
+      context.system.deadLetters
+
   // subscribe to MemberEvent, re-subscribe when restart
   override def preStart(): Unit = {
     cluster.subscribe(self, classOf[MemberEvent])
@@ -1349,7 +1358,8 @@ private[pekko] class ShardRegion(
                     extractEntityId,
                     extractShardId,
                     handOffStopMessage,
-                    rememberEntitiesProvider)
+                    rememberEntitiesProvider,
+                    rememberEntityStarterManager)
                   .withDispatcher(context.props.dispatcher),
                 name))
             shardsByRef = shardsByRef.updated(shard, id)
diff --git 
a/cluster-sharding/src/main/scala/org/apache/pekko/cluster/sharding/internal/RememberEntityStarter.scala
 
b/cluster-sharding/src/main/scala/org/apache/pekko/cluster/sharding/internal/RememberEntityStarter.scala
index 984b82f970..88d4a79729 100644
--- 
a/cluster-sharding/src/main/scala/org/apache/pekko/cluster/sharding/internal/RememberEntityStarter.scala
+++ 
b/cluster-sharding/src/main/scala/org/apache/pekko/cluster/sharding/internal/RememberEntityStarter.scala
@@ -22,6 +22,7 @@ import pekko.actor.ActorLogging
 import pekko.actor.ActorRef
 import pekko.actor.NoSerializationVerificationNeeded
 import pekko.actor.Props
+import pekko.actor.Terminated
 import pekko.actor.Timers
 import pekko.annotation.InternalApi
 import pekko.cluster.sharding.ClusterShardingSettings
@@ -30,6 +31,70 @@ import pekko.cluster.sharding.ShardRegion
 import pekko.cluster.sharding.ShardRegion.EntityId
 import pekko.cluster.sharding.ShardRegion.ShardId
 
+/**
+ * INTERNAL API
+ */
+@InternalApi
+private[pekko] object RememberEntityStarterManager {
+  def props(region: ActorRef, settings: ClusterShardingSettings) =
+    Props(new RememberEntityStarterManager(region, settings))
+
+  final case class StartEntities(shard: ActorRef, shardId: 
ShardRegion.ShardId, ids: Set[ShardRegion.EntityId])
+      extends NoSerializationVerificationNeeded
+
+  private case object ContinueAfterDelay extends 
NoSerializationVerificationNeeded
+}
+
+/**
+ * INTERNAL API: Per-region throttler for starting remembered entities, 
ensuring the
+ * constant-rate strategy throttles across all shards in a region rather than 
per shard.
+ */
+@InternalApi
+private[pekko] final class RememberEntityStarterManager(region: ActorRef, 
settings: ClusterShardingSettings)
+    extends Actor
+    with ActorLogging
+    with Timers {
+  import RememberEntityStarterManager._
+
+  private val delay = 
settings.tuningParameters.entityRecoveryConstantRateStrategyFrequency
+
+  override def receive: Receive = 
settings.tuningParameters.entityRecoveryStrategy match {
+    case "all"      => allStrategy
+    case "constant" => constantStrategyIdle
+    case other      => throw new IllegalArgumentException(s"Unknown 
entityRecoveryStrategy [$other]")
+  }
+
+  private val allStrategy: Receive = {
+    case s: StartEntities => start(s, isConstantStrategy = false)
+    case _: Terminated    => // RememberEntityStarter was done
+  }
+
+  private val constantStrategyIdle: Receive = {
+    case s: StartEntities =>
+      start(s, isConstantStrategy = true)
+      context.become(constantStrategyWaiting(Vector.empty))
+  }
+
+  private def constantStrategyWaiting(workQueue: Vector[StartEntities]): 
Receive = {
+    case s: StartEntities => context.become(constantStrategyWaiting(workQueue 
:+ s))
+
+    case _: Terminated => // RememberEntityStarter was done
+      timers.startSingleTimer(ContinueAfterDelay, ContinueAfterDelay, delay)
+
+    case ContinueAfterDelay =>
+      if (workQueue.isEmpty) context.become(constantStrategyIdle)
+      else {
+        start(workQueue.head, isConstantStrategy = true)
+        context.become(constantStrategyWaiting(workQueue.tail))
+      }
+  }
+
+  private def start(s: StartEntities, isConstantStrategy: Boolean): Unit = {
+    context.watch(
+      context.actorOf(RememberEntityStarter.props(region, s.shard, s.shardId, 
s.ids, isConstantStrategy, settings)))
+  }
+}
+
 /**
  * INTERNAL API
  */
@@ -40,8 +105,9 @@ private[pekko] object RememberEntityStarter {
       shard: ActorRef,
       shardId: ShardRegion.ShardId,
       ids: Set[ShardRegion.EntityId],
+      isConstantStrategy: Boolean,
       settings: ClusterShardingSettings) =
-    Props(new RememberEntityStarter(region, shard, shardId, ids, settings))
+    Props(new RememberEntityStarter(region, shard, shardId, ids, 
isConstantStrategy, settings))
 
   private final case class StartBatch(batchSize: Int) extends 
NoSerializationVerificationNeeded
   private case object ResendUnAcked extends NoSerializationVerificationNeeded
@@ -56,6 +122,7 @@ private[pekko] final class RememberEntityStarter(
     shard: ActorRef,
     shardId: ShardRegion.ShardId,
     ids: Set[ShardRegion.EntityId],
+    constantStrategy: Boolean,
     settings: ClusterShardingSettings)
     extends Actor
     with ActorLogging
@@ -71,22 +138,22 @@ private[pekko] final class RememberEntityStarter(
   private var entitiesMoved = Set.empty[EntityId]
 
   log.debug(
-    "Shard starting [{}] remembered entities using strategy [{}]",
+    "Shard [{}] starting [{}] remembered entities using strategy [{}]",
+    shardId,
     ids.size,
     settings.tuningParameters.entityRecoveryStrategy)
 
-  settings.tuningParameters.entityRecoveryStrategy match {
-    case "all" =>
-      idsLeftToStart = Set.empty
-      startBatch(ids)
-    case "constant" =>
-      import settings.tuningParameters
-      idsLeftToStart = ids
-      timers.startTimerWithFixedDelay(
-        "constant",
-        
StartBatch(tuningParameters.entityRecoveryConstantRateStrategyNumberOfEntities),
-        tuningParameters.entityRecoveryConstantRateStrategyFrequency)
-      
startBatch(tuningParameters.entityRecoveryConstantRateStrategyNumberOfEntities)
+  if (constantStrategy) {
+    import settings.tuningParameters
+    idsLeftToStart = ids
+    timers.startTimerWithFixedDelay(
+      "constant",
+      
StartBatch(tuningParameters.entityRecoveryConstantRateStrategyNumberOfEntities),
+      tuningParameters.entityRecoveryConstantRateStrategyFrequency)
+    
startBatch(tuningParameters.entityRecoveryConstantRateStrategyNumberOfEntities)
+  } else {
+    idsLeftToStart = Set.empty
+    startBatch(ids)
   }
   timers.startTimerWithFixedDelay("retry", ResendUnAcked, 
settings.tuningParameters.retryInterval)
 
diff --git 
a/cluster-sharding/src/test/scala/org/apache/pekko/cluster/sharding/internal/RememberEntitiesStarterSpec.scala
 
b/cluster-sharding/src/test/scala/org/apache/pekko/cluster/sharding/internal/RememberEntitiesStarterSpec.scala
index 18921dcfb4..99b9ab40d6 100644
--- 
a/cluster-sharding/src/test/scala/org/apache/pekko/cluster/sharding/internal/RememberEntitiesStarterSpec.scala
+++ 
b/cluster-sharding/src/test/scala/org/apache/pekko/cluster/sharding/internal/RememberEntitiesStarterSpec.scala
@@ -43,7 +43,13 @@ class RememberEntitiesStarterSpec extends PekkoSpec {
       val defaultSettings = ClusterShardingSettings(system)
 
       val rememberEntityStarter = system.actorOf(
-        RememberEntityStarter.props(regionProbe.ref, shardProbe.ref, shardId, 
Set("1", "2", "3"), defaultSettings))
+        RememberEntityStarter.props(
+          regionProbe.ref,
+          shardProbe.ref,
+          shardId,
+          Set("1", "2", "3"),
+          isConstantStrategy = false,
+          defaultSettings))
 
       watch(rememberEntityStarter)
       val startedEntityIds = (1 to 3).map { _ =>
@@ -74,7 +80,13 @@ class RememberEntitiesStarterSpec extends PekkoSpec {
           
.withFallback(system.settings.config.getConfig("pekko.cluster.sharding")))
 
       val rememberEntityStarter = system.actorOf(
-        RememberEntityStarter.props(regionProbe.ref, shardProbe.ref, shardId, 
Set("1", "2", "3"), customSettings))
+        RememberEntityStarter.props(
+          regionProbe.ref,
+          shardProbe.ref,
+          shardId,
+          Set("1", "2", "3"),
+          isConstantStrategy = false,
+          customSettings))
 
       watch(rememberEntityStarter)
       (1 to 3).foreach { _ =>
@@ -107,7 +119,13 @@ class RememberEntitiesStarterSpec extends PekkoSpec {
           
.withFallback(system.settings.config.getConfig("pekko.cluster.sharding")))
 
       val rememberEntityStarter = system.actorOf(
-        RememberEntityStarter.props(regionProbe.ref, shardProbe.ref, shardId, 
Set("1", "2", "3"), customSettings))
+        RememberEntityStarter.props(
+          regionProbe.ref,
+          shardProbe.ref,
+          shardId,
+          Set("1", "2", "3"),
+          isConstantStrategy = false,
+          customSettings))
 
       watch(rememberEntityStarter)
       val start1 = regionProbe.expectMsgType[ShardRegion.StartEntity]
@@ -143,8 +161,13 @@ class RememberEntitiesStarterSpec extends PekkoSpec {
           
.withFallback(system.settings.config.getConfig("pekko.cluster.sharding")))
 
       val rememberEntityStarter = system.actorOf(
-        RememberEntityStarter
-          .props(regionProbe.ref, shardProbe.ref, shardId, Set("1", "2", "3", 
"4", "5"), customSettings))
+        RememberEntityStarter.props(
+          regionProbe.ref,
+          shardProbe.ref,
+          shardId,
+          Set("1", "2", "3", "4", "5"),
+          isConstantStrategy = true,
+          customSettings))
 
       def recieveStartAndAck() = {
         val start = regionProbe.expectMsgType[ShardRegion.StartEntity]
@@ -173,4 +196,95 @@ class RememberEntitiesStarterSpec extends PekkoSpec {
     }
 
   }
+
+  "The RememberEntityStarterManager" must {
+    "start entities for all shards immediately with entity-recovery-strategy = 
all (default)" in {
+      val regionProbe = TestProbe()
+      val shardProbe1 = TestProbe()
+      val shardProbe2 = TestProbe()
+      val shardId1 = nextShardId()
+      val shardId2 = nextShardId()
+
+      val defaultSettings = ClusterShardingSettings(system)
+
+      val manager = 
system.actorOf(RememberEntityStarterManager.props(regionProbe.ref, 
defaultSettings))
+
+      manager ! RememberEntityStarterManager.StartEntities(shardProbe1.ref, 
shardId1, Set("1", "2"))
+      manager ! RememberEntityStarterManager.StartEntities(shardProbe2.ref, 
shardId2, Set("3", "4"))
+
+      // both shards should be started immediately (all strategy, no queuing)
+      val startedEntityIds = (1 to 4).map { _ =>
+        val start = regionProbe.expectMsgType[ShardRegion.StartEntity]
+        regionProbe.lastSender ! ShardRegion.StartEntityAck(start.entityId,
+          start.entityId match {
+            case "1" | "2" => shardId1
+            case _         => shardId2
+          })
+        start.entityId
+      }.toSet
+      startedEntityIds should ===(Set("1", "2", "3", "4"))
+    }
+
+    "throttle entity starting across shards with entity-recovery-strategy = 
constant" in {
+      val regionProbe = TestProbe()
+      val shard1Probe = TestProbe()
+      val shard2Probe = TestProbe()
+      val shardId1 = nextShardId()
+      val shardId2 = nextShardId()
+
+      val customSettings = ClusterShardingSettings(
+        ConfigFactory
+          .parseString(
+            """
+             entity-recovery-strategy = constant
+             entity-recovery-constant-rate-strategy {
+               frequency = 2 s
+               number-of-entities = 2
+             }
+             retry-interval = 1 second
+            """)
+          
.withFallback(system.settings.config.getConfig("pekko.cluster.sharding")))
+
+      val manager = 
system.actorOf(RememberEntityStarterManager.props(regionProbe.ref, 
customSettings))
+
+      manager ! RememberEntityStarterManager.StartEntities(shard1Probe.ref, 
shardId1, Set("1", "2", "3", "4", "5"))
+      manager ! RememberEntityStarterManager.StartEntities(shard2Probe.ref, 
shardId2, Set("6", "7", "8"))
+
+      import pekko.cluster.sharding.ShardRegion.EntityId
+
+      def receiveStartAndAck(): EntityId = {
+        val start = regionProbe.expectMsgType[ShardRegion.StartEntity]
+        val shardId = if (start.entityId.toInt <= 5) shardId1 else shardId2
+        regionProbe.lastSender ! ShardRegion.StartEntityAck(start.entityId, 
shardId)
+        start.entityId
+      }
+
+      var startedEntityIds = Set.empty[EntityId]
+
+      // first batch for shard1 should be immediate
+      startedEntityIds += receiveStartAndAck()
+      startedEntityIds += receiveStartAndAck()
+
+      // second batch holding off
+      regionProbe.expectNoMessage(600.millis)
+      startedEntityIds += receiveStartAndAck()
+      startedEntityIds += receiveStartAndAck()
+
+      // third batch holding off
+      regionProbe.expectNoMessage(600.millis)
+      startedEntityIds += receiveStartAndAck()
+
+      startedEntityIds should ===(Set("1", "2", "3", "4", "5"))
+
+      // now the second StartEntities for shard2 — still throttled after delay
+      regionProbe.expectNoMessage(600.millis)
+      startedEntityIds += receiveStartAndAck()
+      startedEntityIds += receiveStartAndAck()
+
+      regionProbe.expectNoMessage(600.millis)
+      startedEntityIds += receiveStartAndAck()
+
+      startedEntityIds should ===(Set("1", "2", "3", "4", "5", "6", "7", "8"))
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to