This is an automated email from the ASF dual-hosted git repository.

engelen pushed a commit to branch 1.1.x
in repository https://gitbox.apache.org/repos/asf/pekko.git


The following commit(s) were added to refs/heads/1.1.x by this push:
     new ef4244006e Join cluster check adjusted to support akka (#1866) (#1880)
ef4244006e is described below

commit ef4244006ec67c1ba8d92ac2eba4850ea81a242c
Author: PJ Fanning <[email protected]>
AuthorDate: Mon Jun 2 10:38:24 2025 +0100

    Join cluster check adjusted to support akka (#1866) (#1880)
    
    * update cluster initjoin check to support akka
    
    * add test
    
    * extra test
---
 .../org/apache/pekko/cluster/ClusterDaemon.scala   | 10 ++++-
 .../org/apache/pekko/cluster/ConfigUtil.scala      | 45 ++++++++++++++++++++--
 .../org/apache/pekko/cluster/SeedNodeProcess.scala |  7 +---
 .../cluster/JoinConfigCompatCheckClusterSpec.scala | 27 +++++++++++++
 4 files changed, 79 insertions(+), 10 deletions(-)

diff --git 
a/cluster/src/main/scala/org/apache/pekko/cluster/ClusterDaemon.scala 
b/cluster/src/main/scala/org/apache/pekko/cluster/ClusterDaemon.scala
index 746c1ceace..b289f370ca 100644
--- a/cluster/src/main/scala/org/apache/pekko/cluster/ClusterDaemon.scala
+++ b/cluster/src/main/scala/org/apache/pekko/cluster/ClusterDaemon.scala
@@ -607,7 +607,15 @@ private[cluster] class ClusterCoreDaemon(publisher: 
ActorRef, joinConfigCompatCh
     case other               => super.unhandled(other)
   }
 
-  def initJoin(joiningNodeConfig: Config): Unit = {
+  private lazy val supportsAkkaConfig: Boolean = ConfigUtil.supportsAkkaConfig(
+    context.system.settings.config)
+
+  def initJoin(inputConfig: Config): Unit = {
+    val joiningNodeConfig = if (supportsAkkaConfig && 
!inputConfig.hasPath("pekko")) {
+      ConfigUtil.changeAkkaToPekkoConfig(inputConfig)
+    } else {
+      inputConfig
+    }
     val joiningNodeVersion =
       if (joiningNodeConfig.hasPath("pekko.version")) 
joiningNodeConfig.getString("pekko.version")
       else "unknown"
diff --git a/cluster/src/main/scala/org/apache/pekko/cluster/ConfigUtil.scala 
b/cluster/src/main/scala/org/apache/pekko/cluster/ConfigUtil.scala
index 96a34a7d67..62fdd0e325 100644
--- a/cluster/src/main/scala/org/apache/pekko/cluster/ConfigUtil.scala
+++ b/cluster/src/main/scala/org/apache/pekko/cluster/ConfigUtil.scala
@@ -23,13 +23,16 @@ import scala.annotation.nowarn
 
 private[cluster] object ConfigUtil {
 
+  private val PekkoPrefix = "org.apache.pekko"
+  private val AkkaPrefix = "akka"
+
   @nowarn("msg=deprecated")
   def addAkkaConfig(cfg: Config, akkaVersion: String): Config = {
     import org.apache.pekko.util.ccompat.JavaConverters._
     val innerSet = cfg.entrySet().asScala
       .filter(e => e.getKey.startsWith("pekko.") && e.getValue.valueType() != 
ConfigValueType.OBJECT)
       .map { entry =>
-        entry.getKey.replace("pekko", "akka") -> 
adjustPackageNameIfNecessary(entry.getValue)
+        entry.getKey.replace("pekko", "akka") -> 
adjustPackageNameToAkkaIfNecessary(entry.getValue)
       }
     var newConfig = cfg
     innerSet.foreach { case (key, value) =>
@@ -38,11 +41,45 @@ private[cluster] object ConfigUtil {
     newConfig.withValue("akka.version", 
ConfigValueFactory.fromAnyRef(akkaVersion))
   }
 
-  private def adjustPackageNameIfNecessary(cv: ConfigValue): ConfigValue = {
+  @nowarn("msg=deprecated")
+  def changeAkkaToPekkoConfig(cfg: Config): Config = {
+    import org.apache.pekko.util.ccompat.JavaConverters._
+    val innerSet = cfg.entrySet().asScala
+      .filter(e => e.getKey.startsWith("akka.") && e.getValue.valueType() != 
ConfigValueType.OBJECT)
+      .map { entry =>
+        entry.getKey.replace("akka", "pekko") -> 
adjustPackageNameToPekkoIfNecessary(entry.getValue)
+      }
+    var newConfig = cfg
+    innerSet.foreach { case (key, value) =>
+      newConfig = newConfig.withValue(key, value)
+    }
+    newConfig
+  }
+
+  def supportsAkkaConfig(cfg: Config): Boolean = {
+    cfg
+      .getStringList("pekko.remote.accept-protocol-names")
+      .contains("akka")
+  }
+
+  private def adjustPackageNameToAkkaIfNecessary(cv: ConfigValue): ConfigValue 
= {
+    if (cv.valueType() == ConfigValueType.STRING) {
+      val str = cv.unwrapped().toString
+      if (str.startsWith(PekkoPrefix)) {
+        ConfigValueFactory.fromAnyRef(str.replace(PekkoPrefix, AkkaPrefix))
+      } else {
+        cv
+      }
+    } else {
+      cv
+    }
+  }
+
+  private def adjustPackageNameToPekkoIfNecessary(cv: ConfigValue): 
ConfigValue = {
     if (cv.valueType() == ConfigValueType.STRING) {
       val str = cv.unwrapped().toString
-      if (str.startsWith("org.apache.pekko")) {
-        ConfigValueFactory.fromAnyRef(str.replace("org.apache.pekko", "akka"))
+      if (str.startsWith(AkkaPrefix)) {
+        ConfigValueFactory.fromAnyRef(str.replace(AkkaPrefix, PekkoPrefix))
       } else {
         cv
       }
diff --git 
a/cluster/src/main/scala/org/apache/pekko/cluster/SeedNodeProcess.scala 
b/cluster/src/main/scala/org/apache/pekko/cluster/SeedNodeProcess.scala
index 67a60b8189..985875dba2 100644
--- a/cluster/src/main/scala/org/apache/pekko/cluster/SeedNodeProcess.scala
+++ b/cluster/src/main/scala/org/apache/pekko/cluster/SeedNodeProcess.scala
@@ -47,11 +47,8 @@ private[cluster] abstract class 
SeedNodeProcess(joinConfigCompatChecker: JoinCon
     "Note that disabling it will allow the formation of a cluster with nodes 
having incompatible configuration settings. " +
     "This node will be shutdown!"
 
-  private lazy val needsAkkaConfig: Boolean = {
-    context.system.settings.config
-      .getStringList("pekko.remote.accept-protocol-names")
-      .contains("akka")
-  }
+  private lazy val needsAkkaConfig: Boolean = ConfigUtil.supportsAkkaConfig(
+    context.system.settings.config)
 
   private lazy val akkaVersion: String = {
     val cfg = context.system.settings.config
diff --git 
a/cluster/src/test/scala/org/apache/pekko/cluster/JoinConfigCompatCheckClusterSpec.scala
 
b/cluster/src/test/scala/org/apache/pekko/cluster/JoinConfigCompatCheckClusterSpec.scala
index 76962b1492..dc45d94956 100644
--- 
a/cluster/src/test/scala/org/apache/pekko/cluster/JoinConfigCompatCheckClusterSpec.scala
+++ 
b/cluster/src/test/scala/org/apache/pekko/cluster/JoinConfigCompatCheckClusterSpec.scala
@@ -109,5 +109,32 @@ class JoinConfigCompatCheckClusterSpec extends PekkoSpec {
       checkInitJoin(oldConfig, newConfig).getClass should ===(classOf[Invalid])
       checkInitJoinAck(oldConfig, newConfig).getClass should 
===(classOf[Invalid])
     }
+
+    "be valid when equivalent downing-provider (akka/pekko mixed cluster)" in {
+      val oldConfig =
+        ConfigFactory.parseString("""
+        pekko.cluster.downing-provider-class = 
"org.apache.pekko.cluster.sbr.SplitBrainResolverProvider"
+        """).withFallback(system.settings.config)
+      val newConfig =
+        ConfigFactory.parseString("""
+        akka.cluster.downing-provider-class = 
"akka.cluster.sbr.SplitBrainResolverProvider"
+        akka.version = "2.6.21"
+        """)
+      checkInitJoin(oldConfig, ConfigUtil.changeAkkaToPekkoConfig(newConfig)) 
should ===(Valid)
+    }
+
+    "be invalid when not equivalent downing-provider (akka/pekko mixed 
cluster)" in {
+      val oldConfig =
+        ConfigFactory.parseString("""
+        pekko.cluster.downing-provider-class = 
"org.apache.pekko.cluster.testkit.AutoDowning"
+        """).withFallback(system.settings.config)
+      val newConfig =
+        ConfigFactory.parseString("""
+        akka.cluster.downing-provider-class = 
"akka.cluster.sbr.SplitBrainResolverProvider"
+        akka.version = "2.6.21"
+        """)
+      checkInitJoin(oldConfig, 
ConfigUtil.changeAkkaToPekkoConfig(newConfig)).getClass should 
===(classOf[Invalid])
+    }
+
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to