This is an automated email from the ASF dual-hosted git repository. markusthoemmes pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git
The following commit(s) were added to refs/heads/master by this push: new 034730e Refactor finisher and add unit tests (#2419) 034730e is described below commit 034730ed9b88d66db31121e4540ca6bf42a87e68 Author: rodric rabbah <rod...@gmail.com> AuthorDate: Mon Jun 26 12:27:21 2017 -0400 Refactor finisher and add unit tests (#2419) --- .../src/main/scala/whisk/common/Scheduler.scala | 11 +- .../core/controller/actions/PrimitiveActions.scala | 145 ++++++++++++----- .../actions/test/ActivationFinisherTests.scala | 171 +++++++++++++++++++++ 3 files changed, 283 insertions(+), 44 deletions(-) diff --git a/common/scala/src/main/scala/whisk/common/Scheduler.scala b/common/scala/src/main/scala/whisk/common/Scheduler.scala index b51f75a..e9be00a 100644 --- a/common/scala/src/main/scala/whisk/common/Scheduler.scala +++ b/common/scala/src/main/scala/whisk/common/Scheduler.scala @@ -53,10 +53,15 @@ object Scheduler { transid: TransactionId) extends Actor { implicit val ec = context.dispatcher - var lastSchedule: Option[Cancellable] = { - Some(context.system.scheduler.scheduleOnce(initialDelay, self, ScheduledWork)) - } + var lastSchedule: Option[Cancellable] = None + override def preStart() = { + if (initialDelay != Duration.Zero) { + lastSchedule = Some(context.system.scheduler.scheduleOnce(initialDelay, self, ScheduledWork)) + } else { + self ! 
ScheduledWork + } + } override def postStop() = { logging.info(this, s"$name shutdown") lastSchedule.foreach(_.cancel()) diff --git a/core/controller/src/main/scala/whisk/core/controller/actions/PrimitiveActions.scala b/core/controller/src/main/scala/whisk/core/controller/actions/PrimitiveActions.scala index 0a9034b..0b39583 100644 --- a/core/controller/src/main/scala/whisk/core/controller/actions/PrimitiveActions.scala +++ b/core/controller/src/main/scala/whisk/core/controller/actions/PrimitiveActions.scala @@ -17,6 +17,7 @@ package whisk.core.controller.actions +import scala.collection.mutable.Buffer import scala.concurrent.ExecutionContext import scala.concurrent.Future import scala.concurrent.Promise @@ -24,11 +25,11 @@ import scala.concurrent.duration._ import scala.util.Failure import akka.actor.Actor +import akka.actor.ActorRef import akka.actor.ActorSystem +import akka.actor.Cancellable import akka.actor.Props - import spray.json._ - import whisk.common.Logging import whisk.common.LoggingMarkers import whisk.common.Scheduler @@ -40,8 +41,6 @@ import whisk.core.entity._ import whisk.core.entity.types.ActivationStore import whisk.core.entity.types.EntityStore import whisk.utils.ExecutionContextFactory.FutureExtensions -import scala.collection.mutable.Buffer -import akka.actor.Cancellable protected[actions] trait PrimitiveActions { /** The core collections require backend services to be injected in this trait. */ @@ -156,15 +155,17 @@ protected[actions] trait PrimitiveActions { // 2. failing active ack (due to active ack timeout), fall over to db polling // 3. timeout on db polling => converts activation to non-blocking (returns activation id only) // 4. 
internal error message - val promise = Promise[Either[ActivationId, WhiskActivation]] // this is strictly completed by the finishing actor - val finisher = actorSystem.actorOf(Props(new ActivationFinisher(user, activationId, promise))) + val docid = DocId(WhiskEntity.qualifiedName(user.namespace.toPath, activationId)) + val (promise, finisher) = ActivationFinisher.props({ + () => WhiskActivation.get(activationStore, docid) + }) logging.info(this, s"action activation will block for result upto $totalWaitTime") activeAckResponse map { case result @ Right(_) => // activation complete, result is available - finisher ! Finish(result) + finisher ! ActivationFinisher.Finish(result) case _ => // active ack received but it does not carry the response, @@ -176,67 +177,98 @@ protected[actions] trait PrimitiveActions { // return the promise which is either fulfilled by active ack, polling from the database, // or the timeout alternative when the allowed duration expires (i.e., the action took // longer than the permitted, per totalWaitTime). - promise.future.withAlternativeAfterTimeout(totalWaitTime, { + promise.withAlternativeAfterTimeout(totalWaitTime, { Future.successful(Left(activationId)).andThen { // result no longer interesting; terminate the finisher/shut down db polling if necessary case _ => actorSystem.stop(finisher) } }) } +} + +/** Companion to the ActivationFinisher. */ +protected[actions] object ActivationFinisher { + case class Finish(activation: Right[ActivationId, WhiskActivation]) + + private type ActivationLookup = () => Future[WhiskActivation] /** Periodically polls the db to cover missing active acks. 
*/ - val datastorePollPeriodForActivation = 15.seconds - val datastorePreemptivePolling = Seq(1.second, 3.seconds, 5.seconds, 7.seconds) + private val datastorePollPeriodForActivation = 15.seconds + + /** + * In case of a partial active ack where it is known an activation completed + * but the result could not be sent over the bus, use this periodicity to poll + * for a result. + */ + private val datastorePreemptivePolling = Seq(1.second, 3.seconds, 5.seconds, 7.seconds) - protected case class Finish(activation: Right[ActivationId, WhiskActivation]) + def props(activationLookup: ActivationLookup)( + implicit transid: TransactionId, + actorSystem: ActorSystem, + executionContext: ExecutionContext, + logging: Logging): (Future[Either[ActivationId, WhiskActivation]], ActorRef) = { - protected class ActivationFinisher( - user: Identity, - activationId: ActivationId, + val (p, _, f) = props(activationLookup, datastorePollPeriodForActivation, datastorePreemptivePolling) + (p.future, f) // hides the polling actor + } + + /** + * Creates the finishing actor. + * This is factored for testing. + */ + protected[actions] def props( + activationLookup: ActivationLookup, + slowPoll: FiniteDuration, + fastPolls: Seq[FiniteDuration])( + implicit transid: TransactionId, + actorSystem: ActorSystem, + executionContext: ExecutionContext, + logging: Logging): (Promise[Either[ActivationId, WhiskActivation]], ActorRef, ActorRef) = { + + // this is strictly completed by the finishing actor + val promise = Promise[Either[ActivationId, WhiskActivation]] + val dbpoller = poller(slowPoll, promise, activationLookup) + val finisher = Props(new ActivationFinisher(dbpoller, fastPolls, promise)) + + (promise, dbpoller, actorSystem.actorOf(finisher)) + } + + /** + * An actor to complete a blocking activation request. It encapsulates a promise + * to be completed when the result is ready. This may happen in one of two ways. 
+ * An active ack message is relayed to this actor to complete the promise when + * the active ack is received. Or in case of a partial/missing active ack, an + * explicitly scheduled datastore poll of the activation record, if found, will + * complete the transaction. When the promise is fulfilled, the actor self destructs. + */ + private class ActivationFinisher( + poller: ActorRef, // the activation poller + fastPollPeriods: Seq[FiniteDuration], promise: Promise[Either[ActivationId, WhiskActivation]])( - implicit transid: TransactionId) extends Actor { + implicit transid: TransactionId, + actorSystem: ActorSystem, + executionContext: ExecutionContext, + logging: Logging) extends Actor { // when the future completes, self-destruct promise.future.andThen { case _ => shutdown() } - val docid = DocId(WhiskEntity.qualifiedName(user.namespace.toPath, activationId)) val preemptiveMsgs: Buffer[Cancellable] = Buffer.empty - val poller = { - Scheduler.scheduleWaitAtMost( - datastorePollPeriodForActivation, - initialDelay = datastorePollPeriodForActivation, - name = "dbpoll")(() => { - if (!promise.isCompleted) { - WhiskActivation.get(activationStore, docid) map { - // complete the future, which in turn will poison pill this scheduler - activation => - promise.trySuccess(Right(activation.withoutLogs)) // Logs always not provided on blocking call - } andThen { - case Failure(e: NoDocumentException) => // do nothing, scheduler will reschedule another poll - case Failure(t: Throwable) => // something went wrong, abort - logging.error(this, s"failed while waiting on result: ${t.getMessage}") - promise.tryFailure(t) // complete the future, which in turn will poison pill this scheduler - } - } else Future.successful({}) // the scheduler will be halted because the promise is now resolved - }) - } - def receive = { - case Finish(activation) => + case ActivationFinisher.Finish(activation) => promise.trySuccess(activation) case msg @ Scheduler.WorkOnceNow => // try up to three times 
when pre-emptying the schedule - datastorePreemptivePolling.foreach { + fastPollPeriods.foreach { s => preemptiveMsgs += context.system.scheduler.scheduleOnce(s, poller, msg) } - - case msg => - poller ! msg } def shutdown(): Unit = { + preemptiveMsgs.foreach(_.cancel()) + preemptiveMsgs.clear() context.stop(poller) context.stop(self) } @@ -244,7 +276,38 @@ protected[actions] trait PrimitiveActions { override def postStop() = { logging.info(this, "finisher shutdown") preemptiveMsgs.foreach(_.cancel()) + preemptiveMsgs.clear() context.stop(poller) } } + + /** + * This creates the inner datastore poller for the completed activation. + * It is a factory method to facilitate testing. + */ + private def poller( + slowPollPeriod: FiniteDuration, + promise: Promise[Either[ActivationId, WhiskActivation]], + activationLookup: ActivationLookup)( + implicit transid: TransactionId, + actorSystem: ActorSystem, + executionContext: ExecutionContext, + logging: Logging): ActorRef = { + Scheduler.scheduleWaitAtMost( + slowPollPeriod, + initialDelay = slowPollPeriod, + name = "dbpoll")(() => { + if (!promise.isCompleted) { + activationLookup() map { + // complete the future, which in turn will poison pill this scheduler + activation => promise.trySuccess(Right(activation.withoutLogs)) // logs excluded on blocking calls + } andThen { + case Failure(e: NoDocumentException) => // do nothing, scheduler will reschedule another poll + case Failure(t: Throwable) => // something went wrong, abort + logging.error(this, s"failed while waiting on result: ${t.getMessage}") + promise.tryFailure(t) // complete the future, which in turn will poison pill this scheduler + } + } else Future.successful({}) // the scheduler will be halted because the promise is now resolved + }) + } } diff --git a/tests/src/test/scala/whisk/core/controller/actions/test/ActivationFinisherTests.scala b/tests/src/test/scala/whisk/core/controller/actions/test/ActivationFinisherTests.scala new file mode 100644 index 
0000000..a6fd017 --- /dev/null +++ b/tests/src/test/scala/whisk/core/controller/actions/test/ActivationFinisherTests.scala @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package whisk.core.controller.actions.test + +import java.time.Instant + +import scala.concurrent.Future +import scala.concurrent.duration.DurationInt + +import org.junit.runner.RunWith +import org.scalatest.BeforeAndAfterEach +import org.scalatest.FlatSpec +import org.scalatest.Matchers +import org.scalatest.junit.JUnitRunner + +import common.StreamLogging +import common.WskActorSystem +import spray.json._ +import whisk.common.TransactionId +import whisk.core.controller.actions.ActivationFinisher +import whisk.core.entity._ +import whisk.core.entity.ActivationResponse +import whisk.core.entity.size.SizeInt +import whisk.core.database.NoDocumentException +import akka.testkit.TestProbe +import whisk.common.Scheduler +import akka.actor.PoisonPill + +@RunWith(classOf[JUnitRunner]) +class ActivationFinisherTests + extends FlatSpec + with BeforeAndAfterEach + with Matchers + with WskActorSystem + with StreamLogging { + + implicit val tid = TransactionId.testing + + val activation = WhiskActivation( + namespace = EntityPath("ns"), + 
name = EntityName("a"), + Subject(), + activationId = ActivationId(), + start = Instant.now(), + end = Instant.now(), + response = ActivationResponse.success(Some(JsObject("res" -> JsNumber(2)))), + annotations = Parameters("limits", ActionLimits( + TimeLimit(1.second), + MemoryLimit(128.MB), + LogLimit(1.MB)).toJson), + duration = Some(123)) + + var activationLookupCounter = 0 + @volatile var activationResult: Option[Throwable] = None + + def activationLookup(): Future[WhiskActivation] = { + activationLookupCounter += 1 + activationResult.map(Future.failed(_)).getOrElse(Future.successful(activation)) + } + + override def beforeEach() = { + activationLookupCounter = 0 + activationResult = None + } + + behavior of "activation finisher" + override lazy val printstream = Console.out + val slowPoll = 200.milliseconds + val fastPoll = Seq() + + it should "poll until promise is completed" in { + activationResult = Some(NoDocumentException("")) + val (promise, poller, finisher) = ActivationFinisher.props(activationLookup, slowPoll, fastPoll) + + val testProbePoller = TestProbe() + val testProbeFinisher = TestProbe() + testProbePoller.watch(poller) + testProbeFinisher.watch(finisher) + + val slowPollWorkWindow = (slowPoll * 3) + (slowPoll / 2) + Thread.sleep(slowPollWorkWindow.toMillis) + activationLookupCounter should (be >= 2 and be <= 3) + + // should terminate the parent finisher and child poller on completion + promise.trySuccess(Right(activation)) + + testProbePoller.expectTerminated(poller, 1.second) + testProbeFinisher.expectTerminated(finisher, 1.second) + } + + it should "complete promise from poller" in { + val (promise, poller, finisher) = ActivationFinisher.props(activationLookup, slowPoll, fastPoll) + + val testProbePoller = TestProbe() + val testProbeFinisher = TestProbe() + testProbePoller.watch(poller) + testProbeFinisher.watch(finisher) + + val slowPollWorkWindow = (slowPoll * 2) + (slowPoll / 1) + Thread.sleep(slowPollWorkWindow.toMillis) + 
activationLookupCounter should be(1) + + testProbePoller.expectTerminated(poller, 1.second) + testProbeFinisher.expectTerminated(finisher, 1.second) + + promise shouldBe 'completed + } + + it should "finish when receiving corresponding message" in { + activationResult = Some(NoDocumentException("")) + val (promise, poller, finisher) = ActivationFinisher.props(activationLookup, slowPoll, fastPoll) + + val testProbePoller = TestProbe() + val testProbeFinisher = TestProbe() + testProbePoller.watch(poller) + testProbeFinisher.watch(finisher) + + val slowPollWorkWindow = (slowPoll * 2) + (slowPoll / 1) + Thread.sleep(slowPollWorkWindow.toMillis) + activationLookupCounter should (be >= 1 and be <= 2) + + // should terminate the parent finisher and child poller once message is received + finisher ! ActivationFinisher.Finish(Right(activation)) + + testProbePoller.expectTerminated(poller, 1.second) + testProbeFinisher.expectTerminated(finisher, 1.second) + + promise shouldBe 'completed + } + + it should "poll pre-emptively" in { + activationResult = Some(NoDocumentException("")) + val slowPoll = 600.milliseconds + val fastPoll = Seq(100.milliseconds, 200.milliseconds) + val (promise, poller, finisher) = ActivationFinisher.props(activationLookup, slowPoll, fastPoll) + + val testProbePoller = TestProbe() + val testProbeFinisher = TestProbe() + testProbePoller.watch(poller) + testProbeFinisher.watch(finisher) + + Thread.sleep(500.milliseconds.toMillis) + activationLookupCounter should be(0) + + // should cause polls + finisher ! Scheduler.WorkOnceNow + Thread.sleep(500.milliseconds.toMillis) + activationLookupCounter should be(3) + + finisher ! PoisonPill + + testProbePoller.expectTerminated(poller, 1.second) + testProbeFinisher.expectTerminated(finisher, 1.second) + } + +} -- To stop receiving notification emails like this one, please contact ['"commits@openwhisk.apache.org" <commits@openwhisk.apache.org>'].