This is an automated email from the ASF dual-hosted git repository.

style95 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new 236ca5e4b Clean Up Etcd Worker Actor (#5323)
236ca5e4b is described below

commit 236ca5e4b894e4cc626f685c1d0eba5c3e6077ec
Author: Brendan Doyle <bdoyle0...@gmail.com>
AuthorDate: Wed Oct 5 00:14:54 2022 -0400

    Clean Up Etcd Worker Actor (#5323)

    * clean up etcd worker actor

    * revert etcd client local change for unit testing

    * fix scala 2.13 compilation

    Co-authored-by: Brendan Doyle <brend...@qualtrics.com>
---
 .../apache/openwhisk/core/etcd/EtcdWorker.scala    | 166 ++++++++++++++++++
 .../core/service/DataManagementService.scala       | 151 +----------------
 .../core/invoker/FPCInvokerReactive.scala          |   4 +-
 .../openwhisk/core/scheduler/Scheduler.scala       |   4 +-
 .../openwhisk/common/etcd/EtcdWorkerTests.scala    | 176 +++++++++++++++++++
 5 files changed, 347 insertions(+), 154 deletions(-)
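The heart of the cleanup: the four hand-rolled copies of the warn / refresh-lease / reschedule sequence are collapsed into a single retryable self-message, defined in the new file below:

    case class GetLeaseAndRetry(request: Any, log: String, clearLease: Boolean = true, skipLeaseRefresh: Boolean = false)

For downstream code the move is import-only, since EtcdWorker.props(etcdClient, leaseService) keeps its signature; the Scheduler and FPCInvokerReactive hunks further down reduce to swapping

    import org.apache.openwhisk.core.service.{DataManagementService, EtcdWorker, LeaseKeepAliveService, WatcherService}

for

    import org.apache.openwhisk.core.etcd.{EtcdClient, EtcdConfig, EtcdWorker}
    import org.apache.openwhisk.core.service.{DataManagementService, LeaseKeepAliveService, WatcherService}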
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/etcd/EtcdWorker.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/etcd/EtcdWorker.scala
new file mode 100644
index 000000000..82d78e693
--- /dev/null
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/etcd/EtcdWorker.scala
@@ -0,0 +1,166 @@
+package org.apache.openwhisk.core.etcd
+
+import akka.actor.{Actor, ActorRef, ActorSystem, Props, Timers}
+import io.grpc.StatusRuntimeException
+import org.apache.openwhisk.common.Logging
+import org.apache.openwhisk.core.etcd.EtcdWorker.GetLeaseAndRetry
+import org.apache.openwhisk.core.service.DataManagementService.retryInterval
+import org.apache.openwhisk.core.service.{
+  AlreadyExist,
+  Done,
+  ElectLeader,
+  ElectionResult,
+  FinishWork,
+  GetLease,
+  InitialDataStorageResults,
+  Lease,
+  RegisterData,
+  RegisterInitialData,
+  WatcherClosed
+}
+
+import scala.concurrent.ExecutionContext
+import scala.concurrent.duration.FiniteDuration
+import scala.util.Success
+
+class EtcdWorker(etcdClient: EtcdClient, leaseService: ActorRef)(implicit val ec: ExecutionContext,
+                                                                 actorSystem: ActorSystem,
+                                                                 logging: Logging)
+    extends Actor
+    with Timers {
+
+  private val dataManagementService = context.parent
+  private var lease: Option[Lease] = None
+  leaseService ! GetLease
+
+  override def receive: Receive = {
+    case msg: Lease =>
+      lease = Some(msg)
+
+    case msg: GetLeaseAndRetry =>
+      logging.warn(this, msg.log)
+      if (!msg.skipLeaseRefresh) {
+        if (msg.clearLease) {
+          lease = None
+        }
+        leaseService ! GetLease
+      }
+      sendMessageToSelfAfter(msg.request, retryInterval)
+
+    // leader election + endpoint management
+    case request: ElectLeader =>
+      lease match {
+        case Some(l) =>
+          etcdClient
+            .electLeader(request.key, request.value, l)
+            .andThen {
+              case Success(msg) =>
+                request.recipient ! ElectionResult(msg)
+                dataManagementService ! FinishWork(request.key)
+            }
+            .recover {
+              // if there is no lease, reissue it and retry immediately
+              case t: StatusRuntimeException =>
+                self ! GetLeaseAndRetry(request, s"a lease expired during leader election, reissue it: $t")
+              // it should retry forever until the data is stored
+              case t: Throwable =>
+                self ! GetLeaseAndRetry(
+                  request,
+                  s"an unexpected error occurred: $t, retry storing data",
+                  skipLeaseRefresh = true)
+            }
+        case None =>
+          self ! GetLeaseAndRetry(request, s"lease not found, retry storing data ${request.key}", clearLease = false)
+      }
+
+    // only endpoint management
+    case request: RegisterData =>
+      lease match {
+        case Some(l) =>
+          etcdClient
+            .put(request.key, request.value, l.id)
+            .andThen {
+              case Success(_) =>
+                dataManagementService ! FinishWork(request.key)
+            }
+            .recover {
+              // if there is no lease, reissue it and retry immediately
+              case t: StatusRuntimeException =>
+                self ! GetLeaseAndRetry(
+                  request,
+                  s"a lease expired while registering data ${request.key}, reissue it: $t")
+              // it should retry forever until the data is stored
+              case t: Throwable =>
+                self ! GetLeaseAndRetry(
+                  request,
+                  s"an unexpected error occurred: $t, retry storing data ${request.key}",
+                  skipLeaseRefresh = true)
+            }
+        case None =>
+          self ! GetLeaseAndRetry(request, s"lease not found, retry storing data ${request.key}", clearLease = false)
+      }
+
+    // stores the data only if no entry exists for the key yet
+    case request: RegisterInitialData =>
+      lease match {
+        case Some(l) =>
+          etcdClient
+            .putTxn(request.key, request.value, 0, l.id)
+            .map { res =>
+              dataManagementService ! FinishWork(request.key)
+              if (res.getSucceeded) {
+                logging.info(this, s"initial data storing succeeded for ${request.key}")
+                request.recipient.map(_ ! InitialDataStorageResults(request.key, Right(Done())))
+              } else {
+                logging.info(this, s"data is already stored for $request, cancelling the initial data storing")
+                request.recipient.map(_ ! InitialDataStorageResults(request.key, Left(AlreadyExist())))
+              }
+            }
+            .recover {
+              // if there is no lease, reissue it and retry immediately
+              case t: StatusRuntimeException =>
+                self ! GetLeaseAndRetry(
+                  request,
+                  s"a lease expired while registering initial data ${request.key}, reissue it: $t")
+              // it should retry forever until the data is stored
+              case t: Throwable =>
+                self ! GetLeaseAndRetry(
+                  request,
+                  s"an unexpected error occurred: $t, retry storing data ${request.key}",
+                  skipLeaseRefresh = true)
+            }
+        case None =>
+          self ! GetLeaseAndRetry(request, s"lease not found, retry storing data ${request.key}", clearLease = false)
+      }
+
+    case msg: WatcherClosed =>
+      etcdClient
+        .del(msg.key)
+        .andThen {
+          case Success(_) =>
+            dataManagementService ! FinishWork(msg.key)
+        }
+        .recover {
+          // if there is no lease, reissue it and retry immediately
+          case t: StatusRuntimeException =>
+            self ! GetLeaseAndRetry(msg, s"a lease expired while deleting data ${msg.key}, reissue it: $t")
+          // it should retry forever until the data is deleted
+          case t: Throwable =>
+            self ! GetLeaseAndRetry(
+              msg,
+              s"an unexpected error occurred: $t, retry deleting data ${msg.key}",
+              skipLeaseRefresh = true)
+        }
+  }
+
+  private def sendMessageToSelfAfter(msg: Any, retryInterval: FiniteDuration) = {
+    timers.startSingleTimer(msg, msg, retryInterval)
+  }
+}
+
+object EtcdWorker {
+  case class GetLeaseAndRetry(request: Any, log: String, clearLease: Boolean = true, skipLeaseRefresh: Boolean = false)
+
+  def props(etcdClient: EtcdClient, leaseService: ActorRef)(implicit ec: ExecutionContext,
+                                                            actorSystem: ActorSystem,
+                                                            logging: Logging): Props = {
+    Props(new EtcdWorker(etcdClient, leaseService))
+  }
+}
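Two behavioral notes on the new file. First, the class drops the private[service] modifier it had inside DataManagementService.scala (removed below), which is what lets the invoker and scheduler import it from core.etcd. Second, retries now go through akka.actor.Timers instead of a bare actorSystem.scheduler.scheduleOnce: startSingleTimer is keyed (here by the retried message itself), so re-arming for the same request replaces the pending timer rather than stacking duplicates, and all pending timers are cancelled when the actor stops or restarts. A minimal, self-contained sketch of that property; the Ping message and intervals are illustrative, not from this commit:

    import akka.actor.{Actor, ActorSystem, Props, Timers}
    import scala.concurrent.duration._

    // Ping is hypothetical, standing in for the retried etcd request message.
    case class Ping(key: String)

    class KeyedTimerSketch extends Actor with Timers {
      def receive: Receive = {
        case "arm" =>
          // Both calls use an equal key (the message itself), so the second
          // replaces the first pending timer: exactly one Ping is delivered.
          timers.startSingleTimer(Ping("a"), Ping("a"), 1.second)
          timers.startSingleTimer(Ping("a"), Ping("a"), 1.second)
        case Ping(k) =>
          println(s"retrying $k")
      }
    }

    object KeyedTimerSketch {
      def main(args: Array[String]): Unit = {
        val system = ActorSystem("sketch")
        system.actorOf(Props(new KeyedTimerSketch)) ! "arm"
        Thread.sleep(3000) // let the single retry fire, then shut down
        system.terminate()
      }
    }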
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/service/DataManagementService.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/service/DataManagementService.scala
index 425832679..c03070bf8 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/service/DataManagementService.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/service/DataManagementService.scala
@@ -19,18 +19,14 @@ package org.apache.openwhisk.core.service
 
 import akka.actor.{Actor, ActorRef, ActorRefFactory, ActorSystem, Props}
 import akka.util.Timeout
-import io.grpc.StatusRuntimeException
 import org.apache.openwhisk.common.Logging
 import org.apache.openwhisk.core.ConfigKeys
-import org.apache.openwhisk.core.etcd.{EtcdClient, EtcdFollower, EtcdLeader}
-import org.apache.openwhisk.core.service.DataManagementService.retryInterval
+import org.apache.openwhisk.core.etcd.{EtcdFollower, EtcdLeader}
 import pureconfig.loadConfigOrThrow
 
 import scala.collection.concurrent.TrieMap
 import scala.collection.mutable.{Map, Queue}
-import scala.concurrent.ExecutionContext
 import scala.concurrent.duration._
-import scala.util.Success
 
 // messages received by the actor
 // it is required to specify a recipient directly for the retryable message processing
@@ -181,148 +177,3 @@ object DataManagementService {
     Props(new DataManagementService(watcherService, workerFactory))
   }
 }
-
-private[service] class EtcdWorker(etcdClient: EtcdClient, leaseService: ActorRef)(implicit val ec: ExecutionContext,
-                                                                                  actorSystem: ActorSystem,
-                                                                                  logging: Logging)
-    extends Actor {
-
-  private val dataManagementService = context.parent
-  private var lease: Option[Lease] = None
-  leaseService ! GetLease
-
-  override def receive: Receive = {
-    case msg: Lease =>
-      lease = Some(msg)
-
-    // leader election + endpoint management
-    case request: ElectLeader =>
-      lease match {
-        case Some(l) =>
-          etcdClient
-            .electLeader(request.key, request.value, l)
-            .andThen {
-              case Success(msg) =>
-                request.recipient ! ElectionResult(msg)
-                dataManagementService ! FinishWork(request.key)
-            }
-            .recover {
-              // if there is no lease, reissue it and retry immediately
-              case t: StatusRuntimeException =>
-                logging.warn(this, s"a lease is expired while leader election, reissue it: $t")
-                lease = None
-                leaseService ! GetLease
-                sendMessageToSelfAfter(request, retryInterval)
-
-              // it should retry forever until the data is stored
-              case t: Throwable =>
-                logging.warn(this, s"unexpected error happened: $t, retry storing data")
-                sendMessageToSelfAfter(request, retryInterval)
-            }
-        case None =>
-          logging.warn(this, s"lease not found, retry storing data")
-          leaseService ! GetLease
-          sendMessageToSelfAfter(request, retryInterval)
-      }
-
-    // only endpoint management
-    case request: RegisterData =>
-      lease match {
-        case Some(l) =>
-          etcdClient
-            .put(request.key, request.value, l.id)
-            .andThen {
-              case Success(_) =>
-                dataManagementService ! FinishWork(request.key)
-            }
-            .recover {
-              // if there is no lease, reissue it and retry immediately
-              case t: StatusRuntimeException =>
-                logging.warn(this, s"a lease is expired while registering data ${request.key}, reissue it: $t")
-                lease = None
-                leaseService ! GetLease
-                sendMessageToSelfAfter(request, retryInterval)
-
-              // it should retry forever until the data is stored
-              case t: Throwable =>
-                logging.warn(this, s"unexpected error happened: $t, retry storing data ${request.key}")
-                sendMessageToSelfAfter(request, retryInterval)
-            }
-        case None =>
-          logging.warn(this, s"lease not found, retry storing data ${request.key}")
-          leaseService ! GetLease
-          sendMessageToSelfAfter(request, retryInterval)
-      }
-
-    // it stores the data iif there is no such one
-    case request: RegisterInitialData =>
-      lease match {
-        case Some(l) =>
-          etcdClient
-            .putTxn(request.key, request.value, 0, l.id)
-            .map { res =>
-              dataManagementService ! FinishWork(request.key)
-              if (res.getSucceeded) {
-                logging.info(this, s"initial data storing succeeds for ${request.key}")
-                request.recipient.map(_ ! InitialDataStorageResults(request.key, Right(Done())))
-              } else {
-                logging.info(this, s"data is already stored for: $request, cancel the initial data storing")
-                request.recipient.map(_ ! InitialDataStorageResults(request.key, Left(AlreadyExist())))
-              }
-            }
-            .recover {
-              // if there is no lease, reissue it and retry immediately
-              case t: StatusRuntimeException =>
-                logging.warn(
-                  this,
-                  s"a lease is expired while registering an initial data ${request.key}, reissue it: $t")
-                lease = None
-                leaseService ! GetLease
-                sendMessageToSelfAfter(request, retryInterval)
-
-              // it should retry forever until the data is stored
-              case t: Throwable =>
-                logging.warn(this, s"unexpected error happened: $t, retry storing data for ${request.key}")
-                sendMessageToSelfAfter(request, retryInterval)
-            }
-        case None =>
-          logging.warn(this, s"lease not found, retry storing data for ${request.key}")
-          leaseService ! GetLease
-          sendMessageToSelfAfter(request, retryInterval)
-      }
-
-    case msg: WatcherClosed =>
-      etcdClient
-        .del(msg.key)
-        .andThen {
-          case Success(_) =>
-            dataManagementService ! FinishWork(msg.key)
-        }
-        .recover {
-          // if there is no lease, reissue it and retry immediately
-          case t: StatusRuntimeException =>
-            logging.warn(this, s"a lease is expired while deleting data ${msg.key}, reissue it: $t")
-            lease = None
-            leaseService ! GetLease
-            sendMessageToSelfAfter(msg, retryInterval)
-
-          // it should retry forever until the data is stored
-          case t: Throwable =>
-            logging.warn(this, s"unexpected error happened: $t, retry storing data for ${msg.key}")
-            sendMessageToSelfAfter(msg, retryInterval)
-        }
-
-  }
-
-  private def sendMessageToSelfAfter(msg: Any, retryInterval: FiniteDuration) = {
-    actorSystem.scheduler.scheduleOnce(retryInterval, self, msg)
-  }
-}
-
-object EtcdWorker {
-  def props(etcdClient: EtcdClient, leaseService: ActorRef)(implicit ec: ExecutionContext,
-                                                            actorSystem: ActorSystem,
-                                                            logging: Logging): Props = {
-    Props(new EtcdWorker(etcdClient, leaseService))
-  }
-}
diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/FPCInvokerReactive.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/FPCInvokerReactive.scala
index 327c0deda..67660e8a6 100644
--- a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/FPCInvokerReactive.scala
+++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/FPCInvokerReactive.scala
@@ -37,10 +37,10 @@ import org.apache.openwhisk.core.etcd.EtcdKV.ContainerKeys.containerPrefix
 import org.apache.openwhisk.core.etcd.EtcdKV.QueueKeys.queue
 import org.apache.openwhisk.core.etcd.EtcdKV.{ContainerKeys, SchedulerKeys}
 import org.apache.openwhisk.core.etcd.EtcdType._
-import org.apache.openwhisk.core.etcd.{EtcdClient, EtcdConfig}
+import org.apache.openwhisk.core.etcd.{EtcdClient, EtcdConfig, EtcdWorker}
 import org.apache.openwhisk.core.invoker.Invoker.InvokerEnabled
 import org.apache.openwhisk.core.scheduler.{SchedulerEndpoints, SchedulerStates}
-import org.apache.openwhisk.core.service.{DataManagementService, EtcdWorker, LeaseKeepAliveService, WatcherService}
+import org.apache.openwhisk.core.service.{DataManagementService, LeaseKeepAliveService, WatcherService}
 import org.apache.openwhisk.core.{ConfigKeys, WarmUp, WhiskConfig}
 import org.apache.openwhisk.grpc.{ActivationServiceClient, FetchRequest}
 import org.apache.openwhisk.spi.SpiLoader
diff --git a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/Scheduler.scala b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/Scheduler.scala
index f32520cc7..2038fc1c3 100644
--- a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/Scheduler.scala
+++ b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/Scheduler.scala
@@ -36,11 +36,11 @@ import org.apache.openwhisk.core.database.{ActivationStoreProvider, NoDocumentEx
 import org.apache.openwhisk.core.entity._
 import org.apache.openwhisk.core.etcd.EtcdKV.{QueueKeys, SchedulerKeys}
 import org.apache.openwhisk.core.etcd.EtcdType.ByteStringToString
-import org.apache.openwhisk.core.etcd.{EtcdClient, EtcdConfig}
+import org.apache.openwhisk.core.etcd.{EtcdClient, EtcdConfig, EtcdWorker}
 import org.apache.openwhisk.core.scheduler.container.{ContainerManager, CreationJobManager}
 import org.apache.openwhisk.core.scheduler.grpc.ActivationServiceImpl
 import org.apache.openwhisk.core.scheduler.queue._
-import org.apache.openwhisk.core.service.{DataManagementService, EtcdWorker, LeaseKeepAliveService, WatcherService}
+import org.apache.openwhisk.core.service.{DataManagementService, LeaseKeepAliveService, WatcherService}
 import org.apache.openwhisk.core.{ConfigKeys, WhiskConfig}
 import org.apache.openwhisk.grpc.ActivationServiceHandler
 import org.apache.openwhisk.http.BasicHttpService
diff --git a/tests/src/test/scala/org/apache/openwhisk/common/etcd/EtcdWorkerTests.scala b/tests/src/test/scala/org/apache/openwhisk/common/etcd/EtcdWorkerTests.scala
new file mode 100644
index 000000000..a4203fc28
--- /dev/null
+++ b/tests/src/test/scala/org/apache/openwhisk/common/etcd/EtcdWorkerTests.scala
@@ -0,0 +1,176 @@
+package org.apache.openwhisk.common.etcd
+
+import akka.actor.{ActorRef, ActorSystem}
+import akka.testkit.{ImplicitSender, TestActor, TestActorRef, TestKit, TestProbe}
+import akka.util.Timeout
+import com.ibm.etcd.api.{DeleteRangeResponse, PutResponse, TxnResponse}
+import common.StreamLogging
+import io.grpc.{Status, StatusRuntimeException}
+import org.apache.openwhisk.core.entity.SchedulerInstanceId
+import org.apache.openwhisk.core.etcd.{EtcdClient, EtcdLeader, EtcdWorker}
+import org.apache.openwhisk.core.service.{
+  AlreadyExist,
+  Done,
+  ElectLeader,
+  ElectionResult,
+  FinishWork,
+  GetLease,
+  InitialDataStorageResults,
+  Lease,
+  RegisterData,
+  RegisterInitialData,
+  WatcherClosed
+}
+import org.junit.runner.RunWith
+import org.scalamock.scalatest.MockFactory
+import org.scalatest.{BeforeAndAfterAll, FlatSpecLike, Matchers}
+import org.scalatest.concurrent.ScalaFutures
+import org.scalatest.junit.JUnitRunner
+
+import scala.concurrent.{ExecutionContext, Future}
+import scala.concurrent.duration._
+
+@RunWith(classOf[JUnitRunner])
+class EtcdWorkerTests
+    extends TestKit(ActorSystem("EtcdWorker"))
+    with ImplicitSender
+    with FlatSpecLike
+    with ScalaFutures
+    with Matchers
+    with MockFactory
+    with BeforeAndAfterAll
+    with StreamLogging {
+
+  implicit val timeout: Timeout = Timeout(5.seconds)
+  implicit val ec: ExecutionContext = system.dispatcher
+
+  val leaseService = TestProbe()
+  val leaseId = 10
+  val leaseTtl = 10
+  leaseService.setAutoPilot((sender: ActorRef, msg: Any) =>
+    msg match {
+      case GetLease =>
+        sender ! Lease(leaseId, leaseTtl)
+        TestActor.KeepRunning
+
+      case _ =>
+        TestActor.KeepRunning
+    })
+
+  val schedulerId = SchedulerInstanceId("scheduler0")
+  val instanceId = schedulerId
+
+  behavior of "EtcdWorker"
+
+  it should "elect leader and send completion ack to parent" in {
+    val mockEtcd = mock[EtcdClient]
+
+    val key = "testKey"
+    val value = "testValue"
+    val leader = Right(EtcdLeader(key, value, leaseId))
+    val etcdWorker = TestActorRef(EtcdWorker.props(mockEtcd, leaseService.ref), self)
+
+    (mockEtcd
+      .electLeader(_: String, _: String, _: Lease))
+      .expects(key, value, *)
+      .returns(Future.successful(leader))
+
+    etcdWorker ! ElectLeader(key, value, recipient = self)
+
+    expectMsg(ElectionResult(leader))
+    expectMsg(FinishWork(key))
+  }
+
+  it should "register initial data when it does not exist and send completion ack to parent" in {
+    val mockEtcd = mock[EtcdClient]
+
+    val key = "testKey"
+    val value = "testValue"
+    val etcdWorker = TestActorRef(EtcdWorker.props(mockEtcd, leaseService.ref), self)
+
+    (mockEtcd
+      .putTxn(_: String, _: String, _: Long, _: Long))
+      .expects(key, value, *, *)
+      .returns(Future.successful(TxnResponse.newBuilder().setSucceeded(true).build()))
+
+    etcdWorker ! RegisterInitialData(key, value, recipient = Some(self))
+
+    expectMsg(FinishWork(key))
+    expectMsg(InitialDataStorageResults(key, Right(Done())))
+  }
+
+  it should "attempt to register initial data when it already exists and send completion ack to parent" in {
+    val mockEtcd = mock[EtcdClient]
+
+    val key = "testKey"
+    val value = "testValue"
+    val etcdWorker = TestActorRef(EtcdWorker.props(mockEtcd, leaseService.ref), self)
+
+    (mockEtcd
+      .putTxn(_: String, _: String, _: Long, _: Long))
+      .expects(key, value, *, *)
+      .returns(Future.successful(TxnResponse.newBuilder().setSucceeded(false).build()))
+
+    etcdWorker ! RegisterInitialData(key, value, recipient = Some(self))
+
+    expectMsg(FinishWork(key))
+    expectMsg(InitialDataStorageResults(key, Left(AlreadyExist())))
+  }
+
+  it should "register data and send completion ack to parent" in {
+    val mockEtcd = mock[EtcdClient]
+
+    val key = "testKey"
+    val value = "testValue"
+    val etcdWorker = TestActorRef(EtcdWorker.props(mockEtcd, leaseService.ref), self)
+
+    (mockEtcd
+      .put(_: String, _: String, _: Long))
+      .expects(key, value, leaseId)
+      .returns(Future.successful(PutResponse.newBuilder().build()))
+
+    etcdWorker ! RegisterData(key, value)
+
+    expectMsg(FinishWork(key))
+  }
+
+  it should "delete data when watcher closed" in {
+    val mockEtcd = mock[EtcdClient]
+
+    val key = "testKey"
+    val etcdWorker = TestActorRef(EtcdWorker.props(mockEtcd, leaseService.ref), self)
+
+    (mockEtcd
+      .del(_: String))
+      .expects(key)
+      .returns(Future.successful(DeleteRangeResponse.newBuilder().build()))
+
+    etcdWorker ! WatcherClosed(key, false)
+
+    expectMsg(FinishWork(key))
+  }
+
+  it should "retry request after failure if lease does not exist" in {
+    val mockEtcd = mock[EtcdClient]
+
+    val key = "testKey"
+    val etcdWorker = TestActorRef(EtcdWorker.props(mockEtcd, leaseService.ref), self)
+
+    var firstAttempt = true
+    (mockEtcd
+      .del(_: String))
+      .expects(key)
+      .onCall((_: String) => {
+        if (firstAttempt) {
+          firstAttempt = false
+          Future.failed(new StatusRuntimeException(Status.RESOURCE_EXHAUSTED))
+        } else {
+          Future.successful(DeleteRangeResponse.newBuilder().build())
+        }
+      })
+      .twice()
+
+    etcdWorker ! WatcherClosed(key, false)
+
+    expectMsg(FinishWork(key))
+  }
+}
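Two small gaps a follow-up could close, sketched here against the suite's existing fixtures (neither is part of this commit): the suite never shuts down its ActorSystem, and the skipLeaseRefresh branch, where a generic non-gRPC failure is retried without asking the lease service for a fresh lease, is untested. Both sketches assume, as the committed retry test already does, that the configured retryInterval is shorter than expectMsg's default timeout:

      // possible follow-up: stop the test ActorSystem when the suite finishes
      override def afterAll(): Unit = {
        TestKit.shutdownActorSystem(system)
      }

      // possible follow-up: cover the skipLeaseRefresh = true branch, where a
      // plain RuntimeException (not a StatusRuntimeException) is retried
      // without requesting a fresh lease first
      it should "retry without refreshing the lease on a generic failure" in {
        val mockEtcd = mock[EtcdClient]

        val key = "testKey"
        val etcdWorker = TestActorRef(EtcdWorker.props(mockEtcd, leaseService.ref), self)

        var firstAttempt = true
        (mockEtcd
          .del(_: String))
          .expects(key)
          .onCall((_: String) => {
            if (firstAttempt) {
              firstAttempt = false
              Future.failed(new RuntimeException("transient etcd failure"))
            } else {
              Future.successful(DeleteRangeResponse.newBuilder().build())
            }
          })
          .twice()

        etcdWorker ! WatcherClosed(key, false)

        expectMsg(FinishWork(key))
      }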