This is an automated email from the ASF dual-hosted git repository.
fanningpj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko-management.git
The following commit(s) were added to refs/heads/main by this push:
new 9917071c feat: port rolling-update-kubernetes module from
akka-management PRs #1113 and #1123 (#732)
9917071c is described below
commit 9917071c3bd8ab28f54ff33dacfc047d95fd1673
Author: PJ Fanning <[email protected]>
AuthorDate: Sun Mar 29 13:02:52 2026 +0200
feat: port rolling-update-kubernetes module from akka-management PRs #1113
and #1123 (#732)
* Initial plan
* feat: add rolling-update-kubernetes module porting PRs #1113 and #1123
from akka-management
Agent-Logs-Url:
https://github.com/pjfanning/incubator-pekko-management/sessions/dd6749fb-f991-4e29-9ce0-06660ca86a32
Co-authored-by: pjfanning <[email protected]>
* scalafmt
* javafmt
* compile issue
* Update build.sbt
---------
Co-authored-by: copilot-swe-agent[bot]
<[email protected]>
Co-authored-by: pjfanning <[email protected]>
---
build.sbt | 41 +--
docs/src/main/paradox/bootstrap/recipes.md | 4 +-
docs/src/main/paradox/index.md | 4 +-
.../kubernetes-deployment/forming-a-cluster.md | 5 +
docs/src/main/paradox/rolling-updates.md | 131 ++++++++++
project/Dependencies.scala | 9 +
project/project-info.conf | 3 +
.../src/main/resources/reference.conf | 49 ++++
.../apache/pekko/rollingupdate/CostStrategy.scala | 43 ++++
.../rollingupdate/kubernetes/ApiRequests.scala | 51 ++++
.../kubernetes/KubernetesSettings.scala | 61 +++++
.../rollingupdate/kubernetes/PodDeletionCost.scala | 124 +++++++++
.../kubernetes/PodDeletionCostAnnotator.scala | 228 +++++++++++++++++
.../kubernetes/PodDeletionCostCompileOnly.java | 29 +++
.../kubernetes/PodDeletionCostJavaCompileTest.java | 28 +++
.../kubernetes/PodDeletionCostCompileOnly.scala | 28 +++
.../kubernetes/PodDeletionCostAnnotatorSpec.scala | 280 +++++++++++++++++++++
17 files changed, 1099 insertions(+), 19 deletions(-)
diff --git a/build.sbt b/build.sbt
index 1e97a536..ff1066df 100644
--- a/build.sbt
+++ b/build.sbt
@@ -51,7 +51,8 @@ val userProjects: Seq[ProjectReference] =
Seq[ProjectReference](
management,
managementPki,
managementClusterHttp,
- managementClusterBootstrap) ++ logLevelProjectList
+ managementClusterBootstrap,
+ rollingUpdateKubernetes) ++ logLevelProjectList
val projectList: Seq[ProjectReference] =
userProjects ++ Seq[ProjectReference](
@@ -114,11 +115,7 @@ lazy val management = pekkoModule("management")
.enablePlugins(AutomateHeaderPlugin, ReproducibleBuildsPlugin)
.settings(
name := "pekko-management",
- libraryDependencies := Dependencies.managementHttp ++ Seq(
- "com.github.sbt.junit" % "jupiter-interface" %
JupiterKeys.jupiterVersion.value % Test,
- "org.junit.jupiter" % "junit-jupiter-api" %
JupiterKeys.junitJupiterVersion.value % Test,
- "org.junit.jupiter" % "junit-jupiter-engine" %
JupiterKeys.junitJupiterVersion.value % Test,
- "org.junit.platform" % "junit-platform-launcher" %
JupiterKeys.junitPlatformVersion.value % Test),
+ libraryDependencies := Dependencies.managementHttp ++
junitDependencies.value,
mimaPreviousArtifactsSet)
lazy val managementPki = pekkoModule("management-pki")
@@ -148,11 +145,7 @@ lazy val managementClusterHttp =
pekkoModule("management-cluster-http")
.enablePlugins(AutomateHeaderPlugin, ReproducibleBuildsPlugin)
.settings(
name := "pekko-management-cluster-http",
- libraryDependencies := Dependencies.managementClusterHttp ++ Seq(
- "com.github.sbt.junit" % "jupiter-interface" %
JupiterKeys.jupiterVersion.value % Test,
- "org.junit.jupiter" % "junit-jupiter-api" %
JupiterKeys.junitJupiterVersion.value % Test,
- "org.junit.jupiter" % "junit-jupiter-engine" %
JupiterKeys.junitJupiterVersion.value % Test,
- "org.junit.platform" % "junit-platform-launcher" %
JupiterKeys.junitPlatformVersion.value % Test),
+ libraryDependencies := Dependencies.managementClusterHttp ++
junitDependencies.value,
// following is needed by Agrona lib
// https://github.com/aeron-io/agrona/wiki/Change-Log#200-2024-12-17
Test / fork := true,
@@ -164,11 +157,7 @@ lazy val managementClusterBootstrap =
pekkoModule("management-cluster-bootstrap")
.enablePlugins(AutomateHeaderPlugin, ReproducibleBuildsPlugin)
.settings(
name := "pekko-management-cluster-bootstrap",
- libraryDependencies := Dependencies.managementClusterBootstrap ++ Seq(
- "com.github.sbt.junit" % "jupiter-interface" %
JupiterKeys.jupiterVersion.value % Test,
- "org.junit.jupiter" % "junit-jupiter-api" %
JupiterKeys.junitJupiterVersion.value % Test,
- "org.junit.jupiter" % "junit-jupiter-engine" %
JupiterKeys.junitJupiterVersion.value % Test,
- "org.junit.platform" % "junit-platform-launcher" %
JupiterKeys.junitPlatformVersion.value % Test),
+ libraryDependencies := Dependencies.managementClusterBootstrap ++
junitDependencies.value,
// following is needed by Agrona lib
// https://github.com/aeron-io/agrona/wiki/Change-Log#200-2024-12-17
Test / fork := true,
@@ -185,6 +174,18 @@ lazy val leaseKubernetes = pekkoModule("lease-kubernetes")
mimaPreviousArtifactsSet)
.dependsOn(managementPki)
+lazy val rollingUpdateKubernetes = pekkoModule("rolling-update-kubernetes")
+ .enablePlugins(AutomateHeaderPlugin, ReproducibleBuildsPlugin)
+ .settings(
+ name := "pekko-rolling-update-kubernetes",
+ libraryDependencies := Dependencies.rollingUpdateKubernetes ++
junitDependencies.value,
+ // following is needed by Agrona lib
+ // https://github.com/aeron-io/agrona/wiki/Change-Log#200-2024-12-17
+ Test / fork := true,
+ Test / javaOptions +=
"--add-opens=java.base/jdk.internal.misc=ALL-UNNAMED",
+ mimaPreviousArtifacts := Set.empty)
+ .dependsOn(managementPki)
+
lazy val billOfMaterials = Project("bill-of-materials",
file("bill-of-materials"))
.enablePlugins(BillOfMaterialsPlugin)
.disablePlugins(MimaPlugin)
@@ -333,6 +334,14 @@ lazy val docs = project
}
}.taskValue)
+lazy val junitDependencies = Def.setting {
+ Seq(
+ "com.github.sbt.junit" % "jupiter-interface" %
JupiterKeys.jupiterVersion.value % Test,
+ "org.junit.jupiter" % "junit-jupiter-api" %
JupiterKeys.junitJupiterVersion.value % Test,
+ "org.junit.jupiter" % "junit-jupiter-engine" %
JupiterKeys.junitJupiterVersion.value % Test,
+ "org.junit.platform" % "junit-platform-launcher" %
JupiterKeys.junitPlatformVersion.value % Test)
+}
+
def pekkoModule(moduleName: String): Project =
Project(id = moduleName, base = file(moduleName))
diff --git a/docs/src/main/paradox/bootstrap/recipes.md
b/docs/src/main/paradox/bootstrap/recipes.md
index 59d156e9..84cd0977 100644
--- a/docs/src/main/paradox/bootstrap/recipes.md
+++ b/docs/src/main/paradox/bootstrap/recipes.md
@@ -37,8 +37,8 @@ If using @extref:[Split Brain
Resolver](pekko:split-brain-resolver.html) have a
### Cluster singletons
-Deployments order pods by pod state and then time spent ready when deciding
which to remove first. This works well
-with cluster singletons as they are typically removed last and then the
cluster singletons move to the the oldest new pod.
+Deployments used to order pods by pod state and then time spent ready when
deciding which to remove first. This worked well
+with cluster singletons as they were typically removed last and then the
cluster singletons would move to the oldest new pod. However, since Kubernetes
v1.22, this is no longer the default behaviour for Kubernetes deployments, thus
we advise the use of the `PodDeletionCost` extension from @ref:[Pekko Kubernetes
Rolling Update](../rolling-updates.md#kubernetes-rolling-updates).
### External traffic
diff --git a/docs/src/main/paradox/index.md b/docs/src/main/paradox/index.md
index 824ee23e..80ae56a5 100644
--- a/docs/src/main/paradox/index.md
+++ b/docs/src/main/paradox/index.md
@@ -10,7 +10,8 @@ Pekko Management consists of multiple modules:
* @ref[pekko-cluster-bootstrap](bootstrap/index.md) helps bootstrapping a
Pekko cluster using @extref:[Pekko Discovery](pekko:discovery/index.html).
* @ref[pekko-management-cluster-http](cluster-http-management.md) is a module
that provides HTTP endpoints for introspecting and managing Pekko clusters.
* @ref[kubernetes-lease](kubernetes-lease.md) a lease backed by Kubernetes
API server.
-
+ * @ref[rolling-updates](rolling-updates.md) provides recommendations and
extensions for having smoother rolling updates.
+
As well as @extref:[Pekko Discovery](pekko:discovery/index.html) methods for:
* @ref[Kubernetes API](discovery/kubernetes.md)
@@ -36,4 +37,5 @@ Various parts of Pekko management can be used together for
deploying Pekko Clust
- [Pekko Cluster Management (JMX)](cluster-jmx-management.md)
- [Dynamic Log Levels](loglevels/index.md)
- [Pekko Coordination Lease for Kubernetes](kubernetes-lease.md)
+ - [Rolling Updates](rolling-updates.md)
@@@
diff --git a/docs/src/main/paradox/kubernetes-deployment/forming-a-cluster.md
b/docs/src/main/paradox/kubernetes-deployment/forming-a-cluster.md
index afeba824..438e1f05 100644
--- a/docs/src/main/paradox/kubernetes-deployment/forming-a-cluster.md
+++ b/docs/src/main/paradox/kubernetes-deployment/forming-a-cluster.md
@@ -195,3 +195,8 @@ livenessProbe:
failureThreshold: 10
initialDelaySeconds: 20
```
+
+
+## Rolling Updates
+
+Starting from Kubernetes v1.22, ReplicaSets are not scaled down with the
youngest node first, which can cause problems for the Pekko cluster. A new Pekko
extension was developed to address this issue, and you can find the
documentation under @ref:[Kubernetes Rolling
Updates](../rolling-updates.md#kubernetes-rolling-updates) section.
diff --git a/docs/src/main/paradox/rolling-updates.md
b/docs/src/main/paradox/rolling-updates.md
new file mode 100644
index 00000000..84195c9d
--- /dev/null
+++ b/docs/src/main/paradox/rolling-updates.md
@@ -0,0 +1,131 @@
+# Rolling Updates
+
+Rolling updates allow you to update an application by gradually replacing old
nodes with new ones. This ensures that the application remains available
throughout the update process, with minimal disruption to clients.
+
+#### Graceful shutdown
+
+Pekko Cluster can handle hard failures using a downing provider such as
Lightbend's @extref:[Split Brain Resolver](pekko:split-brain-resolver.html).
+However, this should not be relied upon for regular rolling redeploys.
Features such as `ClusterSingleton`s and `ClusterSharding`
+can safely restart actors on new nodes far quicker when it is certain that a
node has shutdown rather than crashed.
+
+Graceful leaving will happen with the default settings as it is part of
@extref:[Coordinated Shutdown](pekko:actors.html#coordinated-shutdown).
+Just ensure that a node is sent a `SIGTERM` and not a `SIGKILL`. Environments
such as Kubernetes will do this; it is important to ensure
+that, if the JVM is wrapped with a script, the script forwards the signal.
+
+Upon receiving a `SIGTERM` Coordinated Shutdown will:
+
+* Perform a `Cluster(system).leave` on itself
+* The status of the member will be changed to Exiting while allowing any
shards to be shutdown gracefully and
+ `ClusterSingleton`s to be migrated if this was the oldest node. Finally, the
node is removed from the Pekko Cluster membership.
+
+
+#### Number of nodes to redeploy at once
+
+Pekko bootstrap requires a `stable-period` where service discovery returns a
stable set of contact points. When doing rolling
+updates it is best to wait for a node (or group of nodes) to finish joining
the cluster before adding and removing other nodes.
+
+#### Cluster Singletons
+
+`ClusterSingleton`s run on the oldest node in the cluster. To avoid singletons
moving during every node deployment it is advised
+to start a rolling redeploy starting at the newest node. Then
`ClusterSingleton`s only move once. Cluster Sharding uses a singleton
internally so this is important even if not using singletons directly.
+
+
+## Kubernetes Rolling Updates
+
+Starting from Kubernetes v1.22, ReplicaSets are not scaled down with the
youngest node first (see details
[here](https://github.com/kubernetes/enhancements/tree/master/keps/sig-apps/2185-random-pod-select-on-replicaset-downscale)).
That is because after some time all nodes that were brought up in the same
time bucket are treated as equally old and the node to scale down first is
chosen randomly.
+
+As mentioned previously, the oldest node in a Pekko cluster has a special role
as it hosts singletons. If the oldest node in a cluster changes frequently,
singletons need to be moved around as well which can have undesired
consequences.
+
+This module provides the Pod Deletion Cost extension which automatically
annotates older pods so that they are selected last when removing nodes,
providing for better overall stability for the cluster operations.
+
+### Project Info
+
+@@project-info{ projectId="rolling-update-kubernetes" }
+
+
+### Dependency
+
+Add `pekko-rolling-update-kubernetes` to your dependency management tool:
+
+@@dependency[sbt,Gradle,Maven] {
+symbol=PekkoManagementVersion
+value=$project.version$
+group=org.apache.pekko
+artifact=pekko-rolling-update-kubernetes_$scala.binary.version$
+version=PekkoManagementVersion
+}
+
+
+### Using
+
+The Pekko Pod Deletion Cost extension must be started; this can be done either
through config or programmatically.
+
+**Through config**
+
+Listing the `PodDeletionCost` extension among the autoloaded
`pekko.extensions` in `application.conf` will also cause it to autostart:
+
+```
+pekko.extensions =
["org.apache.pekko.rollingupdate.kubernetes.PodDeletionCost"]
+```
+
+If management or bootstrap configuration is incorrect, the autostart will log
an error and terminate the actor system.
+
+**Programmatically**
+
+Scala
+: @@snip
[PodDeletionCostCompileOnly.scala](/rolling-update-kubernetes/src/test/scala/doc/pekko/rollingupdate/kubernetes/PodDeletionCostCompileOnly.scala)
{ #start }
+
+Java
+: @@snip
[PodDeletionCostCompileOnly.java](/rolling-update-kubernetes/src/test/java/jdoc/pekko/rollingupdate/kubernetes/PodDeletionCostCompileOnly.java)
{ #start }
+
+
+#### Configuration
+
+The following configuration is required, more details for each and additional
configurations can be found in
[reference.conf](https://github.com/apache/pekko-management/blob/main/rolling-update-kubernetes/src/main/resources/reference.conf):
+
+* `pekko.rollingupdate.kubernetes.pod-name`: this can be provided by setting
`KUBERNETES_POD_NAME` environment variable to `metadata.name` on the Kubernetes
container spec.
+
+Additionally, the pod annotator needs to know which namespace the pod belongs
to. By default, this will be detected by reading the namespace
+from the service account secret, in
`/var/run/secrets/kubernetes.io/serviceaccount/namespace`, but can be
overridden by
+setting `pekko.rollingupdate.kubernetes.namespace` or by providing
`KUBERNETES_NAMESPACE` environment variable.
+
+##### Role based access control
+
+@@@ warning
+
+This extension uses the Kubernetes API to set the `pod-deletion-cost`
annotation on its own pod. To be able to do that, it requires special
permission to be able to `patch` the pod configuration. Each pod only needs
access to the namespace it is in.
+
+@@@
+
+An example RBAC that can be used:
+```yaml
+kind: Role
+apiVersion: rbac.authorization.k8s.io/v1
+metadata:
+ name: pod-patcher
+rules:
+ - apiGroups: [""] # "" indicates the core API group
+ resources: ["pods"]
+ verbs: ["patch"] # requires "patch" to annotate the pod
+---
+kind: RoleBinding
+apiVersion: rbac.authorization.k8s.io/v1
+metadata:
+ name: annotate-pods
+subjects:
+ - kind: User
+ name: system:serviceaccount:<YOUR NAMESPACE>:default
+roleRef:
+ kind: Role
+ name: pod-patcher
+ apiGroup: rbac.authorization.k8s.io
+```
+
+This defines a `Role` that is allowed to `patch` pod objects and a
`RoleBinding`
+that gives the default service user this role in `<YOUR NAMESPACE>`.
+
+@@@ note
+
+This RBAC example covers only the permissions needed for this
`PodDeletionCost` extension specifically. However, usually you'll also be using
@ref:[Kubernetes API](bootstrap/kubernetes-api.md) for discovery and bootstrap
of your cluster, so you'll need to combine this with any other role required
already configured, either by keeping them separately or merging them into a
single role.
+
+@@@
diff --git a/project/Dependencies.scala b/project/Dependencies.scala
index 224bf0b0..01225e04 100644
--- a/project/Dependencies.scala
+++ b/project/Dependencies.scala
@@ -146,6 +146,15 @@ object Dependencies {
"org.apache.pekko" %% "pekko-distributed-data" % pekkoVersion % Test,
"org.scalatest" %% "scalatest" % scalaTestVersion % Test)
+ val rollingUpdateKubernetes = Seq(
+ "org.apache.pekko" %% "pekko-actor" % pekkoVersion,
+ "org.apache.pekko" %% "pekko-cluster" % pekkoVersion,
+ "org.apache.pekko" %% "pekko-http" % pekkoHttpVersion,
+ "org.apache.pekko" %% "pekko-http-spray-json" % pekkoHttpVersion,
+ "org.apache.pekko" %% "pekko-testkit" % pekkoVersion % Test,
+ "org.scalatest" %% "scalatest" % scalaTestVersion % Test) ++
+ wireMockDependencies
+
val leaseKubernetes = Seq(
"org.apache.pekko" %% "pekko-actor" % pekkoVersion,
"org.apache.pekko" %% "pekko-coordination" % pekkoVersion,
diff --git a/project/project-info.conf b/project/project-info.conf
index 32ea7984..568a0fa3 100644
--- a/project/project-info.conf
+++ b/project/project-info.conf
@@ -63,4 +63,7 @@ project-info {
discovery-aws-api: ${project-info.shared-info} {
title: "Apache Pekko Discovery AWS"
}
+ rolling-update-kubernetes: ${project-info.shared-info} {
+ title: "Apache Pekko Rolling Update Kubernetes"
+ }
}
diff --git a/rolling-update-kubernetes/src/main/resources/reference.conf
b/rolling-update-kubernetes/src/main/resources/reference.conf
new file mode 100644
index 00000000..4bc33b3a
--- /dev/null
+++ b/rolling-update-kubernetes/src/main/resources/reference.conf
@@ -0,0 +1,49 @@
+# SPDX-License-Identifier: Apache-2.0
+
+######################################################
+# Pekko Rolling Update Kubernetes Config #
+######################################################
+
+pekko.rollingupdate.kubernetes {
+
+ api-ca-path = "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt"
+ api-token-path = "/var/run/secrets/kubernetes.io/serviceaccount/token"
+
+ # Host for the Kubernetes API server. Typically this will be set via an
environment
+ # variable that is set when running inside Kubernetes
+ api-service-host = "localhost"
+ api-service-host = ${?KUBERNETES_SERVICE_HOST}
+
+ # Port for the Kubernetes API server. Typically this will be set via an
environment
+ # variable that is set when running inside Kubernetes
+ api-service-port = 8080
+ api-service-port = ${?KUBERNETES_SERVICE_PORT}
+
+  # Namespace file path. The namespace to operate in is read from this file.
Can be overridden by "namespace"
+ #
+ # If this path doesn't exist, the namespace will default to "default".
+ namespace-path = "/var/run/secrets/kubernetes.io/serviceaccount/namespace"
+
+ # Namespace to query for pods.
+ #
+ # Set this value to a specific string to override discovering the
namespace using namespace-path.
+ namespace = "<namespace>"
+ namespace = ${?KUBERNETES_NAMESPACE}
+
+ # The pod name used to identify the pod against the kubernetes API.
+ pod-name = ""
+ pod-name = ${?KUBERNETES_POD_NAME}
+
+ secure-api-server = true
+
+ # Configuration for the Pod Deletion Cost extension
+ pod-deletion-cost {
+
+    # Total number of pods that will be annotated with a value for
"controller.kubernetes.io/pod-deletion-cost"
+ # Annotates N pods from oldest to newest with a decreasing value for
each pod (or as many as possible if the cluster has less than N nodes).
+ annotated-pods-nr = 3
+
+ # Fixed time delay between retries when pod annotation fails
+ retry-delay = 5s
+ }
+}
diff --git
a/rolling-update-kubernetes/src/main/scala/org/apache/pekko/rollingupdate/CostStrategy.scala
b/rolling-update-kubernetes/src/main/scala/org/apache/pekko/rollingupdate/CostStrategy.scala
new file mode 100644
index 00000000..57aa3cbd
--- /dev/null
+++
b/rolling-update-kubernetes/src/main/scala/org/apache/pekko/rollingupdate/CostStrategy.scala
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/*
+ * Copyright (C) 2017-2021 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package org.apache.pekko.rollingupdate
+
+import org.apache.pekko.annotation.InternalApi
+import org.apache.pekko.cluster.Member
+
+import scala.collection.SortedSet
+
+/**
+ * INTERNAL API
+ * Defines a trait for calculating the cost of removing a member from the
pekko cluster,
+ * given said member and the list of the members of the cluster from oldest
to newest.
+ */
+@InternalApi private[rollingupdate] trait CostStrategy {
+ def costOf(member: Member, membersByAgeDesc: SortedSet[Member]): Option[Int]
+}
+
+/**
+ * INTERNAL API
+ */
+@InternalApi private[rollingupdate] object OlderCostsMore extends CostStrategy
{
+ def costOf(member: Member, membersByAgeDesc: SortedSet[Member]): Option[Int]
= {
+ val maxCost = 10000
+ // avoiding using subsequent numbers: gives room for evolution and allows
for manual interventions
+ val stepCost = 100
+
+ membersByAgeDesc.zipWithIndex.collectFirst {
+ case (m, cost) if m.uniqueAddress == member.uniqueAddress => maxCost -
(cost * stepCost)
+ }
+ }
+}
diff --git
a/rolling-update-kubernetes/src/main/scala/org/apache/pekko/rollingupdate/kubernetes/ApiRequests.scala
b/rolling-update-kubernetes/src/main/scala/org/apache/pekko/rollingupdate/kubernetes/ApiRequests.scala
new file mode 100644
index 00000000..d1e8d294
--- /dev/null
+++
b/rolling-update-kubernetes/src/main/scala/org/apache/pekko/rollingupdate/kubernetes/ApiRequests.scala
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/*
+ * Copyright (C) 2017-2021 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package org.apache.pekko.rollingupdate.kubernetes
+
+import org.apache.pekko.annotation.InternalApi
+import org.apache.pekko.http.scaladsl.model.HttpMethods.PATCH
+import org.apache.pekko.http.scaladsl.model.headers.Authorization
+import org.apache.pekko.http.scaladsl.model.headers.OAuth2BearerToken
+import org.apache.pekko.http.scaladsl.model.HttpEntity
+import org.apache.pekko.http.scaladsl.model.HttpRequest
+import org.apache.pekko.http.scaladsl.model.MediaTypes
+import org.apache.pekko.http.scaladsl.model.Uri
+import org.apache.pekko.util.ByteString
+
+import scala.collection.immutable
+
+/**
+ * INTERNAL API
+ */
+@InternalApi private[kubernetes] object ApiRequests {
+
+ def podDeletionCost(settings: KubernetesSettings, apiToken: String,
namespace: String, cost: Int): HttpRequest = {
+ val path = Uri.Path.Empty / "api" / "v1" / "namespaces" / namespace /
"pods" / settings.podName
+ val scheme = if (settings.secure) "https" else "http"
+ val uri = Uri.from(scheme, host = settings.apiServiceHost, port =
settings.apiServicePort).withPath(path)
+ val headers = if (settings.secure)
immutable.Seq(Authorization(OAuth2BearerToken(apiToken))) else Nil
+
+ HttpRequest(
+ method = PATCH,
+ uri = uri,
+ headers = headers,
+ entity = HttpEntity(
+ MediaTypes.`application/merge-patch+json`,
+ ByteString(
+ s"""{"metadata": {"annotations":
{"controller.kubernetes.io/pod-deletion-cost": "$cost" }}}"""
+ ))
+ )
+ }
+
+}
diff --git
a/rolling-update-kubernetes/src/main/scala/org/apache/pekko/rollingupdate/kubernetes/KubernetesSettings.scala
b/rolling-update-kubernetes/src/main/scala/org/apache/pekko/rollingupdate/kubernetes/KubernetesSettings.scala
new file mode 100644
index 00000000..0bee4cf4
--- /dev/null
+++
b/rolling-update-kubernetes/src/main/scala/org/apache/pekko/rollingupdate/kubernetes/KubernetesSettings.scala
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/*
+ * Copyright (C) 2017-2021 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package org.apache.pekko.rollingupdate.kubernetes
+
+import org.apache.pekko.annotation.InternalApi
+import com.typesafe.config.Config
+
+/**
+ * INTERNAL API
+ */
+@InternalApi
+private[kubernetes] object KubernetesSettings {
+
+ private implicit class HasDefined(val config: Config) {
+ def hasDefined(key: String): Boolean =
+ config.hasPath(key) &&
+ config.getString(key).trim.nonEmpty &&
+ config.getString(key) != s"<$key>"
+
+ def optDefinedValue(key: String): Option[String] =
+ if (hasDefined(key)) Some(config.getString(key)) else None
+ }
+
+ def apply(config: Config): KubernetesSettings = {
+ new KubernetesSettings(
+ config.getString("api-ca-path"),
+ config.getString("api-token-path"),
+ config.getString("api-service-host"),
+ config.getInt("api-service-port"),
+ config.optDefinedValue("namespace"),
+ config.getString("namespace-path"),
+ config.getString("pod-name"),
+ config.getBoolean("secure-api-server")
+ )
+ }
+}
+
+/**
+ * INTERNAL API
+ */
+@InternalApi
+private[kubernetes] class KubernetesSettings(
+ val apiCaPath: String,
+ val apiTokenPath: String,
+ val apiServiceHost: String,
+ val apiServicePort: Int,
+ val namespace: Option[String],
+ val namespacePath: String,
+ val podName: String,
+ val secure: Boolean)
diff --git
a/rolling-update-kubernetes/src/main/scala/org/apache/pekko/rollingupdate/kubernetes/PodDeletionCost.scala
b/rolling-update-kubernetes/src/main/scala/org/apache/pekko/rollingupdate/kubernetes/PodDeletionCost.scala
new file mode 100644
index 00000000..3c1f7817
--- /dev/null
+++
b/rolling-update-kubernetes/src/main/scala/org/apache/pekko/rollingupdate/kubernetes/PodDeletionCost.scala
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/*
+ * Copyright (C) 2017-2021 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package org.apache.pekko.rollingupdate.kubernetes
+
+import org.apache.pekko.actor.ActorSystem
+import org.apache.pekko.actor.ClassicActorSystemProvider
+import org.apache.pekko.actor.ExtendedActorSystem
+import org.apache.pekko.actor.Extension
+import org.apache.pekko.actor.ExtensionId
+import org.apache.pekko.actor.ExtensionIdProvider
+import org.apache.pekko.actor.Props
+import org.apache.pekko.annotation.InternalApi
+import org.apache.pekko.dispatch.Dispatchers.DefaultBlockingDispatcherId
+import org.apache.pekko.event.Logging
+import
org.apache.pekko.rollingupdate.kubernetes.PodDeletionCost.Internal.BootstrapStep
+import
org.apache.pekko.rollingupdate.kubernetes.PodDeletionCost.Internal.Initializing
+import
org.apache.pekko.rollingupdate.kubernetes.PodDeletionCost.Internal.NotRunning
+
+import java.nio.file.Files
+import java.nio.file.Paths
+import java.util.concurrent.atomic.AtomicReference
+import scala.concurrent.ExecutionContext
+import scala.concurrent.Future
+import scala.util.control.NonFatal
+
+final class PodDeletionCost(implicit system: ExtendedActorSystem) extends
Extension {
+
+ private val log = Logging(system, classOf[PodDeletionCost])
+ private val configPath = "pekko.rollingupdate.kubernetes"
+ private val config = system.settings.config.getConfig(configPath)
+ private val k8sSettings = KubernetesSettings(config)
+ private val costSettings = PodDeletionCostSettings(config)
+ log.debug("Settings {}", k8sSettings)
+
+ private final val startStep = new AtomicReference[BootstrapStep](NotRunning)
+
+ def start(): Unit = {
+ if (k8sSettings.podName.isEmpty) {
+ log.warning(
+ "No configuration found to extract the pod name from. " +
+ s"Be sure to provide the pod name with `$configPath.pod-name` " +
+ "or by setting ENV variable `KUBERNETES_POD_NAME`.")
+ } else if (startStep.compareAndSet(NotRunning, Initializing)) {
+ log.debug("Starting PodDeletionCost for podName={} with settings={}",
k8sSettings.podName, costSettings)
+
+ implicit val blockingDispatcher: ExecutionContext =
system.dispatchers.lookup(DefaultBlockingDispatcherId)
+ val props = for {
+ apiToken: String <- Future {
readConfigVarFromFilesystem(k8sSettings.apiTokenPath,
"api-token").getOrElse("") }
+ podNamespace: String <- Future {
+ k8sSettings.namespace
+ .orElse(readConfigVarFromFilesystem(k8sSettings.namespacePath,
"namespace"))
+ .getOrElse("default")
+ }
+ } yield Props(classOf[PodDeletionCostAnnotator], k8sSettings, apiToken,
podNamespace, costSettings)
+
+ props.foreach(system.systemActorOf(_, "podDeletionCostAnnotator"))
+ } else log.warning("PodDeletionCost extension already initiated, yet
start() method was called again. Ignoring.")
+ }
+
+ /**
+ * This uses blocking IO, and so should only be used to read configuration
at startup.
+ */
+ private def readConfigVarFromFilesystem(path: String, name: String):
Option[String] = {
+ val file = Paths.get(path)
+ if (Files.exists(file)) {
+ try {
+ Some(new String(Files.readAllBytes(file), "utf-8"))
+ } catch {
+ case NonFatal(e) =>
+ log.error(e, "Error reading {} from {}", name, path)
+ None
+ }
+ } else {
+ log.warning("Unable to read {} from {} because it doesn't exist.", name,
path)
+ None
+ }
+ }
+
+ // autostart if the extension is loaded through the config extension list
+ private val autostart =
+
system.settings.config.getStringList("pekko.extensions").contains(classOf[PodDeletionCost].getName)
+
+ if (autostart) {
+ log.info("PodDeletionCost loaded through 'pekko.extensions' auto-starting
itself.")
+ try {
+ PodDeletionCost(system).start()
+ } catch {
+ case NonFatal(ex) =>
+ log.error(ex, "Failed to autostart PodDeletionCost extension")
+ }
+ }
+}
+
+object PodDeletionCost extends ExtensionId[PodDeletionCost] with
ExtensionIdProvider {
+
+ override def lookup: PodDeletionCost.type = PodDeletionCost
+
+ override def get(system: ActorSystem): PodDeletionCost = super.get(system)
+
+ override def get(system: ClassicActorSystemProvider): PodDeletionCost =
super.get(system)
+
+ override def createExtension(system: ExtendedActorSystem): PodDeletionCost =
new PodDeletionCost()(system)
+
+ /**
+ * INTERNAL API
+ */
+ @InternalApi private[kubernetes] object Internal {
+ sealed trait BootstrapStep
+ case object NotRunning extends BootstrapStep
+ case object Initializing extends BootstrapStep
+ }
+
+}
diff --git
a/rolling-update-kubernetes/src/main/scala/org/apache/pekko/rollingupdate/kubernetes/PodDeletionCostAnnotator.scala
b/rolling-update-kubernetes/src/main/scala/org/apache/pekko/rollingupdate/kubernetes/PodDeletionCostAnnotator.scala
new file mode 100644
index 00000000..b0873e59
--- /dev/null
+++
b/rolling-update-kubernetes/src/main/scala/org/apache/pekko/rollingupdate/kubernetes/PodDeletionCostAnnotator.scala
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/*
+ * Copyright (C) 2017-2021 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package org.apache.pekko.rollingupdate.kubernetes
+
+import org.apache.pekko.actor.Actor
+import org.apache.pekko.actor.ActorLogging
+import org.apache.pekko.actor.ActorSystem
+import org.apache.pekko.actor.Timers
+import org.apache.pekko.annotation.InternalApi
+import org.apache.pekko.cluster.Cluster
+import org.apache.pekko.cluster.ClusterEvent
+import org.apache.pekko.cluster.Member
+import org.apache.pekko.event.Logging.InfoLevel
+import org.apache.pekko.event.Logging.WarningLevel
+import org.apache.pekko.http.scaladsl.ConnectionContext
+import org.apache.pekko.http.scaladsl.Http
+import org.apache.pekko.http.scaladsl.HttpsConnectionContext
+import org.apache.pekko.http.scaladsl.model.StatusCodes.ClientError
+import org.apache.pekko.http.scaladsl.model._
+import org.apache.pekko.pattern.pipe
+import org.apache.pekko.pki.kubernetes.PemManagersProvider
+import org.apache.pekko.rollingupdate.OlderCostsMore
+import
org.apache.pekko.rollingupdate.kubernetes.PodDeletionCostAnnotator.GiveUp
+import
org.apache.pekko.rollingupdate.kubernetes.PodDeletionCostAnnotator.PodAnnotated
+import
org.apache.pekko.rollingupdate.kubernetes.PodDeletionCostAnnotator.RetryAnnotate
+import
org.apache.pekko.rollingupdate.kubernetes.PodDeletionCostAnnotator.RetryTimerId
+import
org.apache.pekko.rollingupdate.kubernetes.PodDeletionCostAnnotator.ScheduleRetry
+import
org.apache.pekko.rollingupdate.kubernetes.PodDeletionCostAnnotator.toResult
+import com.typesafe.config.Config
+
+import java.security.KeyStore
+import java.security.SecureRandom
+import java.util.concurrent.TimeUnit
+import javax.net.ssl.KeyManager
+import javax.net.ssl.KeyManagerFactory
+import javax.net.ssl.SSLContext
+import javax.net.ssl.TrustManager
+import scala.collection.immutable
+import scala.collection.immutable.SortedSet
+import scala.concurrent.ExecutionContextExecutor
+import scala.concurrent.Future
+import scala.concurrent.duration.DurationLong
+import scala.concurrent.duration.FiniteDuration
+import scala.util.control.NonFatal
+
/**
 * INTERNAL API
 *
 * Actor responsible to annotate the hosting pod with the pod-deletion-cost.
 * It will automatically retry upon a fixed-configurable delay if the annotation fails.
 *
 * @param settings Kubernetes API connection settings (host, port, pod name, TLS material)
 * @param apiToken bearer token used to authenticate against the Kubernetes API
 * @param podNamespace namespace of the pod whose annotation is updated
 * @param costSettings number of oldest pods to annotate and the retry delay on failure
 */
@InternalApi private[kubernetes] final class PodDeletionCostAnnotator(
    settings: KubernetesSettings,
    apiToken: String,
    podNamespace: String,
    costSettings: PodDeletionCostSettings)
    extends Actor
    with ActorLogging
    with Timers {
  private val cluster = Cluster(context.system)
  private val http = Http()(context.system)

  // Subscribe to membership changes so the cost is recomputed whenever this member's
  // age-rank in the cluster may have changed.
  Cluster(context.system).subscribe(context.self, classOf[ClusterEvent.MemberUp], classOf[ClusterEvent.MemberRemoved])

  // TLS context trusting the CA at `settings.apiCaPath`; built lazily because it is only
  // needed when `settings.secure` is enabled.
  private lazy val sslContext = {
    val certificates = PemManagersProvider.loadCertificates(settings.apiCaPath)
    val factory = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm)
    // Empty in-memory keystore: no client certificate is presented, only the server is verified.
    val keyStore = KeyStore.getInstance("PKCS12")
    keyStore.load(null)
    factory.init(keyStore, Array.empty)
    val km: Array[KeyManager] = factory.getKeyManagers
    val tm: Array[TrustManager] =
      PemManagersProvider.buildTrustManagers(certificates)
    val random: SecureRandom = new SecureRandom
    val sslContext = SSLContext.getInstance("TLSv1.2")
    sslContext.init(km, tm, random)
    sslContext
  }
  private val clientSslContext: Option[HttpsConnectionContext] =
    if (settings.secure) Some(ConnectionContext.httpsClient(sslContext)) else None

  implicit val dispatcher: ExecutionContextExecutor = context.system.dispatcher

  // Initial state: no cost applied yet, no known members, zero retries so far.
  def receive = idle(0, SortedSet.empty(Member.ageOrdering), 0)

  // Normal operating state: reacts to membership changes and to the outcome of the
  // most recent annotation request.
  private def idle(deletionCost: Int, membersByAgeDesc: SortedSet[Member], retryNr: Int): Receive = {
    case cs @ ClusterEvent.CurrentClusterState(members, _, _, _, _) =>
      log.debug("Received CurrentClusterState {}", cs)
      updateIfNewCost(deletionCost, membersByAgeDesc ++ members, retryNr) // ordering used is from the first operand (so, by age)

    case ClusterEvent.MemberUp(m) =>
      log.debug("Received MemberUp {}", m)
      updateIfNewCost(deletionCost, membersByAgeDesc + m, retryNr)

    case ClusterEvent.MemberRemoved(m, _) =>
      log.debug("Received MemberRemoved {}", m)
      updateIfNewCost(deletionCost, membersByAgeDesc - m, retryNr)

    case PodAnnotated =>
      log.debug("Annotation updated successfully to {}", deletionCost)
      // cancelling an eventual retry in case the annotation succeeded in the meantime
      timers.cancel(RetryTimerId)
      context.become(idle(deletionCost, membersByAgeDesc, 0))

    case ScheduleRetry(ex) =>
      // Escalate log level only after a few failed attempts to avoid noisy warnings
      // on transient errors.
      val ll = if (retryNr < 3) InfoLevel else WarningLevel
      log.log(
        ll,
        s"Failed to update annotation: [$ex]. Scheduled retry with fixed delay of ${costSettings.retryDelay}, retry number $retryNr.")

      timers.startSingleTimer(RetryTimerId, RetryAnnotate, costSettings.retryDelay)
      context.become(underRetryBackoff(membersByAgeDesc, retryNr))

    case GiveUp(er: String) =>
      // Client (4xx) errors indicate misconfiguration; retrying would not help.
      log.error(
        "There was a client error when trying to set pod-deletion-cost annotation. " +
        "Not retrying, check configuration. Error: {}",
        er)

    case msg => log.debug("Ignoring message {}", msg)
  }

  // Backoff state: membership updates are tracked but no request is sent until the
  // retry timer fires.
  private def underRetryBackoff(membersByAgeDesc: SortedSet[Member], retryNr: Int): Receive = {
    case ClusterEvent.MemberUp(m) =>
      log.debug("Received while on retry backoff MemberUp {}", m)
      context.become(underRetryBackoff(membersByAgeDesc + m, retryNr))

    case ClusterEvent.MemberRemoved(m, _) =>
      log.debug("Received while on retry backoff MemberRemoved {}", m)
      context.become(underRetryBackoff(membersByAgeDesc - m, retryNr))

    case RetryAnnotate =>
      // Int.MinValue can never equal a computed cost, forcing updateIfNewCost to re-send.
      updateIfNewCost(Int.MinValue, membersByAgeDesc, retryNr + 1)

    case msg => log.debug("Under retry backoff, ignoring message {}", msg)
  }

  // Recomputes this member's deletion cost from its age-rank and PATCHes the pod
  // annotation if the cost changed. Becomes `idle` with the new cost optimistically;
  // the request outcome arrives later as PodAnnotated/ScheduleRetry/GiveUp.
  private def updateIfNewCost(existingCost: Int, membersByAgeDesc: immutable.SortedSet[Member], retryNr: Int): Unit = {

    // Only the N oldest members carry a non-zero cost annotation.
    val podsToAnnotate = membersByAgeDesc.take(costSettings.annotatedPodsNr)
    val newCost: Int = OlderCostsMore.costOf(cluster.selfMember, podsToAnnotate).getOrElse(0)
    log.debug(
      "Calculated cost={} (previously {}) for member={} in members by age (desc): {}",
      newCost,
      existingCost,
      cluster.selfMember,
      membersByAgeDesc)

    if (newCost != existingCost) {
      log.info(
        "Updating pod-deletion-cost annotation for pod: [{}] with cost: [{}]. Namespace: [{}]",
        settings.podName,
        newCost,
        podNamespace
      )
      val request = ApiRequests.podDeletionCost(settings, apiToken, podNamespace, newCost)
      val response =
        clientSslContext.map(http.singleRequest(request, _)).getOrElse(http.singleRequest(request))

      // Classify the HTTP outcome and deliver it back to this actor as a RequestResult.
      toResult(response)(context.system).pipeTo(self)
      context.become(idle(newCost, membersByAgeDesc, retryNr))
    } else context.become(idle(existingCost, membersByAgeDesc, retryNr))
  }

}
+
/**
 * INTERNAL API
 *
 * Settings consumed by [[PodDeletionCostAnnotator]].
 *
 * @param annotatedPodsNr the number of members of the cluster that need to be annotated
 * @param retryDelay fixed time delay before next attempt to annotate in case the previous one failed
 */
@InternalApi private[kubernetes] final case class PodDeletionCostSettings(
    annotatedPodsNr: Int,
    retryDelay: FiniteDuration)
+
/**
 * INTERNAL API
 *
 * Factory for [[PodDeletionCostSettings]], reading the `pod-deletion-cost` section of
 * the config it is given (e.g. `pekko.rollingupdate.kubernetes`).
 */
@InternalApi private[kubernetes] object PodDeletionCostSettings {
  val configPath: String = "pod-deletion-cost"

  def apply(config: Config): PodDeletionCostSettings =
    PodDeletionCostSettings(
      config.getInt(s"$configPath.annotated-pods-nr"),
      // Read at millisecond precision: reading with TimeUnit.SECONDS silently truncates
      // sub-second values (e.g. "500ms" would become 0). Whole-second configs are unaffected.
      config.getDuration(s"$configPath.retry-delay", TimeUnit.MILLISECONDS).millis
    )
}
+
/**
 * INTERNAL API
 *
 * Protocol messages and response-classification helper for [[PodDeletionCostAnnotator]].
 */
@InternalApi private[kubernetes] object PodDeletionCostAnnotator {
  case object RetryTimerId
  case object RetryAnnotate

  sealed trait RequestResult
  case object PodAnnotated extends RequestResult
  case class ScheduleRetry(cause: String) extends RequestResult
  case class GiveUp(cause: String) extends RequestResult

  /**
   * Classifies the Kubernetes API response: 2xx => [[PodAnnotated]], 4xx => [[GiveUp]]
   * (configuration problem, retrying will not help), any other status or a failed
   * future => [[ScheduleRetry]].
   */
  private[kubernetes] def toResult(futResponse: Future[HttpResponse])(
      implicit system: ActorSystem): Future[RequestResult] = {
    import system.dispatcher
    val classified = futResponse.map { response =>
      // Always drain the entity so the pooled connection can be reused.
      response.entity.discardBytes()
      response.status match {
        case status if status.isSuccess() => PodAnnotated
        case status @ ClientError(_)      => GiveUp(status.toString())
        case status                       => ScheduleRetry(s"Request failed with status=$status")
      }
    }
    classified.recover {
      case NonFatal(e) => ScheduleRetry(e.getMessage)
    }
  }
}
diff --git
a/rolling-update-kubernetes/src/test/java/jdoc/pekko/rollingupdate/kubernetes/PodDeletionCostCompileOnly.java
b/rolling-update-kubernetes/src/test/java/jdoc/pekko/rollingupdate/kubernetes/PodDeletionCostCompileOnly.java
new file mode 100644
index 00000000..8820c2eb
--- /dev/null
+++
b/rolling-update-kubernetes/src/test/java/jdoc/pekko/rollingupdate/kubernetes/PodDeletionCostCompileOnly.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/*
+ * Copyright (C) 2017-2021 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package jdoc.pekko.rollingupdate.kubernetes;
+
+import org.apache.pekko.actor.ActorSystem;
+import org.apache.pekko.rollingupdate.kubernetes.PodDeletionCost;
+
/**
 * Compile-only sample referenced by the documentation (via the {@code //#start} snippet
 * markers) showing how to start the pod deletion cost annotator from Java. Never executed.
 */
public class PodDeletionCostCompileOnly {
  public static void bootstrap() {

    ActorSystem system = ActorSystem.create();

    //#start
    // Starting the pod deletion cost annotator
    PodDeletionCost.get(system).start();
    //#start
  }
}
diff --git
a/rolling-update-kubernetes/src/test/java/org/apache/pekko/rollingupdate/kubernetes/PodDeletionCostJavaCompileTest.java
b/rolling-update-kubernetes/src/test/java/org/apache/pekko/rollingupdate/kubernetes/PodDeletionCostJavaCompileTest.java
new file mode 100644
index 00000000..14162498
--- /dev/null
+++
b/rolling-update-kubernetes/src/test/java/org/apache/pekko/rollingupdate/kubernetes/PodDeletionCostJavaCompileTest.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/*
+ * Copyright (C) 2017-2021 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package org.apache.pekko.rollingupdate.kubernetes;
+
+import org.apache.pekko.actor.ActorSystem;
+import org.junit.jupiter.api.Test;
+
/** Verifies that the PodDeletionCost extension API is usable from Java (compile check only). */
public class PodDeletionCostJavaCompileTest {

  // Intentionally not annotated with @Test: it only needs to compile against the Java API.
  public void test() {
    ActorSystem actorSystem = ActorSystem.create("test");
    PodDeletionCost podDeletionCost = PodDeletionCost.get(actorSystem);
  }

  @Test
  public void compileOnly() {}
}
diff --git
a/rolling-update-kubernetes/src/test/scala/doc/pekko/rollingupdate/kubernetes/PodDeletionCostCompileOnly.scala
b/rolling-update-kubernetes/src/test/scala/doc/pekko/rollingupdate/kubernetes/PodDeletionCostCompileOnly.scala
new file mode 100644
index 00000000..03683284
--- /dev/null
+++
b/rolling-update-kubernetes/src/test/scala/doc/pekko/rollingupdate/kubernetes/PodDeletionCostCompileOnly.scala
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/*
+ * Copyright (C) 2017-2021 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package doc.pekko.rollingupdate.kubernetes
+
+import org.apache.pekko.actor.ActorSystem
+import org.apache.pekko.rollingupdate.kubernetes.PodDeletionCost
+
// Compile-only sample referenced by the documentation (via the `#start` snippet markers)
// showing how to start the pod deletion cost annotator from Scala. Never executed.
object PodDeletionCostCompileOnly {

  val system = ActorSystem()

  // #start
  // Starting the pod deletion cost annotator
  PodDeletionCost(system).start()
  // #start

}
diff --git
a/rolling-update-kubernetes/src/test/scala/org/apache/pekko/rollingupdate/kubernetes/PodDeletionCostAnnotatorSpec.scala
b/rolling-update-kubernetes/src/test/scala/org/apache/pekko/rollingupdate/kubernetes/PodDeletionCostAnnotatorSpec.scala
new file mode 100644
index 00000000..c0aeeecb
--- /dev/null
+++
b/rolling-update-kubernetes/src/test/scala/org/apache/pekko/rollingupdate/kubernetes/PodDeletionCostAnnotatorSpec.scala
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/*
+ * Copyright (C) 2017-2021 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package org.apache.pekko.rollingupdate.kubernetes
+
+import org.apache.pekko.actor.ActorSystem
+import org.apache.pekko.actor.Address
+import org.apache.pekko.actor.Props
+import org.apache.pekko.cluster.Cluster
+import org.apache.pekko.cluster.ClusterEvent.MemberUp
+import org.apache.pekko.cluster.Member
+import org.apache.pekko.cluster.MemberStatus
+import org.apache.pekko.cluster.MemberStatus.Up
+import org.apache.pekko.cluster.UniqueAddress
+import org.apache.pekko.testkit.EventFilter
+import org.apache.pekko.testkit.ImplicitSender
+import org.apache.pekko.testkit.TestKit
+import org.apache.pekko.testkit.TestProbe
+import org.apache.pekko.util.Version
+import com.github.tomakehurst.wiremock.WireMockServer
+import com.github.tomakehurst.wiremock.client.MappingBuilder
+import com.github.tomakehurst.wiremock.client.WireMock
+import com.github.tomakehurst.wiremock.client.WireMock.aResponse
+import com.github.tomakehurst.wiremock.client.WireMock.patch
+import com.github.tomakehurst.wiremock.client.WireMock.patchRequestedFor
+import com.github.tomakehurst.wiremock.client.WireMock.stubFor
+import com.github.tomakehurst.wiremock.client.WireMock.urlEqualTo
+import com.github.tomakehurst.wiremock.client.WireMock.verify
+import
com.github.tomakehurst.wiremock.core.WireMockConfiguration.wireMockConfig
+import com.github.tomakehurst.wiremock.matching.ContainsPattern
+import com.github.tomakehurst.wiremock.matching.EqualToPattern
+import com.typesafe.config.ConfigFactory
+import org.scalatest.BeforeAndAfterAll
+import org.scalatest.BeforeAndAfterEach
+import org.scalatest.concurrent.Eventually
+import org.scalatest.matchers.should.Matchers
+import org.scalatest.time.Millis
+import org.scalatest.time.Seconds
+import org.scalatest.time.Span
+import org.scalatest.wordspec.AnyWordSpecLike
+
+import scala.jdk.CollectionConverters._
+import scala.concurrent.duration._
+
object PodDeletionCostAnnotatorSpec {
  // Shared test configuration: cluster provider, TestEventListener so EventFilter
  // assertions work, a short (1s) retry delay so retry scenarios complete quickly,
  // and port 0 / multi-mbeans so two ActorSystems can form a cluster in one JVM.
  val config = ConfigFactory.parseString("""
    pekko.loggers = ["org.apache.pekko.testkit.TestEventListener"]
    pekko.actor.provider = cluster
    pekko.rollingupdate.kubernetes.pod-deletion-cost.retry-delay = 1s

    pekko.remote.artery.canonical.port = 0
    pekko.remote.artery.canonical.hostname = 127.0.0.1

    pekko.cluster.jmx.multi-mbeans-in-same-jvm = on
    pekko.coordinated-shutdown.terminate-actor-system = off
    pekko.coordinated-shutdown.run-by-actor-system-terminate = off
    pekko.test.filter-leeway = 10s
    """)
}
+
/**
 * Exercises [[PodDeletionCostAnnotator]] against a WireMock stand-in for the Kubernetes
 * API server, covering the happy path, non-retriable (4xx) failures, retriable (5xx)
 * failures, and behavior while in retry backoff. Note: tests within each `should` group
 * are order-dependent (the cluster-join test must run first).
 */
class PodDeletionCostAnnotatorSpec
    extends TestKit(
      ActorSystem(
        "MySpec",
        PodDeletionCostAnnotatorSpec.config
      ))
    with ImplicitSender
    with AnyWordSpecLike
    with Matchers
    with BeforeAndAfterAll
    with BeforeAndAfterEach
    with Eventually {

  // WireMock stands in for the Kubernetes API server; port 0 picks a free port.
  private val wireMockServer = new WireMockServer(wireMockConfig().port(0))
  wireMockServer.start()
  WireMock.configureFor(wireMockServer.port())

  private val namespace = "namespace-test"
  private val podName1 = "pod-test-1"
  private val podName2 = "pod-test-2"
  // Second actor system, used to form a two-node cluster within the same JVM.
  private lazy val system2 = ActorSystem("MySpec", PodDeletionCostAnnotatorSpec.config)

  // Settings pointing the annotator at the local WireMock server, plain HTTP.
  private def settings(podName: String) = {
    new KubernetesSettings(
      apiCaPath = "",
      apiTokenPath = "",
      apiServiceHost = "localhost",
      apiServicePort = wireMockServer.port(),
      namespace = Some(namespace),
      namespacePath = "",
      podName = podName,
      secure = false)
  }

  private def annotatorProps(pod: String) = Props(
    classOf[PodDeletionCostAnnotator],
    settings(pod),
    "apiToken",
    namespace,
    PodDeletionCostSettings(system.settings.config.getConfig("pekko.rollingupdate.kubernetes"))
  )

  override implicit val patienceConfig: PatienceConfig =
    PatienceConfig(timeout = Span(5, Seconds), interval = Span(100, Millis))

  override protected def afterAll(): Unit = {
    super.shutdown()
    TestKit.shutdownActorSystem(system2)
  }

  override protected def beforeEach(): Unit = {
    wireMockServer.resetAll()
  }

  // Kubernetes API path for the given pod in the test namespace.
  private def podK8sPath(podName: String) = urlEqualTo("/api/v1/namespaces/" + namespace + "/pods/" + podName)

  private def stubForPods(podName: String, cost: Int = 10000, returnCode: Int) =
    stubFor(patchPodDeletionCost(podName, cost).willReturn(aResponse().withStatus(returnCode)))

  // Matches the merge-patch request the annotator is expected to send for the given cost.
  private def patchPodDeletionCost(podName: String, cost: Int = 10000): MappingBuilder =
    patch(podK8sPath(podName))
      .withHeader("Content-Type", new EqualToPattern("application/merge-patch+json"))
      .withRequestBody(new ContainsPattern(
        s"""{"metadata": {"annotations": {"controller.kubernetes.io/pod-deletion-cost": "$cost" }}}"""))

  "The pod-deletion-cost annotator, when under normal behavior" should {

    "have a single node cluster running first" in {
      val probe = TestProbe()
      Cluster(system).join(Cluster(system).selfMember.address)
      probe.awaitAssert({
          Cluster(system).selfMember.status == MemberStatus.Up
        }, 3.seconds)

    }

    "correctly annotate the cluster node" in {
      stubForPods(podName1, returnCode = 200)
      expectLogInfo(pattern = ".*Updating pod-deletion-cost annotation.*") {
        system.actorOf(annotatorProps(podName1))
      }
      eventually {
        verify(1, patchRequestedFor(podK8sPath(podName1)))
      }
    }

    "give up when failing with non-transient error" in {
      // 404 is a client error, which the annotator treats as non-retriable (GiveUp).
      stubForPods(podName1, returnCode = 404)
      expectLogError(pattern = ".*Not retrying, check configuration.*") {
        system.actorOf(annotatorProps(podName1))
      }
    }

    "retry when failing with transient error" in {
      val scenarioName = "RetryScenario"

      // first call fails
      stubFor(
        patchPodDeletionCost(podName1)
          .inScenario(scenarioName)
          .whenScenarioStateIs("FAILING")
          .willReturn(aResponse().withStatus(500))
          .willSetStateTo("AVAILABLE"))

      // second call succeeds
      stubFor(
        patchPodDeletionCost(podName1)
          .inScenario(scenarioName)
          .whenScenarioStateIs("AVAILABLE")
          .willReturn(aResponse().withStatus(200))
          .willSetStateTo("OK"))
      wireMockServer.setScenarioState(scenarioName, "FAILING") // set starting state to failing

      assertState(scenarioName, "FAILING")
      system.actorOf(annotatorProps(podName1))
      assertState(scenarioName, "AVAILABLE")
      // after the retry backoff delay
      assertState(scenarioName, "OK")

      wireMockServer.checkForUnmatchedRequests()
    }

    "annotate a second node correctly" in {

      // The second (younger) node gets a lower cost than the oldest node's 10000.
      stubForPods(podName2, cost = 9900, returnCode = 200)

      val probe = TestProbe()
      Cluster(system2).join(Cluster(system).selfMember.address)
      probe.awaitAssert({
          Cluster(system2).selfMember.status == MemberStatus.Up
        }, 3.seconds)

      system2.actorOf(annotatorProps(podName2))
      eventually {
        verify(1, patchRequestedFor(podK8sPath(podName2)))
      }

      wireMockServer.checkForUnmatchedRequests()
    }

  }

  "The pod-deletion-cost annotator, when under retry backoff" should {

    "have a single node cluster running first" in {
      val probe = TestProbe()
      Cluster(system).join(Cluster(system).selfMember.address)
      probe.awaitAssert({
          Cluster(system).selfMember.status == MemberStatus.Up
        }, 3.seconds)
    }

    "not annotate until backoff delay expires" in {
      val scenarioName = "RetryScenario"

      // first call fails
      stubFor(
        patchPodDeletionCost(podName1)
          .inScenario(scenarioName)
          .whenScenarioStateIs("FAILING")
          .willReturn(aResponse().withStatus(500)))
      wireMockServer.setScenarioState(scenarioName, "FAILING") // set starting state to failing

      // second call succeeds
      stubFor(
        patchPodDeletionCost(podName1)
          .inScenario(scenarioName)
          .whenScenarioStateIs("AVAILABLE")
          .willReturn(aResponse().withStatus(200))
          .willSetStateTo("OK"))

      assertState(scenarioName, "FAILING")

      val underTest = expectLogWarning(".*Failed to update annotation:.*") {
        system.actorOf(annotatorProps(podName1))
      }

      wireMockServer.resetRequests()
      // Synthetic member event to poke the actor while it is in backoff.
      val dummyNewMember =
        MemberUp(Member(UniqueAddress(Address("pekko", ""), 2L), Set("dc-default"), Version("v1")).copy(Up))
      underTest ! dummyNewMember
      underTest ! dummyNewMember
      underTest ! dummyNewMember

      // no other interactions should have occurred while on backoff regardless of updates to the cluster
      verify(0, patchRequestedFor(podK8sPath(podName1)))

      wireMockServer.setScenarioState(scenarioName, "AVAILABLE")

      eventually {
        verify(1, patchRequestedFor(podK8sPath(podName1)))
      }
      assertState(scenarioName, "OK")
    }
  }

  // Polls until the WireMock scenario reaches the expected state.
  private def assertState(scenarioName: String, state: String) = eventually {
    val scenario = wireMockServer.getAllScenarios.getScenarios.asScala.toList.find(_.getName == scenarioName).get
    scenario.getState should ===(state)
  }

  // EventFilter helpers: intercept exactly `occurrences` log events matching `pattern`
  // while `block` runs (within pekko.test.filter-leeway).
  def expectLogInfo[T](pattern: String = null)(block: => T): T =
    EventFilter.info(pattern = pattern, occurrences = 1).intercept(block)(system)

  def expectLogError[T](pattern: String = null, occurrences: Int = 1)(block: => T): T =
    EventFilter.error(pattern = pattern, occurrences = occurrences).intercept(block)(system)

  def expectLogWarning[T](pattern: String = null, occurrences: Int = 1)(block: => T): T =
    EventFilter.warning(pattern = pattern, occurrences = occurrences).intercept(block)(system)

}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]