This is an automated email from the ASF dual-hosted git repository.

dubeejw pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new 94d5f3f  Separate container removal from job completion. (#3132)
94d5f3f is described below

commit 94d5f3f2a631ebf53d7718f8af8d6a20163fb89b
Author: rodric rabbah <rod...@gmail.com>
AuthorDate: Wed Jan 3 08:38:06 2018 -0500

    Separate container removal from job completion. (#3132)
    
    The current handshake between the container proxy and the container
    pool conflates container removal and job completion. The former does
    not necessarily indicate there is free capacity in the pool since
    there are conditions under which the job is rescheduled (e.g., resume
    failure, container aging, or the container being reclaimed).
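    A condensed, runnable Scala sketch of the bookkeeping distinction this
    change introduces (ContainerRemoved and RescheduleJob mirror the diff
    below; Pool, handle, feedCredits, and the String container ids are
    illustrative stand-ins, not the actual ContainerPool API): a removal
    that completes a job may credit the feed with free capacity, while a
    rescheduled job only updates the free/busy lists.

        object PoolBookkeepingSketch {
          sealed trait ProxyEvent
          // container destroyed after completing its job
          case object ContainerRemoved extends ProxyEvent
          // container destroyed before it could run the job; job sent back for retry
          case object RescheduleJob extends ProxyEvent

          final case class Pool(free: Set[String], busy: Set[String], feedCredits: Int) {
            def handle(container: String, event: ProxyEvent): Pool = event match {
              case ContainerRemoved =>
                // a busy container completing frees capacity, so the feed is credited
                val credits = if (busy.contains(container)) feedCredits + 1 else feedCredits
                copy(free = free - container, busy = busy - container, feedCredits = credits)
              case RescheduleJob =>
                // the job will be retried elsewhere: update the lists, but capacity has not changed
                copy(free = free - container, busy = busy - container)
            }
          }

          def main(args: Array[String]): Unit = {
            val start = Pool(free = Set.empty, busy = Set("c1", "c2"), feedCredits = 0)
            println(start.handle("c1", ContainerRemoved)) // Pool(Set(),Set(c2),1) -> capacity freed
            println(start.handle("c2", RescheduleJob))    // Pool(Set(),Set(c1),0) -> no credit
          }
        }
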
---
 .../whisk/core/containerpool/ContainerPool.scala     | 17 +++++++++++++++++
 .../whisk/core/containerpool/ContainerProxy.scala    | 20 +++++++++++++++-----
 .../core/containerpool/test/ContainerPoolTests.scala | 14 ++++++++++++++
 .../containerpool/test/ContainerProxyTests.scala     |  6 +++---
 4 files changed, 49 insertions(+), 8 deletions(-)

diff --git a/core/invoker/src/main/scala/whisk/core/containerpool/ContainerPool.scala b/core/invoker/src/main/scala/whisk/core/containerpool/ContainerPool.scala
index 1fa6b3a..61172dd 100644
--- a/core/invoker/src/main/scala/whisk/core/containerpool/ContainerPool.scala
+++ b/core/invoker/src/main/scala/whisk/core/containerpool/ContainerPool.scala
@@ -81,6 +81,10 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef,
 
   def receive: Receive = {
     // A job to run on a container
+    //
+    // Run messages are received either via the feed or from child containers which cannot process
+    // their requests and send them back to the pool for rescheduling (this may happen if "docker" operations
+    // fail for example, or a container has aged and was destroying itself when a new request was assigned)
     case r: Run =>
       val container = if (busyPool.size < maxActiveContainers) {
         // Schedule a job to a warm container
@@ -110,6 +114,9 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef,
           freePool = freePool - actor
           actor ! r // forwards the run request to the container
         case None =>
+          // this can also happen if createContainer fails to start a new container, or
+          // if a job is rescheduled but the container it was allocated to has not yet destroyed itself
+          // (and a new container would over commit the pool)
           logging.error(this, "Rescheduling Run message, too many message in the pool")(r.msg.transid)
           self ! r
       }
@@ -131,8 +138,18 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef,
       freePool = freePool - sender()
       busyPool.get(sender()).foreach { _ =>
         busyPool = busyPool - sender()
+        // container was busy, so there is capacity to accept another job request
         feed ! MessageFeed.Processed
       }
+
+    // This message is received for one of these reasons:
+    // 1. Container errored while resuming a warm container, could not process the job, and sent the job back
+    // 2. The container aged, is destroying itself, and was assigned a job which it had to send back
+    // 3. The container aged and is destroying itself
+    // Update the free/busy lists but no message is sent to the feed since there is no change in capacity yet
+    case RescheduleJob =>
+      freePool = freePool - sender()
+      busyPool = busyPool - sender()
   }
 
   /** Creates a new container and updates state accordingly. */
diff --git a/core/invoker/src/main/scala/whisk/core/containerpool/ContainerProxy.scala b/core/invoker/src/main/scala/whisk/core/containerpool/ContainerProxy.scala
index dd4336e..c3f89c5 100644
--- a/core/invoker/src/main/scala/whisk/core/containerpool/ContainerProxy.scala
+++ b/core/invoker/src/main/scala/whisk/core/containerpool/ContainerProxy.scala
@@ -67,7 +67,8 @@ case object Remove
 // Events sent by the actor
 case class NeedWork(data: ContainerData)
 case object ContainerPaused
-case object ContainerRemoved
+case object ContainerRemoved // when container is destroyed
+case object RescheduleJob // job is sent back to parent and could not be processed because container is being destroyed
 
 /**
  * A proxy that wraps a Container. It is used to keep track of the lifecycle
@@ -101,6 +102,7 @@ class ContainerProxy(
     with Stash {
   implicit val ec = context.system.dispatcher
   implicit val logging = new AkkaLogging(context.system.log)
+  var rescheduleJob = false // true iff actor receives a job but cannot process it because actor will destroy itself
 
   startWith(Uninitialized, NoData())
 
@@ -248,7 +250,9 @@ class ContainerProxy(
           // Sending the message to self on a failure will cause the message
           // to ultimately be sent back to the parent (which will retry it)
           // when container removal is done.
-          case Failure(_) => self ! job
+          case Failure(_) =>
+            rescheduleJob = true
+            self ! job
         }
         .flatMap(_ => initializeAndRun(data.container, job))
         .map(_ => WarmedData(data.container, job.msg.user.namespace, job.action, Instant.now))
@@ -256,8 +260,10 @@ class ContainerProxy(
 
       goto(Running)
 
-    // timeout or removing
-    case Event(StateTimeout | Remove, data: WarmedData) => destroyContainer(data.container)
+    // container is reclaimed by the pool or it has become too old
+    case Event(StateTimeout | Remove, data: WarmedData) =>
+      rescheduleJob = true // to suppress sending a message to the pool and avoid double counting
+      destroyContainer(data.container)
   }
 
   when(Removing) {
@@ -292,7 +298,11 @@ class ContainerProxy(
    * @param container the container to destroy
    */
   def destroyContainer(container: Container) = {
-    context.parent ! ContainerRemoved
+    if (!rescheduleJob) {
+      context.parent ! ContainerRemoved
+    } else {
+      context.parent ! RescheduleJob
+    }
 
     val unpause = stateName match {
       case Paused => container.resume()(TransactionId.invokerNanny)
diff --git a/tests/src/test/scala/whisk/core/containerpool/test/ContainerPoolTests.scala b/tests/src/test/scala/whisk/core/containerpool/test/ContainerPoolTests.scala
index dcfe5e4..0b53103 100644
--- a/tests/src/test/scala/whisk/core/containerpool/test/ContainerPoolTests.scala
+++ b/tests/src/test/scala/whisk/core/containerpool/test/ContainerPoolTests.scala
@@ -217,6 +217,20 @@ class ContainerPoolTests
     containers(1).expectMsg(runMessageDifferentNamespace)
   }
 
+  it should "reschedule job when container is removed prematurely without 
sending message to feed" in within(timeout) {
+    val (containers, factory) = testContainers(2)
+    val feed = TestProbe()
+
+    // a pool with only 1 slot
+    val pool = system.actorOf(ContainerPool.props(factory, 1, 1, feed.ref))
+    pool ! runMessage
+    containers(0).expectMsg(runMessage)
+    containers(0).send(pool, RescheduleJob) // emulate container failure ...
+    containers(0).send(pool, runMessage) // ... causing job to be rescheduled
+    feed.expectNoMsg(100.millis)
+    containers(1).expectMsg(runMessage) // job resent to new actor
+  }
+
   /*
    * CONTAINER PREWARMING
    */
diff --git a/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala b/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala
index ac8da0f..b278871 100644
--- a/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala
+++ b/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala
@@ -220,7 +220,7 @@ class ContainerProxyTests
 
     // Another pause causes the container to be removed
     timeout(machine)
-    expectMsg(ContainerRemoved)
+    expectMsg(RescheduleJob)
     expectMsg(Transition(machine, Paused, Removing))
 
     awaitAssert {
@@ -528,7 +528,7 @@ class ContainerProxyTests
     val runMessage = Run(action, message)
     machine ! runMessage
     expectMsg(Transition(machine, Paused, Running))
-    expectMsg(ContainerRemoved) // The message is sent as soon as the container decides to destroy itself
+    expectMsg(RescheduleJob)
     expectMsg(Transition(machine, Running, Removing))
     expectMsg(runMessage)
 
@@ -649,7 +649,7 @@ class ContainerProxyTests
     machine ! Run(action, message)
 
     // State-machine shuts down nonetheless.
-    expectMsg(ContainerRemoved)
+    expectMsg(RescheduleJob)
     expectMsg(Transition(machine, Paused, Removing))
 
     // Pool gets the message again.

-- 
To stop receiving notification emails like this one, please contact
"commits@openwhisk.apache.org" <commits@openwhisk.apache.org>.
