This is an automated email from the ASF dual-hosted git repository.
fanningpj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko.git
The following commit(s) were added to refs/heads/main by this push:
new 550ea4bde5 Disable ClusterShardingHealthCheck after configured
duration post member-up (#2785)
550ea4bde5 is described below
commit 550ea4bde5d95d55489aab899feff92db6d199d7
Author: PJ Fanning <[email protected]>
AuthorDate: Tue Mar 24 08:59:40 2026 +0100
Disable ClusterShardingHealthCheck after configured duration post member-up
(#2785)
* Initial plan
* Copy akka/akka-core#31864: Disable ClusterShardingHealthCheck after
duration when member is up
Co-authored-by: pjfanning <[email protected]>
Agent-Logs-Url:
https://github.com/pjfanning/incubator-pekko/sessions/bc72a2b0-c6d6-4bc2-983d-5d488ff934f4
* Update ClusterShardingHealthCheck.scala
* Update ClusterShardingHealthCheck.scala
---------
Co-authored-by: copilot-swe-agent[bot]
<[email protected]>
Co-authored-by: pjfanning <[email protected]>
---
cluster-sharding/src/main/resources/reference.conf | 11 ++++
.../sharding/ClusterShardingHealthCheck.scala | 33 +++++++++--
.../sharding/ClusterShardingHealthCheckSpec.scala | 64 +++++++++++++++++++---
docs/src/main/paradox/typed/cluster-sharding.md | 2 +
4 files changed, 95 insertions(+), 15 deletions(-)
diff --git a/cluster-sharding/src/main/resources/reference.conf
b/cluster-sharding/src/main/resources/reference.conf
index 5282a7122e..b21f7f2fe6 100644
--- a/cluster-sharding/src/main/resources/reference.conf
+++ b/cluster-sharding/src/main/resources/reference.conf
@@ -444,6 +444,17 @@ pekko.cluster.sharding {
# Timeout for the local shard region to respond. This should be lower than
your monitoring system's
# timeout for health checks
timeout = 5s
+
+ # The health check is only performed during this duration after
+ # the member is up. After that the sharding check will not be performed
(always returns success).
+ # The purpose is to wait for Cluster Sharding registration to complete on
initial startup.
+ # After that, in case of Sharding Coordinator movement or reachability problems we
still want to be ready
+ # because requests can typically be served without involving the
coordinator.
+ # Another reason is that when a new entity type is added in a rolling
update we don't want to fail
+ the ready check forever, which would stall the rolling update. The Sharding
Coordinator is expected
+ to run on the oldest member, but in this scenario the oldest member is in the
old deployment and hasn't
+ started the coordinator for that entity type.
+ disabled-after = 10s
}
}
# //#sharding-ext-config
diff --git
a/cluster-sharding/src/main/scala/org/apache/pekko/cluster/sharding/ClusterShardingHealthCheck.scala
b/cluster-sharding/src/main/scala/org/apache/pekko/cluster/sharding/ClusterShardingHealthCheck.scala
index 7b444887ca..2f1d75ea74 100644
---
a/cluster-sharding/src/main/scala/org/apache/pekko/cluster/sharding/ClusterShardingHealthCheck.scala
+++
b/cluster-sharding/src/main/scala/org/apache/pekko/cluster/sharding/ClusterShardingHealthCheck.scala
@@ -15,15 +15,16 @@ package org.apache.pekko.cluster.sharding
import scala.concurrent.ExecutionContext
import scala.concurrent.Future
-import scala.concurrent.duration.FiniteDuration
+import scala.concurrent.duration.{ DurationInt, FiniteDuration }
import scala.jdk.CollectionConverters._
import scala.jdk.DurationConverters._
import org.apache.pekko
import pekko.actor.ActorRef
import pekko.actor.ActorSystem
-import pekko.annotation.ApiMayChange
import pekko.annotation.InternalApi
+import pekko.cluster.Cluster
+import pekko.cluster.MemberStatus
import pekko.event.Logging
import pekko.pattern.AskTimeoutException
import pekko.pattern.ask
@@ -39,11 +40,19 @@ private[pekko] object ClusterShardingHealthCheckSettings {
def apply(config: Config): ClusterShardingHealthCheckSettings =
new ClusterShardingHealthCheckSettings(
config.getStringList("names").asScala.toSet,
- config.getDuration("timeout").toScala)
+ config.getDuration("timeout").toScala,
+ config.getDuration("disabled-after").toScala)
}
-@ApiMayChange
-final class ClusterShardingHealthCheckSettings(val names: Set[String], val
timeout: FiniteDuration)
+final class ClusterShardingHealthCheckSettings(
+ val names: Set[String],
+ val timeout: FiniteDuration,
+ val disableAfter: FiniteDuration) {
+
+ // for binary backwards compatibility
+ @deprecated("Use full constructor", "2.0.0")
+ def this(names: Set[String], timeout: FiniteDuration) = this(names, timeout,
10.seconds)
+}
private object ClusterShardingHealthCheck {
val Success = Future.successful(true)
@@ -52,7 +61,6 @@ private object ClusterShardingHealthCheck {
/**
* INTERNAL API (ctr)
*/
-@ApiMayChange
final class ClusterShardingHealthCheck private[pekko] (
system: ActorSystem,
settings: ClusterShardingHealthCheckSettings,
@@ -72,11 +80,24 @@ final class ClusterShardingHealthCheck private[pekko] (
// Once the check has passed it always does
@volatile private var registered = false
+ @volatile private var startedTimestamp = 0L
+
+ private def isMemberUp(): Boolean = {
+ val memberStatus = Cluster(system).selfMember.status
+ memberStatus != MemberStatus.Joining && memberStatus !=
MemberStatus.Removed
+ }
override def apply(): Future[Boolean] = {
if (settings.names.isEmpty || registered) {
ClusterShardingHealthCheck.Success
+ } else if (startedTimestamp != 0L &&
+ System
+ .currentTimeMillis() > startedTimestamp +
settings.disableAfter.toMillis) {
+ ClusterShardingHealthCheck.Success
} else {
+ if (startedTimestamp == 0 && isMemberUp())
+ startedTimestamp = System.currentTimeMillis()
+
Future
.traverse(settings.names) { name =>
shardRegion(name) // this can throw if shard region not registered
and it'll fail the check
diff --git
a/cluster-sharding/src/test/scala/org/apache/pekko/cluster/sharding/ClusterShardingHealthCheckSpec.scala
b/cluster-sharding/src/test/scala/org/apache/pekko/cluster/sharding/ClusterShardingHealthCheckSpec.scala
index 56cf7aca0e..99132427da 100644
---
a/cluster-sharding/src/test/scala/org/apache/pekko/cluster/sharding/ClusterShardingHealthCheckSpec.scala
+++
b/cluster-sharding/src/test/scala/org/apache/pekko/cluster/sharding/ClusterShardingHealthCheckSpec.scala
@@ -16,6 +16,8 @@ package org.apache.pekko.cluster.sharding
import scala.concurrent.duration._
import org.apache.pekko
+import pekko.cluster.Cluster
+import pekko.cluster.MemberStatus
import pekko.testkit.PekkoSpec
import pekko.testkit.TestProbe
import pekko.testkit.WithLogCapturing
@@ -26,9 +28,11 @@ import com.typesafe.config.ConfigFactory
object ClusterShardingHealthCheckSpec {
val config = ConfigFactory.parseString("""
- pekko.loglevel = DEBUG
- pekko.loggers =
["org.apache.pekko.testkit.SilenceAllTestEventListener"]
- """.stripMargin)
+ pekko.loglevel = DEBUG
+ pekko.loggers = ["org.apache.pekko.testkit.SilenceAllTestEventListener"]
+ pekko.actor.provider = cluster
+ pekko.remote.artery.canonical.port = 0
+ """)
}
class ClusterShardingHealthCheckSpec
@@ -41,7 +45,7 @@ class ClusterShardingHealthCheckSpec
val shardRegionProbe = TestProbe()
val check = new ClusterShardingHealthCheck(
system,
- new ClusterShardingHealthCheckSettings(Set.empty, 1.second),
+ new ClusterShardingHealthCheckSettings(Set.empty, 1.second,
10.seconds),
_ => shardRegionProbe.ref)
check().futureValue shouldEqual true
}
@@ -49,7 +53,7 @@ class ClusterShardingHealthCheckSpec
val shardRegionProbe = TestProbe()
val check = new ClusterShardingHealthCheck(
system,
- new ClusterShardingHealthCheckSettings(Set("cat"), 1.second),
+ new ClusterShardingHealthCheckSettings(Set("cat"), 1.second,
10.seconds),
_ => shardRegionProbe.ref)
val response = check()
shardRegionProbe.expectMsg(ShardRegion.GetShardRegionStatus)
@@ -60,7 +64,7 @@ class ClusterShardingHealthCheckSpec
val shardRegionProbe = TestProbe()
val check = new ClusterShardingHealthCheck(
system,
- new ClusterShardingHealthCheckSettings(Set("cat"), 1.second),
+ new ClusterShardingHealthCheckSettings(Set("cat"), 1.second,
10.seconds),
_ => shardRegionProbe.ref)
val response = check()
shardRegionProbe.expectMsg(ShardRegion.GetShardRegionStatus)
@@ -71,7 +75,7 @@ class ClusterShardingHealthCheckSpec
val shardRegionProbe = TestProbe()
val check = new ClusterShardingHealthCheck(
system,
- new ClusterShardingHealthCheckSettings(Set("cat", "dog"), 1.second),
+ new ClusterShardingHealthCheckSettings(Set("cat", "dog"), 1.second,
10.seconds),
_ => shardRegionProbe.ref)
val response = check()
shardRegionProbe.expectMsg(ShardRegion.GetShardRegionStatus)
@@ -84,7 +88,7 @@ class ClusterShardingHealthCheckSpec
val shardRegionProbe = TestProbe()
val check = new ClusterShardingHealthCheck(
system,
- new ClusterShardingHealthCheckSettings(Set("cat"), 100.millis),
+ new ClusterShardingHealthCheckSettings(Set("cat"), 100.millis,
10.seconds),
_ => shardRegionProbe.ref)
val response = check()
shardRegionProbe.expectMsg(ShardRegion.GetShardRegionStatus)
@@ -95,7 +99,7 @@ class ClusterShardingHealthCheckSpec
val shardRegionProbe = TestProbe()
val check = new ClusterShardingHealthCheck(
system,
- new ClusterShardingHealthCheckSettings(Set("cat"), 1.second),
+ new ClusterShardingHealthCheckSettings(Set("cat"), 1.second,
10.seconds),
_ => shardRegionProbe.ref)
val response = check()
shardRegionProbe.expectMsg(ShardRegion.GetShardRegionStatus)
@@ -106,6 +110,48 @@ class ClusterShardingHealthCheckSpec
shardRegionProbe.expectNoMessage()
secondResponse.futureValue shouldEqual true
}
+
+ "always pass after disabled-after" in {
+ val shardRegionProbe = TestProbe()
+ val disabledAfter = 100.millis
+ val check = new ClusterShardingHealthCheck(
+ system,
+ new ClusterShardingHealthCheckSettings(Set("cat"), 1.second,
disabledAfter),
+ _ => shardRegionProbe.ref)
+ // first check will always be performed
+ val response1 = check()
+ shardRegionProbe.expectMsg(ShardRegion.GetShardRegionStatus)
+ shardRegionProbe.reply(new ShardRegion.ShardRegionStatus("cat", false))
+ response1.futureValue shouldEqual false
+
+ Thread.sleep(disabledAfter.toMillis + 100)
+
+ // and it will not start the clock until member up
+ val response2 = check()
+ shardRegionProbe.expectMsg(ShardRegion.GetShardRegionStatus)
+ shardRegionProbe.reply(new ShardRegion.ShardRegionStatus("cat", false))
+ response2.futureValue shouldEqual false
+
+ Thread.sleep(disabledAfter.toMillis + 100)
+
+ Cluster(system).join(Cluster(system).selfAddress)
+ awaitAssert {
+ Cluster(system).selfMember.status shouldEqual MemberStatus.Up
+ }
+
+ // first check after member up will trigger start of clock
+ val response3 = check()
+ shardRegionProbe.expectMsg(ShardRegion.GetShardRegionStatus)
+ shardRegionProbe.reply(new ShardRegion.ShardRegionStatus("cat", false))
+ response3.futureValue shouldEqual false
+
+ Thread.sleep(disabledAfter.toMillis + 100)
+
+ // and now it has exceeded the disabled-after duration
+ val response4 = check()
+ shardRegionProbe.expectNoMessage()
+ response4.futureValue shouldEqual true
+ }
}
}
diff --git a/docs/src/main/paradox/typed/cluster-sharding.md
b/docs/src/main/paradox/typed/cluster-sharding.md
index 09a36001e5..7a442e1cef 100644
--- a/docs/src/main/paradox/typed/cluster-sharding.md
+++ b/docs/src/main/paradox/typed/cluster-sharding.md
@@ -728,6 +728,8 @@ Monitoring of each shard region is off by default. Add them
by defining the enti
pekko.cluster.sharding.healthcheck.names = ["counter-1", "HelloWorld"]
```
+The health check is disabled (always returns success) after a configured
duration (`disabled-after`) once the Cluster member is up. Otherwise, it would
stall a Kubernetes rolling update when adding a new entity type in the new
version.
+
See also additional information about how to make @ref:[smooth rolling
updates](../additional/rolling-updates.md#cluster-sharding).
## Inspecting cluster sharding state
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]