This is an automated email from the ASF dual-hosted git repository.

mdedetrich pushed a commit to branch 1.2.x
in repository https://gitbox.apache.org/repos/asf/pekko-management.git


The following commit(s) were added to refs/heads/1.2.x by this push:
     new 18d037fd Get token on every request and make fs calls async
18d037fd is described below

commit 18d037fd2f9129fcb815612a48b5d569d3f68879
Author: Matthew de Detrich <[email protected]>
AuthorDate: Mon Nov 17 15:29:07 2025 +0100

    Get token on every request and make fs calls async
    
    (cherry picked from commit 86f33e79718afa84e73f5638a4a85a80de48c834)
---
 lease-kubernetes/src/main/resources/reference.conf | 15 ++++
 .../lease/kubernetes/KubernetesSettings.scala      | 31 +++++++-
 .../internal/AbstractKubernetesApiImpl.scala       | 93 +++++++++++++++-------
 .../kubernetes/internal/KubernetesApiImpl.scala    | 27 ++++---
 .../internal/NativeKubernetesApiImpl.scala         | 28 ++++---
 .../lease/kubernetes/KubernetesApiSpec.scala       | 93 +++++++++++++++++++++-
 .../lease/kubernetes/NativeKubernetesApiSpec.scala |  4 +-
 7 files changed, 236 insertions(+), 55 deletions(-)

diff --git a/lease-kubernetes/src/main/resources/reference.conf 
b/lease-kubernetes/src/main/resources/reference.conf
index fb36f4ba..c1ee5f7a 100644
--- a/lease-kubernetes/src/main/resources/reference.conf
+++ b/lease-kubernetes/src/main/resources/reference.conf
@@ -52,4 +52,19 @@ pekko.coordination.lease.kubernetes {
     # server that are required. If this timeout is hit then the lease *may* be 
taken due to the response being lost
     # on the way back from the API server but will be reported as not taken 
and can be safely retried.
     lease-operation-timeout = 5s
+
+    # Settings that are specific to retrying requests with 401 responses due 
to possible token rotation
+    token-rotation-retry {
+      # Number of total attempts to make
+      max-attempts = 5
+
+      # The minimum backoff to be used
+      min-backoff = 10 ms
+
+      # The maximum backoff to be used
+      max-backoff = 1 minute
+
+      # Random factor applied to each backoff delay as jitter (e.g. 0.3 adds up to 30% extra delay)
+      random-factor = 0.3
+    }
 }
diff --git 
a/lease-kubernetes/src/main/scala/org/apache/pekko/coordination/lease/kubernetes/KubernetesSettings.scala
 
b/lease-kubernetes/src/main/scala/org/apache/pekko/coordination/lease/kubernetes/KubernetesSettings.scala
index 74c956b4..d8ba328e 100644
--- 
a/lease-kubernetes/src/main/scala/org/apache/pekko/coordination/lease/kubernetes/KubernetesSettings.scala
+++ 
b/lease-kubernetes/src/main/scala/org/apache/pekko/coordination/lease/kubernetes/KubernetesSettings.scala
@@ -53,6 +53,15 @@ private[pekko] object KubernetesSettings {
       apiServerRequestTimeout < leaseTimeoutSettings.operationTimeout,
       "'api-server-request-timeout can not be less than 
'lease-operation-timeout'")
 
+    val retryConfPath = "token-rotation-retry"
+
+    val tokenRetrySettings = new TokenRetrySettings(
+      config.getInt(s"$retryConfPath.max-attempts"),
+      config.getDuration(s"$retryConfPath.min-backoff").asScala,
+      config.getDuration(s"$retryConfPath.max-backoff").asScala,
+      config.getDouble(s"$retryConfPath.random-factor")
+    )
+
     new KubernetesSettings(
       config.getString("api-ca-path"),
       config.getString("api-token-path"),
@@ -63,11 +72,21 @@ private[pekko] object KubernetesSettings {
       apiServerRequestTimeout,
       secure = config.getBoolean("secure-api-server"),
       tlsVersion = config.getString("tls-version"),
-      bodyReadTimeout = apiServerRequestTimeout / 2)
-
+      bodyReadTimeout = apiServerRequestTimeout / 2,
+      tokenRetrySettings = tokenRetrySettings)
   }
 }
 
