This is an automated email from the ASF dual-hosted git repository.
engelen pushed a commit to branch 1.1.x
in repository https://gitbox.apache.org/repos/asf/pekko.git
The following commit(s) were added to refs/heads/1.1.x by this push:
new ef4244006e Join cluster check adjusted to support akka (#1866) (#1880)
ef4244006e is described below
commit ef4244006ec67c1ba8d92ac2eba4850ea81a242c
Author: PJ Fanning <[email protected]>
AuthorDate: Mon Jun 2 10:38:24 2025 +0100
Join cluster check adjusted to support akka (#1866) (#1880)
* update cluster initjoin check to support akka
* add test
* extra test
---
.../org/apache/pekko/cluster/ClusterDaemon.scala | 10 ++++-
.../org/apache/pekko/cluster/ConfigUtil.scala | 45 ++++++++++++++++++++--
.../org/apache/pekko/cluster/SeedNodeProcess.scala | 7 +---
.../cluster/JoinConfigCompatCheckClusterSpec.scala | 27 +++++++++++++
4 files changed, 79 insertions(+), 10 deletions(-)
diff --git a/cluster/src/main/scala/org/apache/pekko/cluster/ClusterDaemon.scala b/cluster/src/main/scala/org/apache/pekko/cluster/ClusterDaemon.scala
index 746c1ceace..b289f370ca 100644
--- a/cluster/src/main/scala/org/apache/pekko/cluster/ClusterDaemon.scala
+++ b/cluster/src/main/scala/org/apache/pekko/cluster/ClusterDaemon.scala
@@ -607,7 +607,15 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef, joinConfigCompatCh
case other => super.unhandled(other)
}
- def initJoin(joiningNodeConfig: Config): Unit = {
+ private lazy val supportsAkkaConfig: Boolean = ConfigUtil.supportsAkkaConfig(
+ context.system.settings.config)
+
+ def initJoin(inputConfig: Config): Unit = {
+ val joiningNodeConfig = if (supportsAkkaConfig && !inputConfig.hasPath("pekko")) {
+ ConfigUtil.changeAkkaToPekkoConfig(inputConfig)
+ } else {
+ inputConfig
+ }
val joiningNodeVersion =
if (joiningNodeConfig.hasPath("pekko.version"))
joiningNodeConfig.getString("pekko.version")
else "unknown"
diff --git a/cluster/src/main/scala/org/apache/pekko/cluster/ConfigUtil.scala b/cluster/src/main/scala/org/apache/pekko/cluster/ConfigUtil.scala
index 96a34a7d67..62fdd0e325 100644
--- a/cluster/src/main/scala/org/apache/pekko/cluster/ConfigUtil.scala
+++ b/cluster/src/main/scala/org/apache/pekko/cluster/ConfigUtil.scala
@@ -23,13 +23,16 @@ import scala.annotation.nowarn
private[cluster] object ConfigUtil {
+ private val PekkoPrefix = "org.apache.pekko"
+ private val AkkaPrefix = "akka"
+
@nowarn("msg=deprecated")
def addAkkaConfig(cfg: Config, akkaVersion: String): Config = {
import org.apache.pekko.util.ccompat.JavaConverters._
val innerSet = cfg.entrySet().asScala
.filter(e => e.getKey.startsWith("pekko.") && e.getValue.valueType() != ConfigValueType.OBJECT)
.map { entry =>
- entry.getKey.replace("pekko", "akka") -> adjustPackageNameIfNecessary(entry.getValue)
+ entry.getKey.replace("pekko", "akka") -> adjustPackageNameToAkkaIfNecessary(entry.getValue)
}
var newConfig = cfg
innerSet.foreach { case (key, value) =>
@@ -38,11 +41,45 @@ private[cluster] object ConfigUtil {
newConfig.withValue("akka.version", ConfigValueFactory.fromAnyRef(akkaVersion))
}
- private def adjustPackageNameIfNecessary(cv: ConfigValue): ConfigValue = {
+ @nowarn("msg=deprecated")
+ def changeAkkaToPekkoConfig(cfg: Config): Config = {
+ import org.apache.pekko.util.ccompat.JavaConverters._
+ val innerSet = cfg.entrySet().asScala
+ .filter(e => e.getKey.startsWith("akka.") && e.getValue.valueType() != ConfigValueType.OBJECT)
+ .map { entry =>
+ entry.getKey.replace("akka", "pekko") -> adjustPackageNameToPekkoIfNecessary(entry.getValue)
+ }
+ var newConfig = cfg
+ innerSet.foreach { case (key, value) =>
+ newConfig = newConfig.withValue(key, value)
+ }
+ newConfig
+ }
+
+ def supportsAkkaConfig(cfg: Config): Boolean = {
+ cfg
+ .getStringList("pekko.remote.accept-protocol-names")
+ .contains("akka")
+ }
+
+ private def adjustPackageNameToAkkaIfNecessary(cv: ConfigValue): ConfigValue = {
+ if (cv.valueType() == ConfigValueType.STRING) {
+ val str = cv.unwrapped().toString
+ if (str.startsWith(PekkoPrefix)) {
+ ConfigValueFactory.fromAnyRef(str.replace(PekkoPrefix, AkkaPrefix))
+ } else {
+ cv
+ }
+ } else {
+ cv
+ }
+ }
+
+ private def adjustPackageNameToPekkoIfNecessary(cv: ConfigValue): ConfigValue = {
if (cv.valueType() == ConfigValueType.STRING) {
val str = cv.unwrapped().toString
- if (str.startsWith("org.apache.pekko")) {
- ConfigValueFactory.fromAnyRef(str.replace("org.apache.pekko", "akka"))
+ if (str.startsWith(AkkaPrefix)) {
+ ConfigValueFactory.fromAnyRef(str.replace(AkkaPrefix, PekkoPrefix))
} else {
cv
}
diff --git a/cluster/src/main/scala/org/apache/pekko/cluster/SeedNodeProcess.scala b/cluster/src/main/scala/org/apache/pekko/cluster/SeedNodeProcess.scala
index 67a60b8189..985875dba2 100644
--- a/cluster/src/main/scala/org/apache/pekko/cluster/SeedNodeProcess.scala
+++ b/cluster/src/main/scala/org/apache/pekko/cluster/SeedNodeProcess.scala
@@ -47,11 +47,8 @@ private[cluster] abstract class SeedNodeProcess(joinConfigCompatChecker: JoinCon
"Note that disabling it will allow the formation of a cluster with nodes having incompatible configuration settings. " +
"This node will be shutdown!"
- private lazy val needsAkkaConfig: Boolean = {
- context.system.settings.config
- .getStringList("pekko.remote.accept-protocol-names")
- .contains("akka")
- }
+ private lazy val needsAkkaConfig: Boolean = ConfigUtil.supportsAkkaConfig(
+ context.system.settings.config)
private lazy val akkaVersion: String = {
val cfg = context.system.settings.config
diff --git a/cluster/src/test/scala/org/apache/pekko/cluster/JoinConfigCompatCheckClusterSpec.scala b/cluster/src/test/scala/org/apache/pekko/cluster/JoinConfigCompatCheckClusterSpec.scala
index 76962b1492..dc45d94956 100644
--- a/cluster/src/test/scala/org/apache/pekko/cluster/JoinConfigCompatCheckClusterSpec.scala
+++ b/cluster/src/test/scala/org/apache/pekko/cluster/JoinConfigCompatCheckClusterSpec.scala
@@ -109,5 +109,32 @@ class JoinConfigCompatCheckClusterSpec extends PekkoSpec {
checkInitJoin(oldConfig, newConfig).getClass should ===(classOf[Invalid])
checkInitJoinAck(oldConfig, newConfig).getClass should ===(classOf[Invalid])
}
+
+ "be valid when equivalent downing-provider (akka/pekko mixed cluster)" in {
+ val oldConfig =
+ ConfigFactory.parseString("""
+ pekko.cluster.downing-provider-class = "org.apache.pekko.cluster.sbr.SplitBrainResolverProvider"
+ """).withFallback(system.settings.config)
+ val newConfig =
+ ConfigFactory.parseString("""
+ akka.cluster.downing-provider-class = "akka.cluster.sbr.SplitBrainResolverProvider"
+ akka.version = "2.6.21"
+ """)
+ checkInitJoin(oldConfig, ConfigUtil.changeAkkaToPekkoConfig(newConfig)) should ===(Valid)
+ }
+
+ "be invalid when not equivalent downing-provider (akka/pekko mixed cluster)" in {
+ val oldConfig =
+ ConfigFactory.parseString("""
+ pekko.cluster.downing-provider-class = "org.apache.pekko.cluster.testkit.AutoDowning"
+ """).withFallback(system.settings.config)
+ val newConfig =
+ ConfigFactory.parseString("""
+ akka.cluster.downing-provider-class = "akka.cluster.sbr.SplitBrainResolverProvider"
+ akka.version = "2.6.21"
+ """)
+ checkInitJoin(oldConfig, ConfigUtil.changeAkkaToPekkoConfig(newConfig)).getClass should ===(classOf[Invalid])
+ }
+
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]