This is an automated email from the ASF dual-hosted git repository.

markusthoemmes pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new 034730e  Refactor finisher and add unit tests (#2419)
034730e is described below

commit 034730ed9b88d66db31121e4540ca6bf42a87e68
Author: rodric rabbah <rod...@gmail.com>
AuthorDate: Mon Jun 26 12:27:21 2017 -0400

    Refactor finisher and add unit tests (#2419)
---
 .../src/main/scala/whisk/common/Scheduler.scala    |  11 +-
 .../core/controller/actions/PrimitiveActions.scala | 145 ++++++++++++-----
 .../actions/test/ActivationFinisherTests.scala     | 171 +++++++++++++++++++++
 3 files changed, 283 insertions(+), 44 deletions(-)

diff --git a/common/scala/src/main/scala/whisk/common/Scheduler.scala 
b/common/scala/src/main/scala/whisk/common/Scheduler.scala
index b51f75a..e9be00a 100644
--- a/common/scala/src/main/scala/whisk/common/Scheduler.scala
+++ b/common/scala/src/main/scala/whisk/common/Scheduler.scala
@@ -53,10 +53,15 @@ object Scheduler {
             transid: TransactionId) extends Actor {
         implicit val ec = context.dispatcher
 
-        var lastSchedule: Option[Cancellable] = {
-            Some(context.system.scheduler.scheduleOnce(initialDelay, self, 
ScheduledWork))
-        }
+        var lastSchedule: Option[Cancellable] = None
 
+        override def preStart() = {
+            if (initialDelay != Duration.Zero) {
+                lastSchedule = 
Some(context.system.scheduler.scheduleOnce(initialDelay, self, ScheduledWork))
+            } else {
+                self ! ScheduledWork
+            }
+        }
         override def postStop() = {
             logging.info(this, s"$name shutdown")
             lastSchedule.foreach(_.cancel())
diff --git 
a/core/controller/src/main/scala/whisk/core/controller/actions/PrimitiveActions.scala
 
b/core/controller/src/main/scala/whisk/core/controller/actions/PrimitiveActions.scala
index 0a9034b..0b39583 100644
--- 
a/core/controller/src/main/scala/whisk/core/controller/actions/PrimitiveActions.scala
+++ 
b/core/controller/src/main/scala/whisk/core/controller/actions/PrimitiveActions.scala
@@ -17,6 +17,7 @@
 
 package whisk.core.controller.actions
 
+import scala.collection.mutable.Buffer
 import scala.concurrent.ExecutionContext
 import scala.concurrent.Future
 import scala.concurrent.Promise
@@ -24,11 +25,11 @@ import scala.concurrent.duration._
 import scala.util.Failure
 
 import akka.actor.Actor
+import akka.actor.ActorRef
 import akka.actor.ActorSystem
+import akka.actor.Cancellable
 import akka.actor.Props
-
 import spray.json._
-
 import whisk.common.Logging
 import whisk.common.LoggingMarkers
 import whisk.common.Scheduler
@@ -40,8 +41,6 @@ import whisk.core.entity._
 import whisk.core.entity.types.ActivationStore
 import whisk.core.entity.types.EntityStore
 import whisk.utils.ExecutionContextFactory.FutureExtensions
-import scala.collection.mutable.Buffer
-import akka.actor.Cancellable
 
 protected[actions] trait PrimitiveActions {
     /** The core collections require backend services to be injected in this 
trait. */
@@ -156,15 +155,17 @@ protected[actions] trait PrimitiveActions {
         // 2. failing active ack (due to active ack timeout), fall over to db 
polling
         // 3. timeout on db polling => converts activation to non-blocking 
(returns activation id only)
         // 4. internal error message
-        val promise = Promise[Either[ActivationId, WhiskActivation]] // this 
is strictly completed by the finishing actor
-        val finisher = actorSystem.actorOf(Props(new ActivationFinisher(user, 
activationId, promise)))
+        val docid = DocId(WhiskEntity.qualifiedName(user.namespace.toPath, 
activationId))
+        val (promise, finisher) = ActivationFinisher.props({
+            () => WhiskActivation.get(activationStore, docid)
+        })
 
         logging.info(this, s"action activation will block for result upto 
$totalWaitTime")
 
         activeAckResponse map {
             case result @ Right(_) =>
                 // activation complete, result is available
-                finisher ! Finish(result)
+                finisher ! ActivationFinisher.Finish(result)
 
             case _ =>
                 // active ack received but it does not carry the response,
@@ -176,67 +177,98 @@ protected[actions] trait PrimitiveActions {
         // return the promise which is either fulfilled by active ack, polling 
from the database,
         // or the timeout alternative when the allowed duration expires (i.e., 
the action took
         // longer than the permitted, per totalWaitTime).
-        promise.future.withAlternativeAfterTimeout(totalWaitTime, {
+        promise.withAlternativeAfterTimeout(totalWaitTime, {
             Future.successful(Left(activationId)).andThen {
                 // result no longer interesting; terminate the finisher/shut 
down db polling if necessary
                 case _ => actorSystem.stop(finisher)
             }
         })
     }
+}
+
+/** Companion to the ActivationFinisher. */
+protected[actions] object ActivationFinisher {
+    case class Finish(activation: Right[ActivationId, WhiskActivation])
+
+    private type ActivationLookup = () => Future[WhiskActivation]
 
     /** Periodically polls the db to cover missing active acks. */
-    val datastorePollPeriodForActivation = 15.seconds
-    val datastorePreemptivePolling = Seq(1.second, 3.seconds, 5.seconds, 
7.seconds)
+    private val datastorePollPeriodForActivation = 15.seconds
+
+    /**
+     * In case of a partial active ack where it is know an activation completed
+     * but the result could not be sent over the bus, use this periodicity to 
poll
+     * for a result.
+     */
+    private val datastorePreemptivePolling = Seq(1.second, 3.seconds, 
5.seconds, 7.seconds)
 
-    protected case class Finish(activation: Right[ActivationId, 
WhiskActivation])
+    def props(activationLookup: ActivationLookup)(
+        implicit transid: TransactionId,
+        actorSystem: ActorSystem,
+        executionContext: ExecutionContext,
+        logging: Logging): (Future[Either[ActivationId, WhiskActivation]], 
ActorRef) = {
 
-    protected class ActivationFinisher(
-        user: Identity,
-        activationId: ActivationId,
+        val (p, _, f) = props(activationLookup, 
datastorePollPeriodForActivation, datastorePreemptivePolling)
+        (p.future, f) // hides the polling actor
+    }
+
+    /**
+     * Creates the finishing actor.
+     * This is factored for testing.
+     */
+    protected[actions] def props(
+        activationLookup: ActivationLookup,
+        slowPoll: FiniteDuration,
+        fastPolls: Seq[FiniteDuration])(
+            implicit transid: TransactionId,
+            actorSystem: ActorSystem,
+            executionContext: ExecutionContext,
+            logging: Logging): (Promise[Either[ActivationId, 
WhiskActivation]], ActorRef, ActorRef) = {
+
+        // this is strictly completed by the finishing actor
+        val promise = Promise[Either[ActivationId, WhiskActivation]]
+        val dbpoller = poller(slowPoll, promise, activationLookup)
+        val finisher = Props(new ActivationFinisher(dbpoller, fastPolls, 
promise))
+
+        (promise, dbpoller, actorSystem.actorOf(finisher))
+    }
+
+    /**
+     * An actor to complete a blocking activation request. It encapsulates a 
promise
+     * to be completed when the result is ready. This may happen in one of two 
ways.
+     * An active ack message is relayed to this actor to complete the promise 
when
+     * the active ack is received. Or in case of a partial/missing active ack, 
an
+     * explicitly scheduled datastore poll of the activation record, if found, 
will
+     * complete the transaction. When the promise is fulfilled, the actor self 
destructs.
+     */
+    private class ActivationFinisher(
+        poller: ActorRef, // the activation poller
+        fastPollPeriods: Seq[FiniteDuration],
         promise: Promise[Either[ActivationId, WhiskActivation]])(
-            implicit transid: TransactionId) extends Actor {
+            implicit transid: TransactionId,
+            actorSystem: ActorSystem,
+            executionContext: ExecutionContext,
+            logging: Logging) extends Actor {
 
         // when the future completes, self-destruct
         promise.future.andThen { case _ => shutdown() }
 
-        val docid = DocId(WhiskEntity.qualifiedName(user.namespace.toPath, 
activationId))
         val preemptiveMsgs: Buffer[Cancellable] = Buffer.empty
 
-        val poller = {
-            Scheduler.scheduleWaitAtMost(
-                datastorePollPeriodForActivation,
-                initialDelay = datastorePollPeriodForActivation,
-                name = "dbpoll")(() => {
-                    if (!promise.isCompleted) {
-                        WhiskActivation.get(activationStore, docid) map {
-                            // complete the future, which in turn will poison 
pill this scheduler
-                            activation =>
-                                
promise.trySuccess(Right(activation.withoutLogs)) // Logs always not provided 
on blocking call
-                        } andThen {
-                            case Failure(e: NoDocumentException) => // do 
nothing, scheduler will reschedule another poll
-                            case Failure(t: Throwable) => // something went 
wrong, abort
-                                logging.error(this, s"failed while waiting on 
result: ${t.getMessage}")
-                                promise.tryFailure(t) // complete the future, 
which in turn will poison pill this scheduler
-                        }
-                    } else Future.successful({}) // the scheduler will be 
halted because the promise is now resolved
-                })
-        }
-
         def receive = {
-            case Finish(activation) =>
+            case ActivationFinisher.Finish(activation) =>
                 promise.trySuccess(activation)
 
             case msg @ Scheduler.WorkOnceNow =>
                 // try up to three times when pre-emptying the schedule
-                datastorePreemptivePolling.foreach {
+                fastPollPeriods.foreach {
                     s => preemptiveMsgs += 
context.system.scheduler.scheduleOnce(s, poller, msg)
                 }
-
-            case msg =>
-                poller ! msg
         }
 
         def shutdown(): Unit = {
+            preemptiveMsgs.foreach(_.cancel())
+            preemptiveMsgs.clear()
             context.stop(poller)
             context.stop(self)
         }
@@ -244,7 +276,38 @@ protected[actions] trait PrimitiveActions {
         override def postStop() = {
             logging.info(this, "finisher shutdown")
             preemptiveMsgs.foreach(_.cancel())
+            preemptiveMsgs.clear()
             context.stop(poller)
         }
     }
+
+    /**
+     * This creates the inner datastore poller for the completed activation.
+     * It is a factory method to facilitate testing.
+     */
+    private def poller(
+        slowPollPeriod: FiniteDuration,
+        promise: Promise[Either[ActivationId, WhiskActivation]],
+        activationLookup: ActivationLookup)(
+            implicit transid: TransactionId,
+            actorSystem: ActorSystem,
+            executionContext: ExecutionContext,
+            logging: Logging): ActorRef = {
+        Scheduler.scheduleWaitAtMost(
+            slowPollPeriod,
+            initialDelay = slowPollPeriod,
+            name = "dbpoll")(() => {
+                if (!promise.isCompleted) {
+                    activationLookup() map {
+                        // complete the future, which in turn will poison pill 
this scheduler
+                        activation => 
promise.trySuccess(Right(activation.withoutLogs)) // logs excluded on blocking 
calls
+                    } andThen {
+                        case Failure(e: NoDocumentException) => // do nothing, 
scheduler will reschedule another poll
+                        case Failure(t: Throwable) => // something went wrong, 
abort
+                            logging.error(this, s"failed while waiting on 
result: ${t.getMessage}")
+                            promise.tryFailure(t) // complete the future, 
which in turn will poison pill this scheduler
+                    }
+                } else Future.successful({}) // the scheduler will be halted 
because the promise is now resolved
+            })
+    }
 }
diff --git 
a/tests/src/test/scala/whisk/core/controller/actions/test/ActivationFinisherTests.scala
 
b/tests/src/test/scala/whisk/core/controller/actions/test/ActivationFinisherTests.scala
new file mode 100644
index 0000000..a6fd017
--- /dev/null
+++ 
b/tests/src/test/scala/whisk/core/controller/actions/test/ActivationFinisherTests.scala
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package whisk.core.controller.actions.test
+
+import java.time.Instant
+
+import scala.concurrent.Future
+import scala.concurrent.duration.DurationInt
+
+import org.junit.runner.RunWith
+import org.scalatest.BeforeAndAfterEach
+import org.scalatest.FlatSpec
+import org.scalatest.Matchers
+import org.scalatest.junit.JUnitRunner
+
+import common.StreamLogging
+import common.WskActorSystem
+import spray.json._
+import whisk.common.TransactionId
+import whisk.core.controller.actions.ActivationFinisher
+import whisk.core.entity._
+import whisk.core.entity.ActivationResponse
+import whisk.core.entity.size.SizeInt
+import whisk.core.database.NoDocumentException
+import akka.testkit.TestProbe
+import whisk.common.Scheduler
+import akka.actor.PoisonPill
+
+@RunWith(classOf[JUnitRunner])
+class ActivationFinisherTests
+    extends FlatSpec
+    with BeforeAndAfterEach
+    with Matchers
+    with WskActorSystem
+    with StreamLogging {
+
+    implicit val tid = TransactionId.testing
+
+    val activation = WhiskActivation(
+        namespace = EntityPath("ns"),
+        name = EntityName("a"),
+        Subject(),
+        activationId = ActivationId(),
+        start = Instant.now(),
+        end = Instant.now(),
+        response = ActivationResponse.success(Some(JsObject("res" -> 
JsNumber(2)))),
+        annotations = Parameters("limits", ActionLimits(
+            TimeLimit(1.second),
+            MemoryLimit(128.MB),
+            LogLimit(1.MB)).toJson),
+        duration = Some(123))
+
+    var activationLookupCounter = 0
+    @volatile var activationResult: Option[Throwable] = None
+
+    def activationLookup(): Future[WhiskActivation] = {
+        activationLookupCounter += 1
+        
activationResult.map(Future.failed(_)).getOrElse(Future.successful(activation))
+    }
+
+    override def beforeEach() = {
+        activationLookupCounter = 0
+        activationResult = None
+    }
+
+    behavior of "activation finisher"
+    override lazy val printstream = Console.out
+    val slowPoll = 200.milliseconds
+    val fastPoll = Seq()
+
+    it should "poll until promise is completed" in {
+        activationResult = Some(NoDocumentException(""))
+        val (promise, poller, finisher) = 
ActivationFinisher.props(activationLookup, slowPoll, fastPoll)
+
+        val testProbePoller = TestProbe()
+        val testProbeFinisher = TestProbe()
+        testProbePoller.watch(poller)
+        testProbeFinisher.watch(finisher)
+
+        val slowPollWorkWindow = (slowPoll * 3) + (slowPoll / 2)
+        Thread.sleep(slowPollWorkWindow.toMillis)
+        activationLookupCounter should (be >= 2 and be <= 3)
+
+        // should terminate the parent finisher and child poller on completion
+        promise.trySuccess(Right(activation))
+
+        testProbePoller.expectTerminated(poller, 1.second)
+        testProbeFinisher.expectTerminated(finisher, 1.second)
+    }
+
+    it should "complete promise from poller" in {
+        val (promise, poller, finisher) = 
ActivationFinisher.props(activationLookup, slowPoll, fastPoll)
+
+        val testProbePoller = TestProbe()
+        val testProbeFinisher = TestProbe()
+        testProbePoller.watch(poller)
+        testProbeFinisher.watch(finisher)
+
+        val slowPollWorkWindow = (slowPoll * 2) + (slowPoll / 1)
+        Thread.sleep(slowPollWorkWindow.toMillis)
+        activationLookupCounter should be(1)
+
+        testProbePoller.expectTerminated(poller, 1.second)
+        testProbeFinisher.expectTerminated(finisher, 1.second)
+
+        promise shouldBe 'completed
+    }
+
+    it should "finish when receiving corresponding message" in {
+        activationResult = Some(NoDocumentException(""))
+        val (promise, poller, finisher) = 
ActivationFinisher.props(activationLookup, slowPoll, fastPoll)
+
+        val testProbePoller = TestProbe()
+        val testProbeFinisher = TestProbe()
+        testProbePoller.watch(poller)
+        testProbeFinisher.watch(finisher)
+
+        val slowPollWorkWindow = (slowPoll * 2) + (slowPoll / 1)
+        Thread.sleep(slowPollWorkWindow.toMillis)
+        activationLookupCounter should (be >= 1 and be <= 2)
+
+        // should terminate the parent finisher and child poller once message 
is received
+        finisher ! ActivationFinisher.Finish(Right(activation))
+
+        testProbePoller.expectTerminated(poller, 1.second)
+        testProbeFinisher.expectTerminated(finisher, 1.second)
+
+        promise shouldBe 'completed
+    }
+
+    it should "poll pre-emptively" in {
+        activationResult = Some(NoDocumentException(""))
+        val slowPoll = 600.milliseconds
+        val fastPoll = Seq(100.milliseconds, 200.milliseconds)
+        val (promise, poller, finisher) = 
ActivationFinisher.props(activationLookup, slowPoll, fastPoll)
+
+        val testProbePoller = TestProbe()
+        val testProbeFinisher = TestProbe()
+        testProbePoller.watch(poller)
+        testProbeFinisher.watch(finisher)
+
+        Thread.sleep(500.milliseconds.toMillis)
+        activationLookupCounter should be(0)
+
+        // should cause polls
+        finisher ! Scheduler.WorkOnceNow
+        Thread.sleep(500.milliseconds.toMillis)
+        activationLookupCounter should be(3)
+
+        finisher ! PoisonPill
+
+        testProbePoller.expectTerminated(poller, 1.second)
+        testProbeFinisher.expectTerminated(finisher, 1.second)
+    }
+
+}

-- 
To stop receiving notification emails like this one, please contact
['"commits@openwhisk.apache.org" <commits@openwhisk.apache.org>'].

Reply via email to