This is an automated email from the ASF dual-hosted git repository.
pjfanning pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko-management.git
The following commit(s) were added to refs/heads/main by this push:
new 4f8728cc bring in akka-management 1.4.1 changes (#801)
4f8728cc is described below
commit 4f8728cc102f450eeaddcfdf5b919a015aca6e9b
Author: PJ Fanning <[email protected]>
AuthorDate: Mon May 25 15:23:39 2026 +0100
bring in akka-management 1.4.1 changes (#801)
* port: apply akka-management PRs #1150-#1155 to pekko
- PR #1150: Already applied (fix flaky RU test script)
- PR #1151: Fix AppVersionRevision docs wording in rolling-updates.md
- PR #1152: Fix cyclic initialization in ClusterBootstrap,
AppVersionRevision, PodDeletionCost
- PR #1153: Add KUBERNETES_NAMESPACE env var support in reference.conf +
docs
- PR #1155: Ensure versionPromise always completes on failure + refactor
AppVersionRevisionSpec into KubernetesApiSpec
Agent-Logs-Url:
https://github.com/pjfanning/incubator-pekko-management/sessions/bdef1ab5-2a1c-4728-927c-8c89aa47597a
Co-authored-by: pjfanning <[email protected]>
* format
---------
Co-authored-by: copilot-swe-agent[bot]
<[email protected]>
Co-authored-by: pjfanning <[email protected]>
---
.../src/main/resources/reference.conf | 1 +
docs/src/main/paradox/bootstrap/kubernetes-api.md | 10 +-
docs/src/main/paradox/rolling-updates.md | 36 +++-
.../cluster/bootstrap/ClusterBootstrap.scala | 2 +-
.../kubernetes/AppVersionRevision.scala | 25 +--
.../rollingupdate/kubernetes/KubernetesApi.scala | 5 +
.../rollingupdate/kubernetes/PodDeletionCost.scala | 2 +-
.../kubernetes/AppVersionRevisionSpec.scala | 202 +--------------------
...nRevisionSpec.scala => KubernetesApiSpec.scala} | 16 +-
9 files changed, 80 insertions(+), 219 deletions(-)
diff --git a/discovery-kubernetes-api/src/main/resources/reference.conf
b/discovery-kubernetes-api/src/main/resources/reference.conf
index c5934bb4..cd47a33d 100644
--- a/discovery-kubernetes-api/src/main/resources/reference.conf
+++ b/discovery-kubernetes-api/src/main/resources/reference.conf
@@ -29,6 +29,7 @@ pekko.discovery {
#
# Set this value to a specific string to override discovering the
namespace using pod-namespace-path.
pod-namespace = "<pod-namespace>"
+ pod-namespace = ${?KUBERNETES_NAMESPACE}
# Domain of the k8s cluster
pod-domain = "cluster.local"
diff --git a/docs/src/main/paradox/bootstrap/kubernetes-api.md
b/docs/src/main/paradox/bootstrap/kubernetes-api.md
index e7b61517..a5183796 100644
--- a/docs/src/main/paradox/bootstrap/kubernetes-api.md
+++ b/docs/src/main/paradox/bootstrap/kubernetes-api.md
@@ -28,7 +28,15 @@ The following configuration is required:
The lookup needs to know which namespace to look in. By default, this will be
detected by reading the namespace
from the service account secret, in
`/var/run/secrets/kubernetes.io/serviceaccount/namespace`, but can be
overridden by
-setting `pekko.discovery.kubernetes-api.pod-namespace`.
+setting `pekko.discovery.kubernetes-api.pod-namespace` or by providing
`KUBERNETES_NAMESPACE` environment variable.
+
+```yaml
+ env:
+ - name: KUBERNETES_NAMESPACE
+ valueFrom:
+ fieldRef:
+ fieldPath: metadata.namespace
+```
For more details on how to configure the Kubernetes deployment see
@ref:[recipes](recipes.md).
diff --git a/docs/src/main/paradox/rolling-updates.md
b/docs/src/main/paradox/rolling-updates.md
index e2206a27..33cce271 100644
--- a/docs/src/main/paradox/rolling-updates.md
+++ b/docs/src/main/paradox/rolling-updates.md
@@ -85,10 +85,26 @@ The following configuration is required, more details for
each and additional co
* `pekko.rollingupdate.kubernetes.pod-name`: this can be provided by setting
`KUBERNETES_POD_NAME` environment variable to `metadata.name` on the Kubernetes
container spec.
+```yaml
+ env:
+ - name: KUBERNETES_POD_NAME
+ valueFrom:
+ fieldRef:
+ fieldPath: metadata.name
+```
+
Additionally, the pod annotator needs to know which namespace the pod belongs
to. By default, this will be detected by reading the namespace
from the service account secret, in
`/var/run/secrets/kubernetes.io/serviceaccount/namespace`, but can be
overridden by
setting `pekko.rollingupdate.kubernetes.namespace` or by providing
`KUBERNETES_NAMESPACE` environment variable.
+```yaml
+ env:
+ - name: KUBERNETES_NAMESPACE
+ valueFrom:
+ fieldRef:
+ fieldPath: metadata.namespace
+```
+
##### Role based access control
@@@ warning
@@ -132,11 +148,11 @@ This RBAC example covers only the permissions needed for
this `PodDeletionCost`
## app-version from Deployment
-When using Cluster Sharding, it is
[recommended](https://pekko.apache.org/docs/pekko/current/additional/rolling-updates.html#cluster-sharding)
for rolling updates that you define an increasing `pekko.cluster.app-version`
configuration property for each roll out.
+When using Cluster Sharding, it is
[recommended](https://pekko.apache.org/docs/pekko/current/additional/rolling-updates.html#cluster-sharding)
to define an increasing `pekko.cluster.app-version` configuration property for
each roll out.
This works well unless you use `kubectl rollout undo` which deploys the
previous ReplicaSet configuration which contains the previous value for that
config.
-To fix this, you can use `AppVersionRevision` to read the current annotation
`deployment.kubernetes.io/revision` (part of the ReplicaSet) from the
Kubernetes Deployment via the Kubernetes API which always increases, also
during a rollback:
+To fix this, you can use `AppVersionRevision` to read the current annotation
`deployment.kubernetes.io/revision` (part of the ReplicaSet) from the
Kubernetes Deployment via the Kubernetes API which always increases, also
during a rollback.
### Using
@@ -166,10 +182,26 @@ The following configuration is required, more details for
each and additional co
* `pekko.rollingupdate.kubernetes.pod-name`: this can be provided by setting
`KUBERNETES_POD_NAME` environment variable to `metadata.name` on the Kubernetes
container spec.
+```yaml
+ env:
+ - name: KUBERNETES_POD_NAME
+ valueFrom:
+ fieldRef:
+ fieldPath: metadata.name
+```
+
Additionally, the pod annotator needs to know which namespace the pod belongs
to. By default, this will be detected by reading the namespace
from the service account secret, in
`/var/run/secrets/kubernetes.io/serviceaccount/namespace`, but can be
overridden by
setting `pekko.rollingupdate.kubernetes.namespace` or by providing
`KUBERNETES_NAMESPACE` environment variable.
+```yaml
+ env:
+ - name: KUBERNETES_NAMESPACE
+ valueFrom:
+ fieldRef:
+ fieldPath: metadata.namespace
+```
+
#### Role based access control
Make sure to provide access to the corresponding RBAC rules `apiGroups` and
`resources` like this:
diff --git
a/management-cluster-bootstrap/src/main/scala/org/apache/pekko/management/cluster/bootstrap/ClusterBootstrap.scala
b/management-cluster-bootstrap/src/main/scala/org/apache/pekko/management/cluster/bootstrap/ClusterBootstrap.scala
index 2e54468c..0eb7a30a 100644
---
a/management-cluster-bootstrap/src/main/scala/org/apache/pekko/management/cluster/bootstrap/ClusterBootstrap.scala
+++
b/management-cluster-bootstrap/src/main/scala/org/apache/pekko/management/cluster/bootstrap/ClusterBootstrap.scala
@@ -88,7 +88,7 @@ final class ClusterBootstrap(implicit system:
ExtendedActorSystem) extends Exten
}
try {
PekkoManagement(system).start().failed.foreach(autostartFailed)
- ClusterBootstrap(system).start()
+ start()
} catch {
case NonFatal(ex) => autostartFailed(ex)
}
diff --git
a/rolling-update-kubernetes/src/main/scala/org/apache/pekko/rollingupdate/kubernetes/AppVersionRevision.scala
b/rolling-update-kubernetes/src/main/scala/org/apache/pekko/rollingupdate/kubernetes/AppVersionRevision.scala
index 2699b31f..2ddfb829 100644
---
a/rolling-update-kubernetes/src/main/scala/org/apache/pekko/rollingupdate/kubernetes/AppVersionRevision.scala
+++
b/rolling-update-kubernetes/src/main/scala/org/apache/pekko/rollingupdate/kubernetes/AppVersionRevision.scala
@@ -44,21 +44,24 @@ final class AppVersionRevision(implicit system:
ExtendedActorSystem) extends Ext
def getRevision(): Future[Version] = versionPromise.future
def start(): Unit = {
- if (k8sSettings.podName.isEmpty) {
- log.error(
- "Not able to read the app version from the revision of the current
ReplicaSet. Reason: " +
- "No configuration found to extract the pod name from. " +
- s"Be sure to provide the pod name with `$configPath.pod-name` " +
- "or by setting ENV variable `KUBERNETES_POD_NAME`.")
- } else {
- if (isInitialized.compareAndSet(false, true)) {
+ if (isInitialized.compareAndSet(false, true)) {
+ if (k8sSettings.podName.isEmpty) {
+ val msg = "Not able to read the app version from the revision of the
current ReplicaSet. Reason:" +
+ "No configuration found to extract the pod name from. " +
+ s"Be sure to provide the pod name with `$configPath.pod-name` " +
+ "or by setting ENV variable `KUBERNETES_POD_NAME`."
+ log.error(msg)
+ versionPromise.failure(new MissingPodNameException(msg))
+ } else {
Cluster(system).setAppVersionLater(getRevision())
KubernetesApiImpl(log, k8sSettings).foreach { kubernetesApi =>
versionPromise.completeWith(kubernetesApi.readRevision().map(Version(_)))
}
- } else
- log.warning("AppVersionRevision extension already initiated, yet
start() method was called again. Ignoring.")
+ }
+ } else {
+ log.warning("AppVersionRevision extension already initiated, yet start()
method was called again. Ignoring.")
}
+
}
// autostart if the extension is loaded through the config extension list
@@ -68,7 +71,7 @@ final class AppVersionRevision(implicit system:
ExtendedActorSystem) extends Ext
if (autostart) {
log.info("AppVersionRevision loaded through 'pekko.extensions'
auto-starting itself.")
try {
- AppVersionRevision(system).start()
+ start()
} catch {
case NonFatal(ex) =>
log.error(ex, "Failed to autostart AppVersionRevision extension")
diff --git
a/rolling-update-kubernetes/src/main/scala/org/apache/pekko/rollingupdate/kubernetes/KubernetesApi.scala
b/rolling-update-kubernetes/src/main/scala/org/apache/pekko/rollingupdate/kubernetes/KubernetesApi.scala
index 621340f2..7818c14d 100644
---
a/rolling-update-kubernetes/src/main/scala/org/apache/pekko/rollingupdate/kubernetes/KubernetesApi.scala
+++
b/rolling-update-kubernetes/src/main/scala/org/apache/pekko/rollingupdate/kubernetes/KubernetesApi.scala
@@ -59,6 +59,11 @@ private[pekko] final case class PodCost(podName: String,
cost: Int, address: Str
*/
@InternalApi private[pekko] sealed class ReadRevisionException(message:
String) extends RuntimeException(message)
+/**
+ * INTERNAL API
+ */
+@InternalApi private[pekko] sealed class MissingPodNameException(message:
String) extends RuntimeException(message)
+
/**
* INTERNAL API
*/
diff --git
a/rolling-update-kubernetes/src/main/scala/org/apache/pekko/rollingupdate/kubernetes/PodDeletionCost.scala
b/rolling-update-kubernetes/src/main/scala/org/apache/pekko/rollingupdate/kubernetes/PodDeletionCost.scala
index 4e244a48..28990d30 100644
---
a/rolling-update-kubernetes/src/main/scala/org/apache/pekko/rollingupdate/kubernetes/PodDeletionCost.scala
+++
b/rolling-update-kubernetes/src/main/scala/org/apache/pekko/rollingupdate/kubernetes/PodDeletionCost.scala
@@ -78,7 +78,7 @@ final class PodDeletionCost(implicit system:
ExtendedActorSystem) extends Extens
if (autostart) {
log.info("PodDeletionCost loaded through 'pekko.extensions' auto-starting
itself.")
try {
- PodDeletionCost(system).start()
+ start()
} catch {
case NonFatal(ex) =>
log.error(ex, "Failed to autostart PodDeletionCost extension")
diff --git
a/rolling-update-kubernetes/src/test/scala/org/apache/pekko/rollingupdate/kubernetes/AppVersionRevisionSpec.scala
b/rolling-update-kubernetes/src/test/scala/org/apache/pekko/rollingupdate/kubernetes/AppVersionRevisionSpec.scala
index 39a7a95e..f5c81bfe 100644
---
a/rolling-update-kubernetes/src/test/scala/org/apache/pekko/rollingupdate/kubernetes/AppVersionRevisionSpec.scala
+++
b/rolling-update-kubernetes/src/test/scala/org/apache/pekko/rollingupdate/kubernetes/AppVersionRevisionSpec.scala
@@ -13,230 +13,42 @@
package org.apache.pekko.rollingupdate.kubernetes
-import com.fasterxml.jackson.databind.ObjectMapper
-import com.github.tomakehurst.wiremock.WireMockServer
-import com.github.tomakehurst.wiremock.client.MappingBuilder
-import com.github.tomakehurst.wiremock.client.ResponseDefinitionBuilder
-import com.github.tomakehurst.wiremock.client.WireMock
-import com.github.tomakehurst.wiremock.client.WireMock.aResponse
-import com.github.tomakehurst.wiremock.client.WireMock.get
-import com.github.tomakehurst.wiremock.client.WireMock.stubFor
-import com.github.tomakehurst.wiremock.client.WireMock.urlEqualTo
-import
com.github.tomakehurst.wiremock.core.WireMockConfiguration.wireMockConfig
-import com.github.tomakehurst.wiremock.matching.EqualToPattern
-import com.github.tomakehurst.wiremock.stubbing.Scenario
import com.typesafe.config.ConfigFactory
import org.apache.pekko
-import org.scalatest.BeforeAndAfterAll
-import org.scalatest.BeforeAndAfterEach
-import org.scalatest.concurrent.Eventually
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.matchers.should.Matchers
-import org.scalatest.time.Millis
-import org.scalatest.time.Seconds
-import org.scalatest.time.Span
import org.scalatest.wordspec.AnyWordSpecLike
import pekko.actor.ActorSystem
-import pekko.testkit.EventFilter
-import pekko.testkit.ImplicitSender
import pekko.testkit.TestKit
-import scala.concurrent.duration._
-
object AppVersionRevisionSpec {
val config = ConfigFactory.parseString("""
- pekko.loggers = ["org.apache.pekko.testkit.TestEventListener"]
pekko.actor.provider = cluster
pekko.remote.artery.canonical.port = 0
pekko.remote.artery.canonical.hostname = 127.0.0.1
- pekko.cluster.jmx.multi-mbeans-in-same-jvm = on
pekko.coordinated-shutdown.terminate-actor-system = off
pekko.coordinated-shutdown.run-by-actor-system-terminate = off
- pekko.test.filter-leeway = 10s
+ pekko.rollingupdate.kubernetes.pod-name = ""
""")
}
-
class AppVersionRevisionSpec
extends TestKit(
ActorSystem(
"AppVersionRevisionSpec",
AppVersionRevisionSpec.config
))
- with ImplicitSender
with AnyWordSpecLike
with Matchers
- with BeforeAndAfterAll
- with BeforeAndAfterEach
- with Eventually
with ScalaFutures {
- private val wireMockServer = new WireMockServer(wireMockConfig().port(0))
- wireMockServer.start()
- WireMock.configureFor(wireMockServer.port())
-
- // for wiremock to provide json
- val mapper = new ObjectMapper()
-
- private val namespace = "namespace-test"
- private val podName1 = "pod-test-1"
-
- private def settings(podName: String) = {
- new KubernetesSettings(
- apiCaPath = "",
- apiTokenPath = "",
- apiServiceHost = "localhost",
- apiServicePort = wireMockServer.port(),
- namespace = Some(namespace),
- namespacePath = "",
- podName = podName,
- secure = false,
- apiServiceRequestTimeout = 2.seconds,
- customResourceSettings = new CustomResourceSettings(enabled = false,
crName = None, 60.seconds)
- )
- }
-
- private val kubernetesApi =
- new KubernetesApiImpl(
- system,
- settings(podName1),
- namespace,
- apiToken = "apiToken",
- clientHttpsConnectionContext = None)
-
- override implicit val patienceConfig: PatienceConfig =
- PatienceConfig(timeout = Span(5, Seconds), interval = Span(100, Millis))
-
- override protected def afterAll(): Unit = super.shutdown()
-
- override protected def beforeEach(): Unit = {
- wireMockServer.resetAll()
- WireMock.resetAllScenarios()
- }
-
- private def podPath(podName: String) =
- urlEqualTo(s"/api/v1/namespaces/$namespace/pods/$podName")
-
- private def replicaPath(replica: String) =
- urlEqualTo(s"/apis/apps/v1/namespaces/$namespace/replicasets/$replica")
-
- private def getPod(podName: String): MappingBuilder =
- get(podPath(podName)).withHeader("Content-Type", new
EqualToPattern("application/json"))
-
- private def getReplicaSet(replica: String): MappingBuilder =
- get(replicaPath(replica)).withHeader("Content-Type", new
EqualToPattern("application/json"))
-
- private val defaultPodResponseJson =
- """{
- | "metadata": {
- | "ownerReferences": [
- | {"name": "wrong-replicaset-id", "kind": "SomethingElse"},
- | {"name": "parent-replicaset-id", "kind": "ReplicaSet"}
- | ]
- | }
- |}""".stripMargin
-
- private val defaultReplicaResponseJson =
- """{
- | "metadata": {
- | "annotations": {
- | "deployment.kubernetes.io/revision": "1"
- | }
- | }
- |}""".stripMargin
-
- private def stubPodResponse(json: String = defaultPodResponseJson, state:
String = Scenario.STARTED) =
- stubFor(
- getPod(podName1)
- .willReturn(
-
ResponseDefinitionBuilder.okForJson("").withJsonBody(mapper.readTree(json))
- )
- .inScenario("pod")
- .whenScenarioStateIs(state))
-
- private def stubReplicaResponse(json: String = defaultReplicaResponseJson) =
- stubFor(
- getReplicaSet("parent-replicaset-id")
- .willReturn(
-
ResponseDefinitionBuilder.okForJson("").withJsonBody(mapper.readTree(json))
- )
- .inScenario("replica")
- .whenScenarioStateIs(Scenario.STARTED))
-
- "Read revision from Kubernetes" should {
-
- "parse pod and replica response to get the revision" in {
- stubPodResponse()
- stubReplicaResponse()
-
- EventFilter
- .info(pattern = "Reading revision from Kubernetes:
pekko.cluster.app-version was set to 1", occurrences = 1)
- .intercept {
- kubernetesApi.readRevision().futureValue should be("1")
- }
- }
-
- "retry and then fail when pod not found" in {
- stubFor(getPod(podName1).willReturn(aResponse().withStatus(404)))
- EventFilter
- .warning(pattern = ".*Failed to get revision", occurrences = 5)
- .intercept {
-
assert(kubernetesApi.readRevision().failed.futureValue.isInstanceOf[ReadRevisionException])
- }
- }
-
- "retry and then fail when replicaset not found" in {
- stubPodResponse()
-
stubFor(getReplicaSet("parent-replicaset-id").willReturn(aResponse().withStatus(404)))
- EventFilter
- .warning(pattern = ".*Failed to get revision", occurrences = 5)
- .intercept {
-
assert(kubernetesApi.readRevision().failed.futureValue.isInstanceOf[ReadRevisionException])
- }
- }
-
- "log if pod json can not be parsed" in {
- stubPodResponse(json = """{ "invalid": "json" }""")
- EventFilter
- .warning(pattern = ".*Error while parsing Pod*")
- .intercept {
-
assert(kubernetesApi.readRevision().failed.futureValue.isInstanceOf[ReadRevisionException])
- }
- }
-
- "log if replica json can not be parsed" in {
- stubPodResponse()
- stubReplicaResponse(json = """{ "invalid": "json" }""")
- EventFilter
- .warning(pattern = ".*Error while parsing Pod*")
- .intercept {
-
assert(kubernetesApi.readRevision().failed.futureValue.isInstanceOf[ReadRevisionException])
- }
- }
-
- "break the loop if consecutive request succeeds" in {
- stubFor(
- getPod(podName1)
- .willReturn(aResponse().withStatus(404))
- .inScenario("pod")
- .whenScenarioStateIs(Scenario.STARTED)
- .willSetStateTo("after first fail")
- )
- stubFor(
- getPod(podName1)
- .willReturn(aResponse().withStatus(404))
- .inScenario("pod")
- .whenScenarioStateIs("after first fail")
- .willSetStateTo("k8s is happy now")
- )
- stubPodResponse(state = "k8s is happy now")
- stubReplicaResponse()
- EventFilter
- .warning(pattern = ".*Try again*", occurrences = 2)
- .intercept {
- kubernetesApi.readRevision().futureValue should be("1")
- }
+ "AppVersionRevision extension" should {
+ "return failed future if pod-name is not configured" in {
+ val revisionExtension = AppVersionRevision(system)
+ revisionExtension.start()
+ val failure = revisionExtension.getRevision().failed.futureValue
+ failure.getMessage should include("No configuration found to extract the
pod name from")
}
}
}
diff --git
a/rolling-update-kubernetes/src/test/scala/org/apache/pekko/rollingupdate/kubernetes/AppVersionRevisionSpec.scala
b/rolling-update-kubernetes/src/test/scala/org/apache/pekko/rollingupdate/kubernetes/KubernetesApiSpec.scala
similarity index 98%
copy from
rolling-update-kubernetes/src/test/scala/org/apache/pekko/rollingupdate/kubernetes/AppVersionRevisionSpec.scala
copy to
rolling-update-kubernetes/src/test/scala/org/apache/pekko/rollingupdate/kubernetes/KubernetesApiSpec.scala
index 39a7a95e..18dcd201 100644
---
a/rolling-update-kubernetes/src/test/scala/org/apache/pekko/rollingupdate/kubernetes/AppVersionRevisionSpec.scala
+++
b/rolling-update-kubernetes/src/test/scala/org/apache/pekko/rollingupdate/kubernetes/KubernetesApiSpec.scala
@@ -27,6 +27,10 @@ import
com.github.tomakehurst.wiremock.matching.EqualToPattern
import com.github.tomakehurst.wiremock.stubbing.Scenario
import com.typesafe.config.ConfigFactory
import org.apache.pekko
+import pekko.actor.ActorSystem
+import pekko.testkit.EventFilter
+import pekko.testkit.ImplicitSender
+import pekko.testkit.TestKit
import org.scalatest.BeforeAndAfterAll
import org.scalatest.BeforeAndAfterEach
import org.scalatest.concurrent.Eventually
@@ -36,14 +40,10 @@ import org.scalatest.time.Millis
import org.scalatest.time.Seconds
import org.scalatest.time.Span
import org.scalatest.wordspec.AnyWordSpecLike
-import pekko.actor.ActorSystem
-import pekko.testkit.EventFilter
-import pekko.testkit.ImplicitSender
-import pekko.testkit.TestKit
import scala.concurrent.duration._
-object AppVersionRevisionSpec {
+object KubernetesApiSpec {
val config = ConfigFactory.parseString("""
pekko.loggers = ["org.apache.pekko.testkit.TestEventListener"]
pekko.actor.provider = cluster
@@ -58,11 +58,11 @@ object AppVersionRevisionSpec {
""")
}
-class AppVersionRevisionSpec
+class KubernetesApiSpec
extends TestKit(
ActorSystem(
- "AppVersionRevisionSpec",
- AppVersionRevisionSpec.config
+ "KubernetesApiSpec",
+ KubernetesApiSpec.config
))
with ImplicitSender
with AnyWordSpecLike
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]