+/**
+ * INTERNAL API
+ */
+@InternalApi
+private[pekko] class TokenRetrySettings(
+    val maxAttempts: Int,
+    val minBackoff: FiniteDuration,
+    val maxBackoff: FiniteDuration,
+    val randomFactor: Double)
+
 /**
  * INTERNAL API
  */
@@ -82,4 +101,10 @@ private[pekko] class KubernetesSettings(
     val apiServerRequestTimeout: FiniteDuration,
     val secure: Boolean = true,
     val tlsVersion: String = "TLSv1.2",
-    val bodyReadTimeout: FiniteDuration = 1.second)
+    val bodyReadTimeout: FiniteDuration = 1.second,
+    val tokenRetrySettings: TokenRetrySettings = new TokenRetrySettings(
+      5,
+      10.millis,
+      1.minute,
+      0.3
+    ))
diff --git 
a/lease-kubernetes/src/main/scala/org/apache/pekko/coordination/lease/kubernetes/internal/AbstractKubernetesApiImpl.scala
 
b/lease-kubernetes/src/main/scala/org/apache/pekko/coordination/lease/kubernetes/internal/AbstractKubernetesApiImpl.scala
index 32b474cc..dc3b3b87 100644
--- 
a/lease-kubernetes/src/main/scala/org/apache/pekko/coordination/lease/kubernetes/internal/AbstractKubernetesApiImpl.scala
+++ 
b/lease-kubernetes/src/main/scala/org/apache/pekko/coordination/lease/kubernetes/internal/AbstractKubernetesApiImpl.scala
@@ -15,25 +15,27 @@ package 
org.apache.pekko.coordination.lease.kubernetes.internal
 
 import org.apache.pekko
 import pekko.Done
-import pekko.actor.ActorSystem
+import pekko.actor.{ ActorSystem, Scheduler }
 import pekko.annotation.InternalApi
 import pekko.coordination.lease.kubernetes.{ KubernetesApi, 
KubernetesSettings, LeaseResource }
 import pekko.coordination.lease.{ LeaseException, LeaseTimeoutException }
+import pekko.dispatch.ExecutionContexts
 import pekko.event.{ LogSource, Logging, LoggingAdapter }
 import pekko.http.scaladsl.model._
 import pekko.http.scaladsl.model.headers.{ Authorization, OAuth2BearerToken }
 import pekko.http.scaladsl.unmarshalling.Unmarshal
 import pekko.http.scaladsl.{ ConnectionContext, Http, HttpExt, 
HttpsConnectionContext }
-import pekko.pattern.after
+import pekko.pattern.{ after, RetrySupport }
 import pekko.pki.kubernetes.PemManagersProvider
+import pekko.stream.scaladsl.{ FileIO, Keep, Sink }
+import pekko.util.ByteString
 
-import java.nio.charset.StandardCharsets
 import java.nio.file.{ Files, Paths }
 import java.security.{ KeyStore, SecureRandom }
 import javax.net.ssl.{ KeyManager, KeyManagerFactory, SSLContext, TrustManager 
}
 import scala.collection.immutable
 import scala.concurrent.Future
