diff --git a/common/scala/src/main/scala/whisk/common/Scheduler.scala b/common/scala/src/main/scala/whisk/common/Scheduler.scala
index 28d150260e..bf73ad7d66 100644
--- a/common/scala/src/main/scala/whisk/common/Scheduler.scala
+++ b/common/scala/src/main/scala/whisk/common/Scheduler.scala
@@ -22,11 +22,7 @@ import scala.concurrent.duration._
 import scala.util.Failure
 import scala.util.Success
 import scala.util.Try
-
-import akka.actor.Actor
-import akka.actor.ActorSystem
-import akka.actor.Cancellable
-import akka.actor.Props
+import akka.actor.{Actor, ActorRef, ActorSystem, Cancellable, Props}
 
 /**
  * Scheduler utility functions to execute tasks in a repetitive way with controllable behavior
@@ -99,7 +95,7 @@ object Scheduler {
                          name: String = "Scheduler")(f: () => 
Future[Any])(implicit system: ActorSystem,
                                                                            
logging: Logging,
                                                                            
transid: TransactionId =
-                                                                             
TransactionId.unknown) = {
+                                                                             
TransactionId.unknown): ActorRef = {
     require(interval > Duration.Zero)
     system.actorOf(Props(new Worker(initialDelay, interval, false, name, f)))
   }
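
A minimal sketch (not part of the patch) of a caller that exploits the new explicit ActorRef return type: because the handle is typed, it can be passed to akka.pattern.gracefulStop, which is exactly what the invoker's signal handlers further down do. The object name, ActorSystem name, and no-op task are illustrative only:

    import scala.concurrent.Future
    import scala.concurrent.duration._
    import akka.actor.{ActorRef, ActorSystem}
    import akka.event.Logging
    import akka.pattern.gracefulStop
    import whisk.common.{AkkaLogging, Scheduler}

    object SchedulerHandleSketch {
      def main(args: Array[String]): Unit = {
        implicit val system: ActorSystem = ActorSystem("sketch")
        implicit val logging: whisk.common.Logging = new AkkaLogging(Logging.getLogger(system, this))

        // The handle is now typed as ActorRef, so it can be stopped explicitly.
        val handle: ActorRef = Scheduler.scheduleWaitAtMost(1.seconds)(() => Future.successful(()))

        // Completes once the worker actor has terminated.
        gracefulStop(handle, 5.seconds)
      }
    }
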
diff --git a/common/scala/src/main/scala/whisk/core/connector/MessageConsumer.scala b/common/scala/src/main/scala/whisk/core/connector/MessageConsumer.scala
index 14af69e4a9..753c19a36d 100644
--- a/common/scala/src/main/scala/whisk/core/connector/MessageConsumer.scala
+++ b/common/scala/src/main/scala/whisk/core/connector/MessageConsumer.scala
@@ -60,6 +60,7 @@ object MessageFeed {
   protected[connector] case object Idle extends FeedState
   protected[connector] case object FillingPipeline extends FeedState
   protected[connector] case object DrainingPipeline extends FeedState
+  protected[connector] case object GracefulShutdownDrain extends FeedState
 
   protected sealed trait FeedData
   private case object NoData extends FeedData
@@ -70,6 +71,10 @@ object MessageFeed {
   /** Steady state message, indicates capacity in downstream process to receive more messages. */
   object Processed
 
+  object GracefulShutdown
+
+  object Busy
+
   /** Indicates the fill operation has completed. */
   private case class FillCompleted(messages: Seq[(String, Int, Long, Array[Byte])])
 }
@@ -117,6 +122,7 @@ class MessageFeed(description: String,
   // Best practice dictates a mutable variable pointing at an immutable collection for this reason
   private var outstandingMessages = immutable.Queue.empty[(String, Int, Long, Array[Byte])]
   private var handlerCapacity = maximumHandlerCapacity
+  private var fillingQueue = false
 
   private implicit val tid = TransactionId.dispatcher
 
@@ -126,6 +132,7 @@ class MessageFeed(description: String,
 
   when(Idle) {
     case Event(Ready, _) =>
+      fillingQueue = true
       fillPipeline()
       goto(FillingPipeline)
 
@@ -143,14 +150,19 @@ class MessageFeed(description: String,
     case Event(FillCompleted(messages), _) =>
       outstandingMessages = outstandingMessages ++ messages
       sendOutstandingMessages()
+      fillingQueue = false
 
       if (shouldFillQueue()) {
+        fillingQueue = true
         fillPipeline()
         stay
       } else {
         goto(DrainingPipeline)
       }
 
+    case Event(GracefulShutdown, _) =>
+      goto(GracefulShutdownDrain)
+
     case _ => stay
   }
 
@@ -158,11 +170,33 @@ class MessageFeed(description: String,
     case Event(Processed, _) =>
       updateHandlerCapacity()
       sendOutstandingMessages()
+
       if (shouldFillQueue()) {
+        fillingQueue = true
         fillPipeline()
         goto(FillingPipeline)
       } else stay
 
+    case Event(GracefulShutdown, _) =>
+      goto(GracefulShutdownDrain)
+
+    case _ => stay
+  }
+
+  when(GracefulShutdownDrain) {
+    case Event(Processed, _) =>
+      updateHandlerCapacity()
+      sendOutstandingMessages()
+      stay
+
+    case Event(FillCompleted(messages), _) =>
+      outstandingMessages = outstandingMessages ++ messages
+      sendOutstandingMessages()
+      stay
+
+    case Event(Busy, _) =>
+      stay() replying (handlerCapacity != maximumHandlerCapacity && outstandingMessages.nonEmpty && fillingQueue)
+
     case _ => stay
   }
 
@@ -207,7 +241,7 @@ class MessageFeed(description: String,
     val occupancy = outstandingMessages.size
     if (occupancy > 0 && handlerCapacity > 0) {
       // Easiest way with an immutable queue to cleanly dequeue
-      // Head is the first elemeent of the queue, desugared w/ an assignment pattern
+      // Head is the first element of the queue, desugared w/ an assignment pattern
       // Tail is everything but the first element, thus mutating the collection variable
       val (topic, partition, offset, bytes) = outstandingMessages.head
       outstandingMessages = outstandingMessages.tail
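
The new GracefulShutdownDrain state accepts GracefulShutdown from either steady state, keeps delivering already-buffered messages, and answers a Busy query with whether the feed still has work in flight. A minimal sketch (not part of the patch) of a driver for this protocol, mirroring waitForActivationFeedIdle further below; drainFeed and the one-second retry delay are illustrative:

    import scala.concurrent.{ExecutionContext, Future}
    import scala.concurrent.duration._
    import akka.actor.{ActorRef, ActorSystem}
    import akka.pattern.{after, ask}
    import akka.util.Timeout
    import whisk.core.connector.MessageFeed

    /** Repeatedly asks the feed whether it is busy until it reports idle. */
    def drainFeed(feed: ActorRef)(implicit system: ActorSystem, ec: ExecutionContext): Future[Unit] = {
      implicit val timeout: Timeout = Timeout(5.seconds)
      feed ! MessageFeed.GracefulShutdown // enter (or stay in) the drain state
      (feed ? MessageFeed.Busy)
        .mapTo[Boolean]
        .flatMap {
          case true  => after(1.second, system.scheduler)(drainFeed(feed)) // still draining
          case false => Future.successful(())                              // idle: buffer flushed
        }
        .recoverWith { case _ => after(1.second, system.scheduler)(drainFeed(feed)) } // ask timed out; retry
    }
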
diff --git a/core/invoker/src/main/scala/whisk/core/containerpool/ContainerPool.scala b/core/invoker/src/main/scala/whisk/core/containerpool/ContainerPool.scala
index d26ebdc2f9..70a3b98855 100644
--- a/core/invoker/src/main/scala/whisk/core/containerpool/ContainerPool.scala
+++ b/core/invoker/src/main/scala/whisk/core/containerpool/ContainerPool.scala
@@ -139,7 +139,7 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef,
           val retryLogDeadline = if (isErrorLogged) {
             logging.error(
               this,
-              s"Rescheduling Run message, too many message in the pool, 
freePoolSize: ${freePool.size}, " +
+              s"Rescheduling Run message, too many messages in the pool, 
freePoolSize: ${freePool.size}, " +
                 s"busyPoolSize: ${busyPool.size}, maxActiveContainers 
${poolConfig.maxActiveContainers}, " +
                 s"userNamespace: ${r.msg.user.namespace}, action: 
${r.action}")(r.msg.transid)
             Some(logMessageInterval.fromNow)
@@ -178,6 +178,9 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef,
     case RescheduleJob =>
       freePool = freePool - sender()
       busyPool = busyPool - sender()
+
+    case Busy =>
+      sender ! (busyPool.nonEmpty)
   }
 
   /** Creates a new container and updates state accordingly. */
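
The pool answers Busy with busyPool.nonEmpty, i.e. true while any container is still executing an activation. A self-contained toy (not from the patch) showing the same ask/reply shape; PoolLike and its counter are stand-ins for the real pool bookkeeping:

    import scala.concurrent.duration._
    import akka.actor.{Actor, ActorSystem, Props}
    import akka.pattern.ask
    import akka.util.Timeout

    case object Busy // stand-in for the Busy message matched above

    /** Toy actor mirroring ContainerPool's reply: true while work remains. */
    class PoolLike extends Actor {
      private var busyCount = 0
      def receive = {
        case "start" => busyCount += 1
        case "done"  => busyCount -= 1
        case Busy    => sender() ! (busyCount > 0)
      }
    }

    object PoolBusySketch {
      def main(args: Array[String]): Unit = {
        implicit val system = ActorSystem("sketch")
        implicit val timeout: Timeout = Timeout(5.seconds)
        val pool = system.actorOf(Props[PoolLike])
        pool ! "start"
        (pool ? Busy).mapTo[Boolean].foreach(busy => println(s"busy = $busy"))(system.dispatcher)
      }
    }
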
diff --git a/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala b/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala
index 4a69d3cf99..32cdc4a06e 100644
--- a/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala
+++ b/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala
@@ -20,22 +20,18 @@ package whisk.core.invoker
 import scala.concurrent.Await
 import scala.concurrent.duration._
 import scala.concurrent.Future
-import scala.util.Failure
 import scala.util.Try
 import kamon.Kamon
 import org.apache.curator.retry.RetryUntilElapsed
 import org.apache.curator.framework.CuratorFrameworkFactory
 import org.apache.curator.framework.recipes.shared.SharedCount
 import akka.Done
-import akka.actor.ActorSystem
-import akka.actor.CoordinatedShutdown
+import akka.actor.{ActorSystem, CoordinatedShutdown}
 import akka.stream.ActorMaterializer
 import whisk.common.AkkaLogging
-import whisk.common.Scheduler
 import whisk.core.WhiskConfig
 import whisk.core.WhiskConfig._
 import whisk.core.connector.MessagingProvider
-import whisk.core.connector.PingMessage
 import whisk.core.entity._
 import whisk.core.entity.ExecManifest
 import whisk.core.entity.InstanceId
@@ -178,15 +174,10 @@ object Invoker {
       case e: Exception => abort(s"Failed to initialize reactive invoker: ${e.getMessage}")
     }
 
-    Scheduler.scheduleWaitAtMost(1.seconds)(() => {
-      producer.send("health", PingMessage(invokerInstance)).andThen {
-        case Failure(t) => logger.error(this, s"failed to ping the controller: $t")
-      }
-    })
-
     val port = config.servicePort.toInt
     BasicHttpService.startHttpService(new BasicRasService {}.route, port)(
       actorSystem,
       ActorMaterializer.create(actorSystem))
+
   }
 }
diff --git a/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala b/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala
index 20cbbd46cd..5b0335817f 100644
--- a/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala
+++ b/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala
@@ -20,12 +20,16 @@ package whisk.core.invoker
 import java.nio.charset.StandardCharsets
 import java.time.Instant
 
-import akka.actor.{ActorRefFactory, ActorSystem, Props}
+import akka.actor.{ActorRef, ActorRefFactory, ActorSystem, Props}
 import akka.event.Logging.InfoLevel
 import akka.stream.ActorMaterializer
+import akka.pattern.{after, ask, gracefulStop}
+import akka.util.Timeout
+
 import org.apache.kafka.common.errors.RecordTooLargeException
 import pureconfig._
 import spray.json._
+
 import whisk.common._
 import whisk.core.{ConfigKeys, WhiskConfig}
 import whisk.core.connector._
@@ -36,10 +40,13 @@ import whisk.core.entity._
 import whisk.http.Messages
 import whisk.spi.SpiLoader
 
-import scala.concurrent.{ExecutionContext, Future}
+import scala.concurrent.{Await, ExecutionContext, Future}
 import scala.concurrent.duration._
 import scala.util.{Failure, Success}
+import scala.language.postfixOps
+
 import DefaultJsonProtocol._
+import sun.misc.{Signal, SignalHandler}
 
 class InvokerReactive(
   config: WhiskConfig,
@@ -76,7 +83,6 @@ class InvokerReactive(
           "--ulimit" -> Set("nofile=1024:1024"),
           "--pids-limit" -> Set("1024")) ++ logsProvider.containerParameters)
   containerFactory.init()
-  sys.addShutdownHook(containerFactory.cleanup())
 
   /** Initialize needed databases */
   private val entityStore = WhiskEntityStore.datastore()
@@ -284,4 +290,84 @@ class InvokerReactive(
       })
   }
 
+  val healthScheduler = Scheduler.scheduleWaitAtMost(1.seconds)(() => {
+    producer.send("health", PingMessage(instance)).andThen {
+      case Failure(t) => logging.error(this, s"failed to ping the controller(s): $t")
+    }
+  })
+
+  /** Polls the pool's status and returns a future which completes once the pool is idle. */
+  def waitForContainerPoolIdle(pool: ActorRef): Future[Unit] = {
+    implicit val timeout = Timeout(5 seconds)
+    val delay = 1.second
+
+    (pool ? Busy)
+      .mapTo[Boolean]
+      .flatMap {
+        case true =>
+          logging.info(this, "Container pool is not idle.")
+          after(delay, actorSystem.scheduler)(waitForContainerPoolIdle(pool))
+        case false =>
+          Future.successful(())
+      }
+      .recoverWith { case _ => after(delay, actorSystem.scheduler)(waitForContainerPoolIdle(pool)) }
+  }
+
+  /** Polls the feed's status and returns a future which completes once the feed is idle. */
+  def waitForActivationFeedIdle(feed: ActorRef): Future[Unit] = {
+    implicit val timeout = Timeout(5 seconds)
+    val delay = 1.second
+
+    activationFeed ! MessageFeed.GracefulShutdown
+    (feed ? MessageFeed.Busy)
+      .mapTo[Boolean]
+      .flatMap {
+        case true =>
+          logging.info(this, "Activation feed is not idle.")
+          after(delay, actorSystem.scheduler)(waitForActivationFeedIdle(feed))
+        case false =>
+          Future.successful(())
+      }
+      .recoverWith { case _ => after(delay, actorSystem.scheduler)(waitForActivationFeedIdle(feed)) }
+  }
+
+  // Capture SIGTERM signals to gracefully shut down the invoker. When gracefully shutting down, the health scheduler
+  // is shut down, preventing additional actions from being scheduled to the invoker; the invoker then processes its
+  // buffered messages from the activation feed and waits for its user containers to finish running before the
+  // process exits.
+  Signal.handle(new Signal("TERM"), new SignalHandler() {
+    override def handle(signal: Signal) = {
+      logging.info(this, s"Starting graceful shutdown")
+
+      // Order is important here so the futures run sequentially
+      val shutdowns = for {
+        _ <- gracefulStop(healthScheduler, 5.seconds)
+        _ <- waitForActivationFeedIdle(activationFeed)
+        _ <- waitForContainerPoolIdle(pool)
+      } yield {
+        logging.info(this, "Successfully shutdown health scheduler, activation 
feed, and container pool")
+      }
+
+      // Allow the shutdown to take a maximum of 3 times the maximum action runtime since the feed can be
+      // buffered and we want to allow for some grace period. If a graceful shutdown is not successful, the
+      // invoker will continue running and a graceful shutdown can be attempted again.
+      Await.result(shutdowns, TimeLimit.MAX_DURATION * 3)
+      containerFactory.cleanup()
+      logging.info(this, "Shutting down invoker")
+      System.exit(0)
+    }
+  })
+
+  // Capture SIGUSR2 signals to put the invoker into drain mode. When draining, the health scheduler is shut down,
+  // preventing additional actions from being scheduled to the invoker and allowing the invoker to process its
+  // current queue.
+  Signal.handle(new Signal("USR2"), new SignalHandler() {
+    override def handle(signal: Signal) = {
+      logging.info(this, "Draining invoker")
+
+      gracefulStop(healthScheduler, 5.seconds).recover {
+        case _ => logging.info(this, "Health communication failed to shutdown 
gracefully")
+      }
+    }
+  })
+
 }
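
For reference, the signal-handling pattern used above, distilled into a self-contained sketch (not part of the patch). sun.misc.Signal is a JDK-internal API, the same caveat the patch itself takes on; sending `kill -TERM <pid>` to the running JVM triggers the handler:

    import sun.misc.{Signal, SignalHandler}

    object SignalSketch {
      def main(args: Array[String]): Unit = {
        Signal.handle(new Signal("TERM"), new SignalHandler {
          override def handle(signal: Signal): Unit = {
            println(s"received SIG${signal.getName}, draining before exit")
            // ... await the drain futures here, then:
            System.exit(0)
          }
        })
        Thread.sleep(Long.MaxValue) // keep the JVM alive so the handler can fire
      }
    }
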

