This is an automated email from the ASF dual-hosted git repository. style95 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/openwhisk.git
The following commit(s) were added to refs/heads/master by this push: new ef725a653 Prevent cycle in the QueueManager (#5332) ef725a653 is described below commit ef725a653ab112391f79c274d8e3dcfb915d59a3 Author: Dominic Kim <styl...@apache.org> AuthorDate: Fri Oct 14 13:44:50 2022 +0900 Prevent cycle in the QueueManager (#5332) --- .../org/apache/openwhisk/common/Logging.scala | 1 + .../core/scheduler/queue/QueueManager.scala | 144 +++++++++++++++++---- .../scheduler/queue/test/QueueManagerTests.scala | 72 +++++++++++ 3 files changed, 195 insertions(+), 22 deletions(-) diff --git a/common/scala/src/main/scala/org/apache/openwhisk/common/Logging.scala b/common/scala/src/main/scala/org/apache/openwhisk/common/Logging.scala index 541aee055..ff82ef5fb 100644 --- a/common/scala/src/main/scala/org/apache/openwhisk/common/Logging.scala +++ b/common/scala/src/main/scala/org/apache/openwhisk/common/Logging.scala @@ -601,6 +601,7 @@ object LoggingMarkers { LogMarkerToken(scheduler, "keepAlive", counter, None, Map("leaseId" -> leaseId.toString))(MeasurementUnit.none) def SCHEDULER_QUEUE = LogMarkerToken(scheduler, "queue", counter)(MeasurementUnit.none) def SCHEDULER_QUEUE_CREATE = LogMarkerToken(scheduler, "queueCreate", start)(MeasurementUnit.time.milliseconds) + def SCHEDULER_QUEUE_RECOVER = LogMarkerToken(scheduler, "queueRecover", start)(MeasurementUnit.time.milliseconds) def SCHEDULER_QUEUE_UPDATE(reason: String) = LogMarkerToken(scheduler, "queueUpdate", counter, None, Map("reason" -> reason))(MeasurementUnit.none) def SCHEDULER_QUEUE_WAITING_ACTIVATION(action: String) = diff --git a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/QueueManager.scala b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/QueueManager.scala index 3c11916af..d87338dd3 100644 --- a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/QueueManager.scala +++ b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/QueueManager.scala @@ -55,6 +55,10 @@ case class CreateNewQueue(activationMessage: ActivationMessage, action: FullyQualifiedEntityName, actionMetadata: WhiskActionMetaData) +case class RecoverQueue(activationMessage: ActivationMessage, + action: FullyQualifiedEntityName, + actionMetadata: WhiskActionMetaData) + case class QueueManagerConfig(maxRetriesToGetQueue: Int, maxSchedulingTime: FiniteDuration) class QueueManager( @@ -80,7 +84,7 @@ class QueueManager( private val actorSelectionMap = TrieMap[String, ActorSelection]() - private val leaderElectionCallbacks = TrieMap[String, Either[EtcdFollower, EtcdLeader] => Unit]() + private val leaderElectionCallbacks = TrieMap[String, (Either[EtcdFollower, EtcdLeader], Boolean) => Unit]() private implicit val askTimeout = Timeout(5.seconds) private implicit val ec = context.dispatcher @@ -90,6 +94,8 @@ class QueueManager( // watch leaders and register them into actorSelectionMap watcherService ! WatchEndpoint(QueueKeys.queuePrefix, "", isPrefix = true, watcherName, Set(PutEvent, DeleteEvent)) + private var isShuttingDown = false + override def receive: Receive = { case request: CreateQueue if isWarmUpAction(request.fqn) => logging.info( @@ -114,12 +120,12 @@ class QueueManager( msg.leadership match { case Right(EtcdLeader(key, value, lease)) => leaderElectionCallbacks.remove(key).foreach { callback => - callback(Right(EtcdLeader(key, value, lease))) + callback(Right(EtcdLeader(key, value, lease)), isShuttingDown) } case Left(EtcdFollower(key, value)) => leaderElectionCallbacks.remove(key).foreach { callback => - callback(Left(EtcdFollower(key, value))) + callback(Left(EtcdFollower(key, value)), isShuttingDown) } } @@ -129,7 +135,11 @@ class QueueManager( s"Got activation message ${msg.activationId} for ${msg.user.namespace}/${msg.action} from remote queue manager.")( msg.transid) - handleActivationMessage(msg) + if (sender() == self) { + handleCycle(msg)(msg.transid) + } else { + handleActivationMessage(msg) + } case UpdateMemoryQueue(oldAction, newAction, msg) => logging.info( @@ -164,6 +174,25 @@ class QueueManager( updateInitRevisionMap(getLeaderKey(msg.user.namespace.name.asString, msg.action), msg.revision) queue ! msg msg.transid.mark(this, LoggingMarkers.SCHEDULER_QUEUE_CREATE) + if (isShuttingDown) { + queue ! GracefulShutdown + } + } + + case RecoverQueue(msg, action, actionMetaData) => + QueuePool.keys.find(_.docInfo.id == action.toDocId) match { + // a newer queue is created, send msg to new queue + case Some(key) if key.docInfo.rev >= msg.revision => + QueuePool.get(key) match { + case Some(queue) if queue.isLeader => + queue.queue ! msg.copy(revision = key.docInfo.rev) + logging.info(this, s"Queue for action $action is already recovered, skip")(msg.transid) + case _ => + recreateQueue(action, msg, actionMetaData) + } + case _ => + recreateQueue(action, msg, actionMetaData) + } // leaderKey is now optional, it becomes None when the stale queue is removed @@ -208,6 +237,7 @@ class QueueManager( } case GracefulShutdown => + isShuttingDown = true logging.info(this, s"Gracefully shutdown the queue manager") watcherService ! UnwatchEndpoint(QueueKeys.queuePrefix, isPrefix = true, watcherName) @@ -278,6 +308,62 @@ class QueueManager( initRevisionMap.update(key, revision) } + private def recreateQueue(action: FullyQualifiedEntityName, + msg: ActivationMessage, + actionMetaData: WhiskActionMetaData): Unit = { + logging.warn(this, s"recreate queue for ${msg.action}")(msg.transid) + val queue = createAndStartQueue(msg.user.namespace.name.asString, action, msg.revision, actionMetaData) + queue ! msg + msg.transid.mark(this, LoggingMarkers.SCHEDULER_QUEUE_RECOVER) + if (isShuttingDown) { + queue ! GracefulShutdown + } + } + + private def handleCycle(msg: ActivationMessage)(implicit transid: TransactionId): Unit = { + val action = msg.action + QueuePool.keys.find(_.docInfo.id == action.toDocId) match { + // a newer queue is created, send msg to new queue + case Some(key) if key.docInfo.rev >= msg.revision => + QueuePool.get(key) match { + case Some(queue) if queue.isLeader => + queue.queue ! msg.copy(revision = key.docInfo.rev) + logging.info(this, s"Queue for action $action is already recovered, skip")(msg.transid) + case _ => + recoverQueue(msg) + } + case _ => + recoverQueue(msg) + } + } + + private def recoverQueue(msg: ActivationMessage)(implicit transid: TransactionId): Unit = { + val start = transid.started(this, LoggingMarkers.SCHEDULER_QUEUE_RECOVER) + logging.info(this, s"Recover a queue for ${msg.action},") + getWhiskActionMetaData(entityStore, msg.action.toDocId, msg.revision, false) + .map { actionMetaData: WhiskActionMetaData => + actionMetaData.toExecutableWhiskAction match { + case Some(_) => + self ! RecoverQueue(msg, msg.action.copy(version = Some(actionMetaData.version)), actionMetaData) + transid.finished(this, start, s"recovering queue for ${msg.action.toDocId.asDocInfo(actionMetaData.rev)}") + + case None => + val message = + s"non-executable action: ${msg.action} with rev: ${msg.revision} reached queueManager" + completeErrorActivation(msg, message) + transid.failed(this, start, message) + } + } + .recover { + case t => + transid.failed( + this, + start, + s"failed to fetch action ${msg.action} with rev: ${msg.revision}, error ${t.getMessage}") + completeErrorActivation(msg, t.getMessage) + } + } + private def createNewQueue(newAction: FullyQualifiedEntityName, msg: ActivationMessage)( implicit transid: TransactionId): Future[Any] = { val start = transid.started(this, LoggingMarkers.SCHEDULER_QUEUE_UPDATE("version-mismatch")) @@ -453,24 +539,24 @@ class QueueManager( case None => dataManagementService ! ElectLeader(leaderKey, schedulerEndpoints.serialize, self) leaderElectionCallbacks.put( - leaderKey, { - case Right(EtcdLeader(_, _, _)) => - val queue = childFactory( - context, - request.invocationNamespace, - request.fqn, - request.revision, - request.whiskActionMetaData) - queue ! Start - QueuePool.put( - MemoryQueueKey(request.invocationNamespace, request.fqn.toDocId.asDocInfo(request.revision)), - MemoryQueueValue(queue, true)) - updateInitRevisionMap(leaderKey, request.revision) - receiver.foreach(_ ! CreateQueueResponse(request.invocationNamespace, request.fqn, success = true)) - - // in case of follower, do nothing - case Left(EtcdFollower(_, _)) => - receiver.foreach(_ ! CreateQueueResponse(request.invocationNamespace, request.fqn, success = true)) + leaderKey, + (electResult, isShuttingDown) => { + electResult match { + case Right(EtcdLeader(_, _, _)) => + val queue = createAndStartQueue( + request.invocationNamespace, + request.fqn, + request.revision, + request.whiskActionMetaData) + receiver.foreach(_ ! CreateQueueResponse(request.invocationNamespace, request.fqn, success = true)) + if (isShuttingDown) { + queue ! GracefulShutdown + } + + // in case of follower, do nothing + case Left(EtcdFollower(_, _)) => + receiver.foreach(_ ! CreateQueueResponse(request.invocationNamespace, request.fqn, success = true)) + } }) // there is already a leader election for leaderKey, so skip it @@ -490,6 +576,20 @@ class QueueManager( } } + private def createAndStartQueue(invocationNamespace: String, + action: FullyQualifiedEntityName, + revision: DocRevision, + actionMetaData: WhiskActionMetaData): ActorRef = { + val queue = + childFactory(context, invocationNamespace, action, revision, actionMetaData) + queue ! Start + QueuePool.put( + MemoryQueueKey(invocationNamespace, action.toDocId.asDocInfo(revision)), + MemoryQueueValue(queue, true)) + updateInitRevisionMap(getLeaderKey(invocationNamespace, action), revision) + queue + } + private val logScheduler = context.system.scheduler.scheduleAtFixedRate(0.seconds, 1.seconds)(() => { MetricEmitter.emitHistogramMetric(LoggingMarkers.SCHEDULER_QUEUE, QueuePool.countLeader()) }) diff --git a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/QueueManagerTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/QueueManagerTests.scala index 6ad1513f7..b60472e81 100644 --- a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/QueueManagerTests.scala +++ b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/QueueManagerTests.scala @@ -549,6 +549,65 @@ class QueueManagerTests probe.expectMsg(activationMessage.copy(action = finalFqn, revision = finalRevision)) } + it should "recreate the queue if it's removed by mistake while leader key is not removed from etcd" in { + val mockEtcdClient = mock[EtcdClient] + (mockEtcdClient + .get(_: String)) + .expects(*) + .returning(Future.successful { + RangeResponse + .newBuilder() + .addKvs(KeyValue.newBuilder().setKey("test").setValue(schedulerEndpoint.serialize).build()) + .build() + }) + .anyNumberOfTimes() + val dataManagementService = getTestDataManagementService() + val watcher = TestProbe() + + val probe = TestProbe() + + val childFactory = + (_: ActorRefFactory, _: String, _: FullyQualifiedEntityName, _: DocRevision, _: WhiskActionMetaData) => probe.ref + + val queueManager = + TestActorRef( + QueueManager + .props( + entityStore, + get, + mockEtcdClient, + schedulerEndpoint, + schedulerId, + dataManagementService.ref, + watcher.ref, + ack, + store, + childFactory, + mockConsumer)) + + watcher.expectMsg(watchEndpoint) + //current queue's revision is `1-test-revision` + (queueManager ? testQueueCreationMessage).mapTo[CreateQueueResponse].futureValue shouldBe CreateQueueResponse( + testInvocationNamespace, + testFQN, + true) + + probe.expectMsg(Start) + + // simulate queue superseded, the queue will be removed but leader key won't be deleted + queueManager ! QueueRemoved( + testInvocationNamespace, + testFQN.toDocId.asDocInfo(testDocRevision), + Some(testLeaderKey)) + + queueManager.!(activationMessage)(queueManager) + val msg2 = activationMessage.copy(activationId = ActivationId.generate()) + queueManager.!(msg2)(queueManager) // even send two requests, we should only recreate one queue + probe.expectMsg(Start) + probe.expectMsg(activationMessage) + probe.expectMsg(msg2) + } + it should "not skip outdated activation when the revision is older than the one in a datastore" in { stream.reset() val mockEtcdClient = mock[EtcdClient] @@ -1082,6 +1141,9 @@ class QueueManagerTests val probe = TestProbe() val fqn2 = FullyQualifiedEntityName(EntityPath("hello1"), EntityName("action1")) val fqn3 = FullyQualifiedEntityName(EntityPath("hello2"), EntityName("action2")) + val fqn4 = FullyQualifiedEntityName(EntityPath("hello3"), EntityName("action3")) + val fqn5 = FullyQualifiedEntityName(EntityPath("hello4"), EntityName("action4")) + val fqn6 = FullyQualifiedEntityName(EntityPath("hello5"), EntityName("action5")) // probe will watch all actors which are created by these factories val childFactory = @@ -1129,5 +1191,15 @@ class QueueManagerTests queueManager ! GracefulShutdown probe.expectMsgAllOf(10.seconds, GracefulShutdown, GracefulShutdown, GracefulShutdown) + + // after shutdown, it can still create/update/recover a queue, and new queue should be shutdown immediately too + (queueManager ? testQueueCreationMessage.copy(fqn = fqn4)) + .mapTo[CreateQueueResponse] + .futureValue shouldBe CreateQueueResponse(testInvocationNamespace, fqn = fqn4, success = true) + queueManager ! CreateNewQueue(activationMessage, fqn5, testActionMetaData) + queueManager ! RecoverQueue(activationMessage, fqn6, testActionMetaData) + + probe.expectMsgAllOf(10.seconds, GracefulShutdown, GracefulShutdown, GracefulShutdown) + } }