-import scala.util.control.NonFatal
+import scala.util.control.{ NoStackTrace, NonFatal }
 
 /**
  * Could be shared between leases: 
https://github.com/akka/akka-management/issues/680
@@ -66,12 +68,24 @@ import scala.util.control.NonFatal
 
   private lazy val clientSslContext: HttpsConnectionContext = 
ConnectionContext.httpsClient(sslContext)
 
-  protected val namespace: String =
-    
settings.namespace.orElse(readConfigVarFromFilesystem(settings.namespacePath, 
"namespace")).getOrElse("default")
+  protected val namespace: Future[String] = {
+    settings.namespace match {
+      case Some(nSpace) => Future.successful(nSpace)
+      case _ =>
+        readConfigVarFromFilesystem(settings.namespacePath, 
"namespace").map(_.getOrElse("default"))(
+          ExecutionContexts.parasitic)
+    }
+  }
 
   protected val scheme: String = if (settings.secure) "https" else "http"
-  private lazy val apiToken = 
readConfigVarFromFilesystem(settings.apiTokenPath, "api-token").getOrElse("")
-  private lazy val headers = if (settings.secure) 
immutable.Seq(Authorization(OAuth2BearerToken(apiToken))) else Nil
+  private[pekko] def apiToken() = 
readConfigVarFromFilesystem(settings.apiTokenPath, "api-token").map(
+    _.getOrElse(""))(ExecutionContexts.parasitic)
+  private def headers() = if (settings.secure) {
+    apiToken().map { token =>
+      immutable.Seq(Authorization(OAuth2BearerToken(token)))
+    }(ExecutionContexts.parasitic)
+  } else
+    Future.successful(Nil)
 
   log.debug("kubernetes access namespace: {}. Secure: {}", namespace, 
settings.secure)
 
@@ -79,7 +93,7 @@ import scala.util.control.NonFatal
 
   protected def getLeaseResource(name: String): Future[Option[LeaseResource]]
 
-  protected def pathForLease(name: String): Uri.Path
+  protected def pathForLease(name: String): Future[Uri.Path]
 
   override def readOrCreateLeaseResource(name: String): Future[LeaseResource] 
= {
     // TODO backoff retry
@@ -110,10 +124,9 @@ import scala.util.control.NonFatal
 
   private[pekko] def removeLease(name: String): Future[Done] = {
     for {
-      response <- makeRequest(
-        requestForPath(pathForLease(name), HttpMethods.DELETE),
-        s"Timed out removing lease [$name]. It is not known if the remove 
happened")
-
+      leasePath <- pathForLease(name)
+      request <- requestForPath(leasePath, HttpMethods.DELETE)
+      response <- makeRequest(request, s"Timed out removing lease [$name]. It 
is not known if the remove happened")
       result <- response.status match {
         case StatusCodes.OK =>
           log.debug("Lease deleted {}", name)
@@ -148,17 +161,43 @@ import scala.util.control.NonFatal
   protected def requestForPath(
       path: Uri.Path,
       method: HttpMethod = HttpMethods.GET,
-      entity: RequestEntity = HttpEntity.Empty): HttpRequest = {
+      entity: RequestEntity = HttpEntity.Empty): Future[HttpRequest] = {
     val uri = Uri.from(scheme = scheme, host = settings.apiServerHost, port = 
settings.apiServerPort).withPath(path)
-    HttpRequest(uri = uri, headers = headers, method = method, entity = entity)
+    headers().map { headers =>
+      HttpRequest(uri = uri, headers = headers, method = method, entity = 
entity)
+    }(ExecutionContexts.parasitic)
   }
 
+  private[pekko] def makeRawRequest(request: HttpRequest): 
Future[HttpResponse] = {
+    if (settings.secure)
+      http.singleRequest(request, clientSslContext)
+    else
+      http.singleRequest(request)
+  }
+
+  // This exception is raised and recovered internally because we are forced to use Pekko 1.0.x's
+  // version of RetrySupport.retry, which only retries when the attempt function fails with an
+  // exception
+  private case class UnauthorizedException(httpResponse: HttpResponse) extends 
Throwable with NoStackTrace
+
   protected def makeRequest(request: HttpRequest, timeoutMsg: String): 
Future[HttpResponse] = {
-    val response =
-      if (settings.secure)
-        http.singleRequest(request, clientSslContext)
-      else
-        http.singleRequest(request)
+    // It's possible to legitimately get a 401 response due to kubernetes 
doing a token rotation
+    implicit val scheduler: Scheduler = system.scheduler
+    val response = RetrySupport.retry(
+      () =>
+        makeRawRequest(request: HttpRequest).flatMap { response =>
+          if (response.status == StatusCodes.Unauthorized) {
+            log.warning("Received status code 401 as response, retrying due to 
possible token rotation")
+            Future.failed(UnauthorizedException(response))
+          } else Future.successful(response)
+        },
+      settings.tokenRetrySettings.maxAttempts,
+      settings.tokenRetrySettings.minBackoff,
+      settings.tokenRetrySettings.maxBackoff,
+      settings.tokenRetrySettings.randomFactor
+    ).recover {
+      case unauthorized: UnauthorizedException => unauthorized.httpResponse
+    }
 
     // make sure we always consume response body (in case of timeout)
     val strictResponse = response.flatMap(_.toStrict(settings.bodyReadTimeout))
@@ -169,22 +208,22 @@ import scala.util.control.NonFatal
     Future.firstCompletedOf(Seq(strictResponse, timeout))
   }
 
-  /**
-   * This uses blocking IO, and so should only be used to read configuration 
at startup.
-   */
-  protected def readConfigVarFromFilesystem(path: String, name: String): 
Option[String] = {
+  protected def readConfigVarFromFilesystem(path: String, name: String): 
Future[Option[String]] = {
     val file = Paths.get(path)
     if (Files.exists(file)) {
       try {
-        Some(new String(Files.readAllBytes(file), StandardCharsets.UTF_8))
+        FileIO.fromPath(file)
+          .toMat(Sink.fold(ByteString.empty)(_ ++ _))(Keep.right)
+          .run()
+          .map(bs => Some(bs.utf8String))(ExecutionContexts.parasitic)
       } catch {
         case NonFatal(e) =>
           log.error(e, "Error reading {} from {}", name, path)
-          None
+          Future.successful(None)
       }
     } else {
       log.warning("Unable to read {} from {} because it doesn't exist.", name, 
path)
-      None
+      Future.successful(None)
     }
   }
 
diff --git 
a/lease-kubernetes/src/main/scala/org/apache/pekko/coordination/lease/kubernetes/internal/KubernetesApiImpl.scala
 
b/lease-kubernetes/src/main/scala/org/apache/pekko/coordination/lease/kubernetes/internal/KubernetesApiImpl.scala
index d3abbf6d..e739d38a 100644
--- 
a/lease-kubernetes/src/main/scala/org/apache/pekko/coordination/lease/kubernetes/internal/KubernetesApiImpl.scala
+++ 
b/lease-kubernetes/src/main/scala/org/apache/pekko/coordination/lease/kubernetes/internal/KubernetesApiImpl.scala
@@ -20,6 +20,7 @@ import pekko.actor.ActorSystem
 import pekko.annotation.InternalApi
 import pekko.coordination.lease.LeaseException
 import pekko.coordination.lease.kubernetes.{ KubernetesSettings, LeaseResource 
}
+import pekko.dispatch.ExecutionContexts
 import pekko.http.scaladsl.marshalling.Marshal
 import pekko.http.scaladsl.model._
 import pekko.http.scaladsl.unmarshalling.Unmarshal
@@ -59,10 +60,12 @@ PUTs must contain resourceVersions. Response:
     val lcr = LeaseCustomResource(Metadata(leaseName, Some(version)), 
Spec(ownerName, System.currentTimeMillis()))
     for {
       entity <- Marshal(lcr).to[RequestEntity]
+      leasePath <- pathForLease(leaseName)
+      request <- requestForPath(leasePath, method = HttpMethods.PUT, entity)
       response <- {
         log.debug("updating {} to {}", leaseName, lcr)
         makeRequest(
-          requestForPath(pathForLease(leaseName), method = HttpMethods.PUT, 
entity),
+          request,
           s"Timed out updating lease [$leaseName] to owner [$ownerName]. It is 
not known if the update happened")
       }
       result <- response.status match {
@@ -97,9 +100,10 @@ PUTs must contain resourceVersions. Response:
   }
 
   override def getLeaseResource(name: String): Future[Option[LeaseResource]] = 
{
-    val fResponse = makeRequest(requestForPath(pathForLease(name)), s"Timed 
out reading lease $name")
     for {
-      response <- fResponse
+      leasePath <- pathForLease(name)
+      request <- requestForPath(leasePath)
+      response <- makeRequest(request, s"Timed out reading lease $name")
       entity <- response.entity.toStrict(settings.bodyReadTimeout)
       lr <- response.status match {
         case StatusCodes.OK =>
@@ -127,18 +131,21 @@ PUTs must contain resourceVersions. Response:
     } yield lr
   }
 
-  override def pathForLease(name: String): Uri.Path =
-    Uri.Path.Empty / "apis" / "pekko.apache.org" / "v1" / "namespaces" / 
namespace / "leases" / name
-      .replaceAll("[^\\d\\w\\-\\.]", "")
-      .toLowerCase
+  override def pathForLease(name: String): Future[Uri.Path] = {
+    namespace.map { ns =>
+      Uri.Path.Empty / "apis" / "pekko.apache.org" / "v1" / "namespaces" / ns 
/ "leases" / name
+        .replaceAll("[^\\d\\w\\-\\.]", "")
+        .toLowerCase
+    }(ExecutionContexts.parasitic)
+  }
 
   override def createLeaseResource(name: String): 
Future[Option[LeaseResource]] = {
     val lcr = LeaseCustomResource(Metadata(name, None), Spec("", 
System.currentTimeMillis()))
     for {
       entity <- Marshal(lcr).to[RequestEntity]
-      response <- makeRequest(
-        requestForPath(pathForLease(name), HttpMethods.POST, entity = entity),
-        s"Timed out creating lease $name")
+      leasePath <- pathForLease(name)
+      request <- requestForPath(leasePath, HttpMethods.POST, entity = entity)
+      response <- makeRequest(request, s"Timed out creating lease $name")
       responseEntity <- response.entity.toStrict(settings.bodyReadTimeout)
       lr <- response.status match {
         case StatusCodes.Created =>
diff --git 
a/lease-kubernetes/src/main/scala/org/apache/pekko/coordination/lease/kubernetes/internal/NativeKubernetesApiImpl.scala
 
b/lease-kubernetes/src/main/scala/org/apache/pekko/coordination/lease/kubernetes/internal/NativeKubernetesApiImpl.scala
index 6fb5eb07..250d59ec 100644
--- 
a/lease-kubernetes/src/main/scala/org/apache/pekko/coordination/lease/kubernetes/internal/NativeKubernetesApiImpl.scala
+++ 
b/lease-kubernetes/src/main/scala/org/apache/pekko/coordination/lease/kubernetes/internal/NativeKubernetesApiImpl.scala
@@ -23,6 +23,7 @@ import pekko.annotation.InternalApi
 import 
pekko.coordination.lease.kubernetes.internal.NativeKubernetesApiImpl.RFC3339MICRO_FORMATTER
 import pekko.coordination.lease.kubernetes.{ KubernetesSettings, LeaseResource 
}
 import pekko.coordination.lease.LeaseException
+import pekko.dispatch.ExecutionContexts
 import pekko.http.scaladsl.marshalling.Marshal
 import pekko.http.scaladsl.model._
 import pekko.http.scaladsl.unmarshalling.Unmarshal
@@ -68,10 +69,11 @@ object NativeKubernetesApiImpl {
     val lcr = NativeLeaseResource(Metadata(leaseName, Some(version)), 
NativeSpec(ownerName, currentTimeRFC3339))
     for {
       entity <- Marshal(lcr).to[RequestEntity]
+      leasePath <- pathForLease(leaseName)
+      request <- requestForPath(leasePath, method = HttpMethods.PUT, entity)
       response <- {
         log.debug("updating {} to {}", leaseName, lcr)
-        makeRequest(
-          requestForPath(pathForLease(leaseName), method = HttpMethods.PUT, 
entity),
+        makeRequest(request,
           s"Timed out updating lease [$leaseName] to owner [$ownerName]. It is 
not known if the update happened")
       }
       result <- response.status match {
@@ -106,9 +108,10 @@ object NativeKubernetesApiImpl {
   }
 
   override def getLeaseResource(name: String): Future[Option[LeaseResource]] = 
{
-    val fResponse = makeRequest(requestForPath(pathForLease(name)), s"Timed 
out reading lease $name")
     for {
-      response <- fResponse
+      leasePath <- pathForLease(name)
+      request <- requestForPath(leasePath)
+      response <- makeRequest(request, s"Timed out reading lease $name")
       entity <- response.entity.toStrict(settings.bodyReadTimeout)
       lr <- response.status match {
         case StatusCodes.OK =>
@@ -136,18 +139,21 @@ object NativeKubernetesApiImpl {
     } yield lr
   }
 
-  override def pathForLease(name: String): Uri.Path =
-    Uri.Path.Empty / "apis" / "coordination.k8s.io" / "v1" / "namespaces" / 
namespace / "leases" / name
-      .replaceAll("[^\\d\\w\\-\\.]", "")
-      .toLowerCase
+  override def pathForLease(name: String): Future[Uri.Path] = {
+    namespace.map { ns =>
+      Uri.Path.Empty / "apis" / "coordination.k8s.io" / "v1" / "namespaces" / 
ns / "leases" / name
+        .replaceAll("[^\\d\\w\\-\\.]", "")
+        .toLowerCase
+    }(ExecutionContexts.parasitic)
+  }
 
   override def createLeaseResource(name: String): 
Future[Option[LeaseResource]] = {
     val lcr = NativeLeaseResource(Metadata(name, None), NativeSpec("", 
currentTimeRFC3339))
     for {
       entity <- Marshal(lcr).to[RequestEntity]
-      response <- makeRequest(
-        requestForPath(pathForLease(""), HttpMethods.POST, entity = entity),
-        s"Timed out creating lease $name")
+      leasePath <- pathForLease("")
+      request <- requestForPath(leasePath, HttpMethods.POST, entity = entity)
+      response <- makeRequest(request, s"Timed out creating lease $name")
       responseEntity <- response.entity.toStrict(settings.bodyReadTimeout)
       lr <- response.status match {
         case StatusCodes.Created =>
diff --git 
a/lease-kubernetes/src/test/scala/org/apache/pekko/coordination/lease/kubernetes/KubernetesApiSpec.scala
 
b/lease-kubernetes/src/test/scala/org/apache/pekko/coordination/lease/kubernetes/KubernetesApiSpec.scala
index bb640048..63811d4b 100644
--- 
a/lease-kubernetes/src/test/scala/org/apache/pekko/coordination/lease/kubernetes/KubernetesApiSpec.scala
+++ 
b/lease-kubernetes/src/test/scala/org/apache/pekko/coordination/lease/kubernetes/KubernetesApiSpec.scala
@@ -13,12 +13,17 @@
 
 package org.apache.pekko.coordination.lease.kubernetes
 
+import java.io.File
+import java.nio.file.Files
+import java.util.concurrent.atomic.AtomicBoolean
+import scala.concurrent.Future
 import scala.concurrent.duration._
 import org.apache.pekko
 import pekko.Done
 import pekko.actor.ActorSystem
 import pekko.coordination.lease.kubernetes.internal.KubernetesApiImpl
-import pekko.http.scaladsl.model.StatusCodes
+import pekko.coordination.lease.LeaseException
+import pekko.http.scaladsl.model.{ HttpRequest, HttpResponse, StatusCodes }
 import pekko.testkit.TestKit
 import com.github.tomakehurst.wiremock.WireMockServer
 import com.github.tomakehurst.wiremock.client.WireMock
@@ -65,7 +70,8 @@ class KubernetesApiSpec
 
   val underTest = new KubernetesApiImpl(system, settings) {
     // avoid touching slow CI filesystem
-    override protected def readConfigVarFromFilesystem(path: String, name: 
String): Option[String] = None
+    override protected def readConfigVarFromFilesystem(path: String, name: 
String): Future[Option[String]] =
+      Future.successful(None)
   }
   val leaseName = "lease-1"
   val client1 = "client-1"
@@ -269,6 +275,87 @@ class KubernetesApiSpec
       underTest.removeLease(lease).failed.futureValue.getMessage shouldEqual
       s"Timed out removing lease [$lease]. It is not known if the remove 
happened. Is the API server up?"
     }
-  }
 
+    "not cache token so it can be rotated" in {
+      val path = File.createTempFile("kubernetes-api-spec", null)
+      path.deleteOnExit()
+
+      val newSettings = new KubernetesSettings(
+        "",
+        path.getAbsolutePath,
+        "localhost",
+        wireMockServer.port(),
+        namespace = Some("lease"),
+        "",
+        apiServerRequestTimeout = 1.second,
+        secure = false)
+
+      val tokenTest = new KubernetesApiImpl(system, newSettings)
+
+      val firstTokenValue = "first"
+      val secondTokenValue = "second"
+
+      Files.write(path.toPath, firstTokenValue.getBytes)
+      tokenTest.apiToken().futureValue shouldEqual firstTokenValue
+      Files.write(path.toPath, secondTokenValue.getBytes)
+      tokenTest.apiToken().futureValue shouldEqual secondTokenValue
+    }
+
+    "retry on 401 to handle token timeout" in {
+      val newSettings = new KubernetesSettings(
+        "",
+        "",
+        "localhost",
+        wireMockServer.port(),
+        namespace = Some("lease"),
+        "",
+        apiServerRequestTimeout = 1.second,
+        secure = false)
+
+      val toFail = new AtomicBoolean(true)
+      val retryUnauthorized = new KubernetesApiImpl(system, newSettings) {
+        // avoid touching slow CI filesystem
+        override protected def readConfigVarFromFilesystem(path: String, name: 
String): Future[Option[String]] =
+          Future.successful(None)
+
+        override def makeRawRequest(request: HttpRequest): 
Future[HttpResponse] =
+          if (toFail.getAndSet(false))
+            Future.successful(HttpResponse(
+              StatusCodes.Unauthorized
+            ))
+          else
+            Future.successful(HttpResponse(
+              StatusCodes.NotFound
+            ))
+      }
+
+      retryUnauthorized.getLeaseResource("").futureValue shouldEqual None
+    }
+
+    "eventually return unauthorized LeaseException when token rotation is not 
happening" in {
+      val newSettings = new KubernetesSettings(
+        "",
+        "",
+        "localhost",
+        wireMockServer.port(),
+        namespace = Some("lease"),
+        "",
+        apiServerRequestTimeout = 1.second,
+        secure = false)
+
+      val retryUnauthorized = new KubernetesApiImpl(system, newSettings) {
+        // avoid touching slow CI filesystem
+        override protected def readConfigVarFromFilesystem(path: String, name: 
String): Future[Option[String]] =
+          Future.successful(None)
+
+        override def makeRawRequest(request: HttpRequest): 
Future[HttpResponse] =
+          Future.successful(HttpResponse(
+            StatusCodes.Unauthorized
+          ))
+      }
+
+      retryUnauthorized.getLeaseResource("").failed.futureValue shouldBe 
an[LeaseException]
+    }
+
+  }
 }
diff --git 
a/lease-kubernetes/src/test/scala/org/apache/pekko/coordination/lease/kubernetes/NativeKubernetesApiSpec.scala
 
b/lease-kubernetes/src/test/scala/org/apache/pekko/coordination/lease/kubernetes/NativeKubernetesApiSpec.scala
index 74a108a1..f18e11d7 100644
--- 
a/lease-kubernetes/src/test/scala/org/apache/pekko/coordination/lease/kubernetes/NativeKubernetesApiSpec.scala
+++ 
b/lease-kubernetes/src/test/scala/org/apache/pekko/coordination/lease/kubernetes/NativeKubernetesApiSpec.scala
@@ -36,6 +36,7 @@ import org.scalatest.concurrent.ScalaFutures
 import org.scalatest.matchers.should.Matchers
 import org.scalatest.wordspec.AnyWordSpecLike
 
+import scala.concurrent.Future
 import scala.concurrent.duration._
 
 class NativeKubernetesApiSpec
@@ -71,7 +72,8 @@ class NativeKubernetesApiSpec
 
   val underTest = new NativeKubernetesApiImpl(system, settings) {
     // avoid touching slow CI filesystem
-    override protected def readConfigVarFromFilesystem(path: String, name: 
String): Option[String] = None
+    override protected def readConfigVarFromFilesystem(path: String, name: 
String): Future[Option[String]] =
+      Future.successful(None)
   }
   val leaseName = "lease-1"
   val client1 = "client-1"


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to