This is an automated email from the ASF dual-hosted git repository.
hepin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko.git
The following commit(s) were added to refs/heads/main by this push:
new 1d35cccb11 Improve DData remember entities logging and consistency
(#2799)
1d35cccb11 is described below
commit 1d35cccb118d6124b716106e4941193dc9b98e58
Author: He-Pin(kerr) <[email protected]>
AuthorDate: Sat Mar 28 17:00:36 2026 +0800
Improve DData remember entities logging and consistency (#2799)
Multiple improvements to DData remember entities stores:
- DDataRememberEntitiesShardStore: Retry reads up to 15 times (across
all 5 keys) before giving up, logging at warning level initially and
escalating to error only on final failure. Previously failed immediately
on first GetFailure.
- DDataRememberEntitiesCoordinatorStore: Continues to retry indefinitely as
before, but now logs the first four read failures at warning level and
escalates to error level from the fifth failure onward.
- All log messages now clearly indicate whether they originate from the
coordinator store or shard store (prefix: 'Remember entities
coordinator/shard store').
- Change consistency levels from ReadMajority/WriteMajority to
ReadMajorityPlus/WriteMajorityPlus, using coordinatorStateReadMajorityPlus
and coordinatorStateWriteMajorityPlus settings, with ReadAll/WriteAll
fallback when configured as Int.MaxValue.
Upstream: akka/akka-core@440fdfdcc2
Cherry-picked from akka/akka-core v2.8.0, which is now Apache licensed.
Co-authored-by: Copilot <[email protected]>
---
.../DDataRememberEntitiesCoordinatorStore.scala | 40 +++++++----
.../internal/DDataRememberEntitiesShardStore.scala | 78 ++++++++++++++--------
2 files changed, 79 insertions(+), 39 deletions(-)
diff --git
a/cluster-sharding/src/main/scala/org/apache/pekko/cluster/sharding/internal/DDataRememberEntitiesCoordinatorStore.scala
b/cluster-sharding/src/main/scala/org/apache/pekko/cluster/sharding/internal/DDataRememberEntitiesCoordinatorStore.scala
index 5b23f90eba..ad47a66631 100644
---
a/cluster-sharding/src/main/scala/org/apache/pekko/cluster/sharding/internal/DDataRememberEntitiesCoordinatorStore.scala
+++
b/cluster-sharding/src/main/scala/org/apache/pekko/cluster/sharding/internal/DDataRememberEntitiesCoordinatorStore.scala
@@ -22,8 +22,10 @@ import pekko.cluster.Cluster
import pekko.cluster.ddata.GSet
import pekko.cluster.ddata.GSetKey
import pekko.cluster.ddata.Replicator
-import pekko.cluster.ddata.Replicator.ReadMajority
-import pekko.cluster.ddata.Replicator.WriteMajority
+import pekko.cluster.ddata.Replicator.ReadAll
+import pekko.cluster.ddata.Replicator.ReadMajorityPlus
+import pekko.cluster.ddata.Replicator.WriteAll
+import pekko.cluster.ddata.Replicator.WriteMajorityPlus
import pekko.cluster.ddata.SelfUniqueAddress
import pekko.cluster.sharding.ClusterShardingSettings
import pekko.cluster.sharding.ShardRegion.ShardId
@@ -52,16 +54,23 @@ private[pekko] final class
DDataRememberEntitiesCoordinatorStore(
implicit val node: Cluster = Cluster(context.system)
implicit val selfUniqueAddress: SelfUniqueAddress =
SelfUniqueAddress(node.selfUniqueAddress)
- private val readMajority =
ReadMajority(settings.tuningParameters.waitingForStateTimeout, majorityMinCap)
- private val writeMajority =
WriteMajority(settings.tuningParameters.updatingStateTimeout, majorityMinCap)
+ private val readConsistency =
settings.tuningParameters.coordinatorStateReadMajorityPlus match {
+ case Int.MaxValue =>
ReadAll(settings.tuningParameters.waitingForStateTimeout)
+ case additional =>
ReadMajorityPlus(settings.tuningParameters.waitingForStateTimeout, additional,
majorityMinCap)
+ }
+ private val writeConsistency =
settings.tuningParameters.coordinatorStateWriteMajorityPlus match {
+ case Int.MaxValue =>
WriteAll(settings.tuningParameters.updatingStateTimeout)
+ case additional =>
WriteMajorityPlus(settings.tuningParameters.updatingStateTimeout, additional,
majorityMinCap)
+ }
private val AllShardsKey = GSetKey[String](s"shard-$typeName-all")
+ private var retryGetCounter = 0
private var allShards: Option[Set[ShardId]] = None
private var coordinatorWaitingForShards: Option[ActorRef] = None
// eager load of remembered shard ids
def getAllShards(): Unit = {
- replicator ! Replicator.Get(AllShardsKey, readMajority)
+ replicator ! Replicator.Get(AllShardsKey, readConsistency)
}
getAllShards()
@@ -84,31 +93,35 @@ private[pekko] final class
DDataRememberEntitiesCoordinatorStore(
onGotAllShards(Set.empty)
case Replicator.GetFailure(AllShardsKey, _) =>
- log.error(
- "The ShardCoordinator was unable to get all shards state within
'waiting-for-state-timeout': {} millis (retrying)",
- readMajority.timeout.toMillis)
+ retryGetCounter += 1
+ val template =
+ "Remember entities coordinator store unable to get initial shards
within 'waiting-for-state-timeout': {} millis (retrying)"
+ if (retryGetCounter < 5)
+ log.warning(template, readConsistency.timeout.toMillis)
+ else
+ log.error(template, readConsistency.timeout.toMillis)
// repeat until GetSuccess
getAllShards()
case RememberEntitiesCoordinatorStore.AddShard(shardId) =>
- replicator ! Replicator.Update(AllShardsKey, GSet.empty[String],
writeMajority, Some((sender(), shardId)))(
+ replicator ! Replicator.Update(AllShardsKey, GSet.empty[String],
writeConsistency, Some((sender(), shardId)))(
_ + shardId)
case Replicator.UpdateSuccess(AllShardsKey, Some((replyTo: ActorRef,
shardId: ShardId))) =>
- log.debug("The coordinator shards state was successfully updated with
{}", shardId)
+ log.debug("Remember entities coordinator store shards successfully
updated with {}", shardId)
replyTo ! RememberEntitiesCoordinatorStore.UpdateDone(shardId)
case Replicator.UpdateTimeout(AllShardsKey, Some((replyTo: ActorRef,
shardId: ShardId))) =>
log.error(
- "The ShardCoordinator was unable to update shards distributed state
within 'updating-state-timeout': {} millis (retrying), adding shard={}",
- writeMajority.timeout.toMillis,
+ "Remember entities coordinator store unable to update shards state
within 'updating-state-timeout': {} millis (retrying), adding shard={}",
+ writeConsistency.timeout.toMillis,
shardId)
replyTo ! RememberEntitiesCoordinatorStore.UpdateFailed(shardId)
case Replicator.ModifyFailure(key, error, cause, Some((replyTo: ActorRef,
shardId: ShardId))) =>
log.error(
cause,
- "The remember entities store was unable to add shard [{}] (key [{}],
failed with error: {})",
+ "Remember entities coordinator store was unable to add shard [{}] (key
[{}], failed with error: {})",
shardId,
key,
error)
@@ -116,6 +129,7 @@ private[pekko] final class
DDataRememberEntitiesCoordinatorStore(
}
def onGotAllShards(shardIds: Set[ShardId]): Unit = {
+ retryGetCounter = 0
coordinatorWaitingForShards match {
case Some(coordinator) =>
coordinator !
RememberEntitiesCoordinatorStore.RememberedShards(shardIds)
diff --git
a/cluster-sharding/src/main/scala/org/apache/pekko/cluster/sharding/internal/DDataRememberEntitiesShardStore.scala
b/cluster-sharding/src/main/scala/org/apache/pekko/cluster/sharding/internal/DDataRememberEntitiesShardStore.scala
index afcdaf6c3c..61a488f255 100644
---
a/cluster-sharding/src/main/scala/org/apache/pekko/cluster/sharding/internal/DDataRememberEntitiesShardStore.scala
+++
b/cluster-sharding/src/main/scala/org/apache/pekko/cluster/sharding/internal/DDataRememberEntitiesShardStore.scala
@@ -31,13 +31,15 @@ import pekko.cluster.ddata.Replicator.GetFailure
import pekko.cluster.ddata.Replicator.GetSuccess
import pekko.cluster.ddata.Replicator.ModifyFailure
import pekko.cluster.ddata.Replicator.NotFound
-import pekko.cluster.ddata.Replicator.ReadMajority
+import pekko.cluster.ddata.Replicator.ReadAll
+import pekko.cluster.ddata.Replicator.ReadMajorityPlus
import pekko.cluster.ddata.Replicator.StoreFailure
import pekko.cluster.ddata.Replicator.Update
import pekko.cluster.ddata.Replicator.UpdateDataDeleted
import pekko.cluster.ddata.Replicator.UpdateSuccess
import pekko.cluster.ddata.Replicator.UpdateTimeout
-import pekko.cluster.ddata.Replicator.WriteMajority
+import pekko.cluster.ddata.Replicator.WriteAll
+import pekko.cluster.ddata.Replicator.WriteMajorityPlus
import pekko.cluster.ddata.SelfUniqueAddress
import pekko.cluster.sharding.ClusterShardingSettings
import pekko.cluster.sharding.ShardRegion.EntityId
@@ -97,10 +99,18 @@ private[pekko] final class DDataRememberEntitiesShardStore(
implicit val node: Cluster = Cluster(context.system)
implicit val selfUniqueAddress: SelfUniqueAddress =
SelfUniqueAddress(node.selfUniqueAddress)
- private val readMajority =
ReadMajority(settings.tuningParameters.waitingForStateTimeout, majorityMinCap)
+ private val readConsistency =
settings.tuningParameters.coordinatorStateReadMajorityPlus match {
+ case Int.MaxValue =>
ReadAll(settings.tuningParameters.waitingForStateTimeout)
+ case additional =>
ReadMajorityPlus(settings.tuningParameters.waitingForStateTimeout, additional,
majorityMinCap)
+ }
// Note that the timeout is actually updatingStateTimeout / 4 so that we fit
3 retries and a response in the timeout before the shard sees it as a failure
- private val writeMajority =
WriteMajority(settings.tuningParameters.updatingStateTimeout / 4,
majorityMinCap)
+ private val writeConsistency =
settings.tuningParameters.coordinatorStateWriteMajorityPlus match {
+ case Int.MaxValue =>
WriteAll(settings.tuningParameters.updatingStateTimeout / 4)
+ case additional =>
WriteMajorityPlus(settings.tuningParameters.updatingStateTimeout / 4,
additional, majorityMinCap)
+ }
private val maxUpdateAttempts = 3
+ // Note: total for all 5 keys
+ private var maxReadAttemptsLeft = 15
private val keys = stateKeys(typeName, shardId)
if (log.isDebugEnabled) {
@@ -124,7 +134,8 @@ private[pekko] final class DDataRememberEntitiesShardStore(
def idle: Receive = {
case RememberEntitiesShardStore.GetEntities =>
// not supported, but we may get several if the shard timed out and
retried
- log.debug("Another get entities request after responding to one, not
expected/supported, ignoring")
+ log.debug(
+ "Remember entities shard store got another get entities request after
responding to one, not expected/supported, ignoring")
case update: RememberEntitiesShardStore.Update => onUpdate(update)
}
@@ -156,28 +167,39 @@ private[pekko] final class
DDataRememberEntitiesShardStore(
receiveOne(i, ids)
case NotFound(_, Some(i: Int)) =>
receiveOne(i, Set.empty)
- case GetFailure(key, _) =>
- log.error(
- "Unable to get an initial state within 'waiting-for-state-timeout':
[{}] using [{}] (key [{}])",
- readMajority.timeout.pretty,
- readMajority,
- key)
- context.stop(self)
+ case GetFailure(key, Some(i)) =>
+ maxReadAttemptsLeft -= 1
+ if (maxReadAttemptsLeft > 0) {
+ log.warning(
+ "Remember entities shard store unable to get an initial state
within 'waiting-for-state-timeout' for key [{}], retrying",
+ key)
+ replicator ! Get(key, readConsistency, Some(i))
+ } else {
+ log.error(
+ "Remember entities shard store unable to get an initial state
within 'waiting-for-state-timeout' giving up after retrying: [{}] using [{}]
(key [{}])",
+ readConsistency.timeout.pretty,
+ readConsistency,
+ key)
+ context.stop(self)
+ }
case GetDataDeleted(_, _) =>
- log.error("Unable to get an initial state because it was deleted")
+ log.error("Remember entities shard store unable to get an initial
state because it was deleted")
context.stop(self)
case update: RememberEntitiesShardStore.Update =>
- log.warning("Got an update before load of initial entities completed,
dropping update: [{}]", update)
+ log.warning(
+ "Remember entities shard store got an update before load of initial
entities completed, dropping update: [{}]",
+ update)
case RememberEntitiesShardStore.GetEntities =>
if (gotKeys.size == numberOfKeys) {
// we already got all and was waiting for a request
- log.debug("Got request from shard, sending remembered entities")
+ log.debug("Remember entities shard store got request from shard,
sending remembered entities")
sender() ! RememberEntitiesShardStore.RememberedEntities(ids)
context.become(idle)
unstashAll()
} else {
// we haven't seen all ids yet
- log.debug("Got request from shard, waiting for all remembered
entities to arrive")
+ log.debug(
+ "Remember entities shard store got request from shard, waiting for
all remembered entities to arrive")
context.become(waitingForAllEntityIds(gotKeys, ids, Some(sender())))
}
case _ =>
@@ -194,7 +216,7 @@ private[pekko] final class DDataRememberEntitiesShardStore(
allEvts.groupBy(evt => key(evt.id)).map {
case (key, evts) =>
(evts,
- (Update(key, ORSet.empty[EntityId], writeMajority, Some(evts)) {
existing =>
+ (Update(key, ORSet.empty[EntityId], writeConsistency, Some(evts))
{ existing =>
evts.foldLeft(existing) {
case (acc, Started(id)) => acc :+ id
case (acc, Stopped(id)) => acc.remove(id)
@@ -218,7 +240,7 @@ private[pekko] final class DDataRememberEntitiesShardStore(
// updatesLeft used both to keep track of what work remains and for
retrying on timeout up to a limit
def next(updatesLeft: Map[Set[Evt], (Update[ORSet[EntityId]], Int)]):
Receive = {
case UpdateSuccess(_, Some(evts: Set[Evt] @unchecked)) =>
- log.debug("The DDataShard state was successfully updated for [{}]",
evts)
+ log.debug("Remember entities shard store state was successfully
updated for [{}]", evts)
val remainingAfterThis = updatesLeft - evts
if (remainingAfterThis.isEmpty) {
requestor ! RememberEntitiesShardStore.UpdateDone(update.started,
update.stopped)
@@ -230,31 +252,35 @@ private[pekko] final class
DDataRememberEntitiesShardStore(
case UpdateTimeout(_, Some(evts: Set[Evt] @unchecked)) =>
val (updateForEvts, retriesLeft) = updatesLeft(evts)
if (retriesLeft > 0) {
- log.debug("Retrying update because of write timeout, tries left
[{}]", retriesLeft)
+ log.debug(
+ "Remember entities shard store retrying update because of write
timeout, tries left [{}]",
+ retriesLeft)
replicator ! updateForEvts
context.become(next(updatesLeft.updated(evts, (updateForEvts,
retriesLeft - 1))))
} else {
log.error(
- "Unable to update state, within 'updating-state-timeout'= [{}],
gave up after [{}] retries",
- writeMajority.timeout.pretty,
+ "Remember entities shard store unable to update state, within
'updating-state-timeout'= [{}], gave up after [{}] retries",
+ writeConsistency.timeout.pretty,
maxUpdateAttempts)
// will trigger shard restart
context.stop(self)
}
case StoreFailure(_, _) =>
- log.error("Unable to update state, due to store failure")
+ log.error("Remember entities shard store unable to update state, due
to store failure")
// will trigger shard restart
context.stop(self)
case ModifyFailure(_, error, cause, _) =>
- log.error(cause, "Unable to update state, due to modify failure: {}",
error)
+ log.error(cause, "Remember entities shard store unable to update
state, due to modify failure: {}", error)
// will trigger shard restart
context.stop(self)
case UpdateDataDeleted(_, _) =>
- log.error("Unable to update state, due to delete")
+ log.error("Remember entities shard store unable to update state, due
to delete")
// will trigger shard restart
context.stop(self)
case update: RememberEntitiesShardStore.Update =>
- log.warning("Got a new update before write of previous completed,
dropping update: [{}]", update)
+ log.warning(
+ "Remember entities shard store got a new update before write of
previous completed, dropping update: [{}]",
+ update)
}
next(allUpdates)
@@ -263,7 +289,7 @@ private[pekko] final class DDataRememberEntitiesShardStore(
private def loadAllEntities(): Unit = {
(0 until numberOfKeys).toSet[Int].foreach { i =>
val key = keys(i)
- replicator ! Get(key, readMajority, Some(i))
+ replicator ! Get(key, readConsistency, Some(i))
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]