style95 commented on code in PR #5313:
URL: https://github.com/apache/openwhisk/pull/5313#discussion_r949069210


##########
tests/src/test/scala/org/apache/openwhisk/core/scheduler/container/test/ContainerManagerTests.scala:
##########
@@ -1155,6 +1171,285 @@ class ContainerManagerTests
       case _                                                                  
=> false
     }
   }
+
+  it should "choose an invoker from candidates" in {
+    val candidates = List(
+      InvokerHealth(InvokerInstanceId(0, userMemory = 128 MB), Healthy),
+      InvokerHealth(InvokerInstanceId(1, userMemory = 128 MB), Healthy),
+      InvokerHealth(InvokerInstanceId(2, userMemory = 256 MB), Healthy),
+    )
+    val msg = ContainerCreationMessage(
+      TransactionId.testing,
+      testInvocationNamespace,
+      FullyQualifiedEntityName(EntityPath("ns1"), EntityName(testAction)),
+      testRevision,
+      actionMetadata,
+      testsid,
+      schedulerHost,
+      rpcPort)
+
+    // no matter how many time we schedule the msg, it should always choose 
invoker2.
+    (1 to 10).foreach { _ =>
+      val newPairs = ContainerManager.chooseInvokerFromCandidates(candidates, 
msg)

Review Comment:
   We can add unit tests for `chooseInvokerFromCandidates` now.



##########
tests/src/test/scala/org/apache/openwhisk/core/scheduler/container/test/ContainerManagerTests.scala:
##########
@@ -1155,6 +1171,285 @@ class ContainerManagerTests
       case _                                                                  
=> false
     }
   }
+
+  it should "choose an invoker from candidates" in {
+    val candidates = List(
+      InvokerHealth(InvokerInstanceId(0, userMemory = 128 MB), Healthy),
+      InvokerHealth(InvokerInstanceId(1, userMemory = 128 MB), Healthy),
+      InvokerHealth(InvokerInstanceId(2, userMemory = 256 MB), Healthy),
+    )
+    val msg = ContainerCreationMessage(
+      TransactionId.testing,
+      testInvocationNamespace,
+      FullyQualifiedEntityName(EntityPath("ns1"), EntityName(testAction)),
+      testRevision,
+      actionMetadata,
+      testsid,
+      schedulerHost,
+      rpcPort)
+
+    // no matter how many time we schedule the msg, it should always choose 
invoker2.
+    (1 to 10).foreach { _ =>
+      val newPairs = ContainerManager.chooseInvokerFromCandidates(candidates, 
msg)
+      newPairs.invokerId shouldBe Some(InvokerInstanceId(2, userMemory = 256 
MB))
+    }
+  }
+
+  it should "not choose an invoker when there is no candidate with enough 
memory" in {
+    val candidates = List(
+      InvokerHealth(InvokerInstanceId(0, userMemory = 128 MB), Healthy),
+      InvokerHealth(InvokerInstanceId(1, userMemory = 128 MB), Healthy),
+      InvokerHealth(InvokerInstanceId(2, userMemory = 128 MB), Healthy),
+    )
+    val msg = ContainerCreationMessage(
+      TransactionId.testing,
+      testInvocationNamespace,
+      FullyQualifiedEntityName(EntityPath("ns1"), EntityName(testAction)),
+      testRevision,
+      actionMetadata,
+      testsid,
+      schedulerHost,
+      rpcPort)
+
+    // no matter how many time we schedule the msg, no invoker should be 
assigned.
+    (1 to 10).foreach { _ =>
+      val newPairs = ContainerManager.chooseInvokerFromCandidates(candidates, 
msg)
+      newPairs.invokerId shouldBe None
+    }
+  }
+
+  it should "not choose an invoker when there is no candidate" in {
+    val candidates = List()
+    val msg = ContainerCreationMessage(
+      TransactionId.testing,
+      testInvocationNamespace,
+      FullyQualifiedEntityName(EntityPath("ns1"), EntityName(testAction)),
+      testRevision,
+      actionMetadata,
+      testsid,
+      schedulerHost,
+      rpcPort)
+
+    val newPairs = ContainerManager.chooseInvokerFromCandidates(candidates, 
msg)
+    newPairs.invokerId shouldBe None
+  }
+
+  it should "update invoker memory" in {
+    val invokers = List(
+      InvokerHealth(InvokerInstanceId(0, userMemory = 1024 MB), Healthy),
+      InvokerHealth(InvokerInstanceId(1, userMemory = 1024 MB), Healthy),
+      InvokerHealth(InvokerInstanceId(2, userMemory = 1024 MB), Healthy),
+    )
+    val expected = List(
+      InvokerHealth(InvokerInstanceId(0, userMemory = 1024 MB), Healthy),
+      InvokerHealth(InvokerInstanceId(1, userMemory = 768 MB), Healthy),
+      InvokerHealth(InvokerInstanceId(2, userMemory = 1024 MB), Healthy),
+    )
+    val requiredMemory = 256.MB.toMB
+    val invokerId = Some(InvokerInstanceId(1, userMemory = 1024 MB))
+
+    val updatedInvokers = ContainerManager.updateInvokerMemory(invokerId, 
requiredMemory, invokers)
+
+    updatedInvokers shouldBe expected
+  }
+
+  it should "not update invoker memory when no invoker is assigned" in {
+    val invokers = List(
+      InvokerHealth(InvokerInstanceId(0, userMemory = 1024 MB), Healthy),
+      InvokerHealth(InvokerInstanceId(1, userMemory = 1024 MB), Healthy),
+      InvokerHealth(InvokerInstanceId(2, userMemory = 1024 MB), Healthy),
+    )
+    val requiredMemory = 256.MB.toMB
+
+    val updatedInvokers = ContainerManager.updateInvokerMemory(None, 
requiredMemory, invokers)
+
+    updatedInvokers shouldBe invokers
+  }
+
+  it should "drop an invoker with less memory than MIN_MEMORY" in {
+    val invokers = List(
+      InvokerHealth(InvokerInstanceId(0, userMemory = 1024 MB), Healthy),
+      InvokerHealth(InvokerInstanceId(1, userMemory = 320 MB), Healthy),
+      InvokerHealth(InvokerInstanceId(2, userMemory = 1024 MB), Healthy),
+    )
+    val expected = List(
+      InvokerHealth(InvokerInstanceId(0, userMemory = 1024 MB), Healthy),
+      InvokerHealth(InvokerInstanceId(2, userMemory = 1024 MB), Healthy),
+    )
+    val requiredMemory = 256.MB.toMB
+    val invokerId = Some(InvokerInstanceId(1, userMemory = 320 MB))
+
+    val updatedInvokers = ContainerManager.updateInvokerMemory(invokerId, 
requiredMemory, invokers)
+
+    updatedInvokers shouldBe expected
+  }
+
+  it should "filter warmed creations when there is no warmed container" in {
+
+    val warmedContainers = Set.empty[String]
+    val inProgressWarmedContainers = TrieMap.empty[String, String]
+
+    val msg1 =
+      ContainerCreationMessage(
+        TransactionId.testing,
+        testInvocationNamespace,
+        FullyQualifiedEntityName(EntityPath("ns1"), EntityName(testAction)),
+        testRevision,
+        actionMetadata,
+        testsid,
+        schedulerHost,
+        rpcPort)
+    val msg2 =
+      ContainerCreationMessage(
+        TransactionId.testing,
+        testInvocationNamespace,
+        FullyQualifiedEntityName(EntityPath("ns3"), EntityName(testAction)),
+        testRevision,
+        actionMetadata,
+        testsid,
+        schedulerHost,
+        rpcPort)
+    val msg3 =
+      ContainerCreationMessage(
+        TransactionId.testing,
+        testInvocationNamespace,
+        FullyQualifiedEntityName(EntityPath("ns3"), EntityName(testAction)),
+        testRevision,
+        actionMetadata,
+        testsid,
+        schedulerHost,
+        rpcPort)
+
+    val msgs = List(msg1, msg2, msg3)
+
+    val (coldCreations, warmedCreations) =
+      ContainerManager.filterWarmedCreations(warmedContainers, 
inProgressWarmedContainers, invokers, msgs)
+
+    warmedCreations.isEmpty shouldBe true
+    coldCreations.size shouldBe 3
+  }
+
+  it should "filter warmed creations when there are warmed containers" in {
+    val warmedContainers = Set(
+      ContainerKeys.warmedContainers(
+        testInvocationNamespace,
+        testfqn,
+        testRevision,
+        InvokerInstanceId(0, userMemory = 0.bytes),
+        ContainerId("fake")),
+      ContainerKeys.warmedContainers(
+        testInvocationNamespace,
+        testfqn.copy(name = EntityName("test-action-2")),
+        testRevision,
+        InvokerInstanceId(1, userMemory = 0.bytes),
+        ContainerId("fake")))
+    val inProgressWarmedContainers = TrieMap.empty[String, String]
+
+    val msg1 =
+      ContainerCreationMessage(
+        TransactionId.testing,
+        testInvocationNamespace,
+        testfqn,
+        testRevision,
+        actionMetadata,
+        testsid,
+        schedulerHost,
+        rpcPort)
+    val msg2 =
+      ContainerCreationMessage(
+        TransactionId.testing,
+        testInvocationNamespace,
+        testfqn.copy(name = EntityName("test-action-2")),
+        testRevision,
+        actionMetadata,
+        testsid,
+        schedulerHost,
+        rpcPort)
+    val msg3 =
+      ContainerCreationMessage(
+        TransactionId.testing,
+        testInvocationNamespace,
+        testfqn,
+        testRevision,
+        actionMetadata,
+        testsid,
+        schedulerHost,
+        rpcPort)
+
+    val msgs = List(msg1, msg2, msg3)
+
+    val (coldCreations, warmedCreations) =
+      ContainerManager.filterWarmedCreations(warmedContainers, 
inProgressWarmedContainers, invokers, msgs)
+
+    warmedCreations.size shouldBe 2
+    coldCreations.size shouldBe 1
+
+    warmedCreations.map(_._1).contains(msg1) shouldBe true
+    warmedCreations.map(_._1).contains(msg2) shouldBe true
+    coldCreations.map(_._1).contains(msg3) shouldBe true
+  }
+
+  it should "choose cold creation when warmed containers are in disabled 
invokers" in {

Review Comment:
   This test guarantees that a disabled invoker is never chosen just because it 
has warmed containers.
   



##########
core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/container/ContainerManager.scala:
##########
@@ -462,32 +549,29 @@ object ContainerManager {
     list
   }
 
-  private def chooseInvokerFromCandidates(
-    candidates: List[InvokerHealth],
-    wholeInvokers: List[InvokerHealth],
-    pairs: List[ScheduledPair],
-    msg: ContainerCreationMessage)(implicit logging: Logging): 
(List[ScheduledPair], List[InvokerHealth]) = {
-    val idx = rng(mod = candidates.size)
-    val instance = candidates(idx)
-    // it must be compared to the instance unique id
-    val idxInWhole = wholeInvokers.indexOf(wholeInvokers.filter(p => 
p.id.instance == instance.id.instance).head)
-    val requiredMemory = msg.whiskActionMetaData.limits.memory.megabytes
-    val updated =
-      if (instance.id.userMemory.toMB - requiredMemory >= requiredMemory) { // 
Since ByteSize is negative, it converts to long type and compares.
-        wholeInvokers.updated(
-          idxInWhole,
-          instance.copy(id = instance.id.copy(userMemory = 
instance.id.userMemory - requiredMemory.MB)))
+  protected[container] def chooseInvokerFromCandidates(candidates: 
List[InvokerHealth], msg: ContainerCreationMessage)(
+    implicit logging: Logging): ScheduledPair = {
+    val requiredMemory = msg.whiskActionMetaData.limits.memory
+    if (candidates.isEmpty) {
+      ScheduledPair(msg, invokerId = None, Some(NoAvailableInvokersError), 
Some(s"No available invokers."))
+    } else if (candidates.forall(p => p.id.userMemory.toMB < 
requiredMemory.megabytes)) {
+      ScheduledPair(msg, invokerId = None, Some(NoAvailableInvokersError), 
Some(s"No available invokers."))
+    } else {
+      val idx = rng(mod = candidates.size)
+      val instance = candidates(idx)
+      if (instance.id.userMemory.toMB < requiredMemory.megabytes) {
+        val split = candidates.splitAt(idx)
+        val _ :: t1 = split._2
+        chooseInvokerFromCandidates(split._1 ::: t1, msg)
       } else {
-        // drop the nth element
-        val split = wholeInvokers.splitAt(idxInWhole)

Review Comment:
   This was also a bug.
   When the chosen invoker did not have enough memory, the old code just removed 
the invoker from the list, but the message was still scheduled to that invoker 
even though it lacked the resources.
   



##########
core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/container/ContainerManager.scala:
##########
@@ -360,6 +337,100 @@ object ContainerManager {
    */
   def rng(mod: Int): Int = ThreadLocalRandom.current().nextInt(mod)
 
+  // Partition messages that can use warmed containers.
+  // return: (list of messages that cannot use warmed containers, list of 
messages that can take advantage of warmed containers)
+  protected[container] def filterWarmedCreations(warmedContainers: Set[String],
+                                                 inProgressWarmedContainers: 
TrieMap[String, String],
+                                                 invokers: List[InvokerHealth],
+                                                 msgs: 
List[ContainerCreationMessage])(
+    implicit logging: Logging): (List[(ContainerCreationMessage, Option[Int], 
Option[String])],
+                                 List[(ContainerCreationMessage, Option[Int], 
Option[String])]) = {
+    val warmedApplied = msgs.map { msg =>
+      val warmedPrefix =
+        containerPrefix(ContainerKeys.warmedPrefix, msg.invocationNamespace, 
msg.action, Some(msg.revision))
+      val container = warmedContainers
+        .filter(!inProgressWarmedContainers.values.toSeq.contains(_))
+        .flatMap { container =>
+          if (container.startsWith(warmedPrefix)) {
+            logging.info(this, s"Choose a warmed container $container")
+
+            // this is required to exclude already chosen invokers
+            inProgressWarmedContainers.update(msg.creationId.asString, 
container)
+            Some(container)
+          } else
+            None
+        }
+
+      // chosenInvoker is supposed to have only one item
+      val chosenInvoker = container
+        .map(_.split("/").takeRight(3).apply(0))
+        // filter warmed containers in disabled invokers
+        .filter(
+          invoker =>
+            invokers
+            // filterWarmedCreations method is supposed to receive healthy 
invokers only but this will make sure again only healthy invokers are used.
+              .filter(invoker => invoker.status.isUsable)
+              .map(_.id.instance)
+              .contains(invoker.toInt))
+
+      if (chosenInvoker.nonEmpty) {
+        (msg, Some(chosenInvoker.head.toInt), Some(container.head))
+      } else
+        (msg, None, None)
+    }
+
+    warmedApplied.partition { item =>
+      if (item._2.nonEmpty) false

Review Comment:
   If the tuple's `_2` is non-empty, which means there is a chosen invoker, this 
is a warmed creation, so the predicate returns `false` to put it in the second 
(warmed) list.
   



##########
core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/container/ContainerManager.scala:
##########
@@ -395,64 +466,80 @@ object ContainerManager {
         val resourcesStrictPolicy = msg.whiskActionMetaData.annotations
           
.getAs[Boolean](Annotations.InvokerResourcesStrictPolicyAnnotationName)
           .getOrElse(true)
-        val isBlackboxInvocation = 
msg.whiskActionMetaData.toExecutableWhiskAction.map(_.exec.pull).getOrElse(false)
+        val isBlackboxInvocation = 
msg.whiskActionMetaData.toExecutableWhiskAction.exists(_.exec.pull)
         if (requiredResources.isEmpty) {
           // only choose managed invokers or blackbox invokers
           val wantedInvokers = if (isBlackboxInvocation) {
-            candidates.filter(c => blackboxInvokers.map(b => 
b.id.instance).contains(c.id.instance)).toSet
+            candidates
+              .filter(
+                c =>
+                  blackboxInvokers
+                    .map(b => b.id.instance)
+                    .contains(c.id.instance) && c.id.userMemory.toMB >= 
msg.whiskActionMetaData.limits.memory.megabytes)
+              .toSet
           } else {
-            candidates.filter(c => managedInvokers.map(m => 
m.id.instance).contains(c.id.instance)).toSet
+            candidates
+              .filter(
+                c =>
+                  managedInvokers
+                    .map(m => m.id.instance)
+                    .contains(c.id.instance) && c.id.userMemory.toMB >= 
msg.whiskActionMetaData.limits.memory.megabytes)
+              .toSet
           }
           val taggedInvokers = candidates.filter(_.id.tags.nonEmpty)
 
           if (wantedInvokers.nonEmpty) {
-            chooseInvokerFromCandidates(wantedInvokers.toList, invokers, 
pairs, msg)
+            val scheduledPair = 
chooseInvokerFromCandidates(wantedInvokers.toList, msg)
+            val updatedInvokers =
+              updateInvokerMemory(scheduledPair.invokerId, 
msg.whiskActionMetaData.limits.memory.megabytes, invokers)
+            (scheduledPair :: pairs, updatedInvokers)
           } else if (taggedInvokers.nonEmpty) { // if not found from the 
wanted invokers, choose tagged invokers then
-            chooseInvokerFromCandidates(taggedInvokers, invokers, pairs, msg)
+            val scheduledPair = chooseInvokerFromCandidates(taggedInvokers, 
msg)
+            val updatedInvokers =
+              updateInvokerMemory(scheduledPair.invokerId, 
msg.whiskActionMetaData.limits.memory.megabytes, invokers)
+            (scheduledPair :: pairs, updatedInvokers)
           } else {
-            sendState(
-              FailedCreationJob(
-                msg.creationId,
-                msg.invocationNamespace,
-                msg.action,
-                msg.revision,
-                NoAvailableInvokersError,
-                s"No available invokers."))
-            (pairs, candidates)
+            val scheduledPair =
+              ScheduledPair(msg, invokerId = None, 
Some(NoAvailableInvokersError), Some(s"No available invokers."))
+            (scheduledPair :: pairs, invokers)
           }
         } else {
           val wantedInvokers = candidates.filter(health => 
requiredResources.toSet.subsetOf(health.id.tags.toSet))
           if (wantedInvokers.nonEmpty) {
-            chooseInvokerFromCandidates(wantedInvokers, invokers, pairs, msg)
+            val scheduledPair = chooseInvokerFromCandidates(wantedInvokers, 
msg)
+            val updatedInvokers =
+              updateInvokerMemory(scheduledPair.invokerId, 
msg.whiskActionMetaData.limits.memory.megabytes, invokers)
+            (scheduledPair :: pairs, updatedInvokers)
           } else if (resourcesStrictPolicy) {
-            sendState(
-              FailedCreationJob(
-                msg.creationId,
-                msg.invocationNamespace,
-                msg.action,
-                msg.revision,
-                NoAvailableResourceInvokersError,
-                s"No available invokers with resources $requiredResources."))
-            (pairs, candidates)
+            val scheduledPair =
+              ScheduledPair(
+                msg,
+                invokerId = None,
+                Some(NoAvailableResourceInvokersError),
+                Some(s"No available invokers with resources 
$requiredResources."))
+            (scheduledPair :: pairs, invokers)
           } else {
             val (noTaggedInvokers, taggedInvokers) = 
candidates.partition(_.id.tags.isEmpty)
             if (noTaggedInvokers.nonEmpty) { // choose no tagged invokers first
-              chooseInvokerFromCandidates(noTaggedInvokers, invokers, pairs, 
msg)
+              val scheduledPair = 
chooseInvokerFromCandidates(noTaggedInvokers, msg)
+              val updatedInvokers =
+                updateInvokerMemory(scheduledPair.invokerId, 
msg.whiskActionMetaData.limits.memory.megabytes, invokers)
+              (scheduledPair :: pairs, updatedInvokers)
             } else {
               val leftInvokers =
                 taggedInvokers.filterNot(health => 
requiredResources.toSet.subsetOf(health.id.tags.toSet))
-              if (leftInvokers.nonEmpty)
-                chooseInvokerFromCandidates(leftInvokers, invokers, pairs, msg)
-              else {
-                sendState(

Review Comment:
   This `schedule` method also had side effects like this one.
   It was sending a state. Since that makes the method harder to test, I 
removed this kind of side effect as well.
   Now the `schedule` method only schedules invokers for the given cold 
creations.



##########
core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/container/ContainerManager.scala:
##########
@@ -462,32 +537,30 @@ object ContainerManager {
     list
   }
 
-  private def chooseInvokerFromCandidates(
-    candidates: List[InvokerHealth],
-    wholeInvokers: List[InvokerHealth],
-    pairs: List[ScheduledPair],
-    msg: ContainerCreationMessage)(implicit logging: Logging): 
(List[ScheduledPair], List[InvokerHealth]) = {
-    val idx = rng(mod = candidates.size)
-    val instance = candidates(idx)
-    // it must be compared to the instance unique id
-    val idxInWhole = wholeInvokers.indexOf(wholeInvokers.filter(p => 
p.id.instance == instance.id.instance).head)
-    val requiredMemory = msg.whiskActionMetaData.limits.memory.megabytes
-    val updated =
-      if (instance.id.userMemory.toMB - requiredMemory >= requiredMemory) { // 
Since ByteSize is negative, it converts to long type and compares.
-        wholeInvokers.updated(
-          idxInWhole,
-          instance.copy(id = instance.id.copy(userMemory = 
instance.id.userMemory - requiredMemory.MB)))
+  @tailrec
+  protected[container] def chooseInvokerFromCandidates(candidates: 
List[InvokerHealth], msg: ContainerCreationMessage)(
+    implicit logging: Logging): ScheduledPair = {
+    val requiredMemory = msg.whiskActionMetaData.limits.memory
+    if (candidates.isEmpty) {
+      ScheduledPair(msg, invokerId = None, Some(NoAvailableInvokersError), 
Some(s"No available invokers."))
+    } else if (candidates.forall(p => p.id.userMemory.toMB < 
requiredMemory.megabytes)) {
+      ScheduledPair(msg, invokerId = None, Some(NoAvailableInvokersError), 
Some(s"No available invokers."))
+    } else {
+      val idx = rng(mod = candidates.size)
+      val instance = candidates(idx)
+      if (instance.id.userMemory.toMB < requiredMemory.megabytes) {
+        val split = candidates.splitAt(idx)
+        val _ :: t1 = split._2
+        chooseInvokerFromCandidates(split._1 ::: t1, msg)

Review Comment:
   Now, this method calls itself recursively with the invoker that lacks enough 
resources removed from the candidates.
   



##########
core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/container/ContainerManager.scala:
##########
@@ -360,6 +337,100 @@ object ContainerManager {
    */
   def rng(mod: Int): Int = ThreadLocalRandom.current().nextInt(mod)
 
+  // Partition messages that can use warmed containers.
+  // return: (list of messages that cannot use warmed containers, list of 
messages that can take advantage of warmed containers)
+  protected[container] def filterWarmedCreations(warmedContainers: Set[String],
+                                                 inProgressWarmedContainers: 
TrieMap[String, String],
+                                                 invokers: List[InvokerHealth],
+                                                 msgs: 
List[ContainerCreationMessage])(
+    implicit logging: Logging): (List[(ContainerCreationMessage, Option[Int], 
Option[String])],
+                                 List[(ContainerCreationMessage, Option[Int], 
Option[String])]) = {
+    val warmedApplied = msgs.map { msg =>
+      val warmedPrefix =
+        containerPrefix(ContainerKeys.warmedPrefix, msg.invocationNamespace, 
msg.action, Some(msg.revision))
+      val container = warmedContainers
+        .filter(!inProgressWarmedContainers.values.toSeq.contains(_))
+        .flatMap { container =>
+          if (container.startsWith(warmedPrefix)) {
+            logging.info(this, s"Choose a warmed container $container")
+
+            // this is required to exclude already chosen invokers
+            inProgressWarmedContainers.update(msg.creationId.asString, 
container)
+            Some(container)
+          } else
+            None
+        }
+
+      // chosenInvoker is supposed to have only one item
+      val chosenInvoker = container
+        .map(_.split("/").takeRight(3).apply(0))
+        // filter warmed containers in disabled invokers
+        .filter(
+          invoker =>
+            invokers
+            // filterWarmedCreations method is supposed to receive healthy 
invokers only but this will make sure again only healthy invokers are used.
+              .filter(invoker => invoker.status.isUsable)

Review Comment:
   This will make sure we can exclude unhealthy invokers even if they have some 
warmed containers.
   



##########
core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/container/ContainerManager.scala:
##########
@@ -360,6 +337,100 @@ object ContainerManager {
    */
   def rng(mod: Int): Int = ThreadLocalRandom.current().nextInt(mod)
 
+  // Partition messages that can use warmed containers.
+  // return: (list of messages that cannot use warmed containers, list of 
messages that can take advantage of warmed containers)
+  protected[container] def filterWarmedCreations(warmedContainers: Set[String],

Review Comment:
   Now, this method just partitions the creation messages into two lists: cold 
creations and warmed creations.
   Since I removed the side effects, this method can be located in the 
companion object now.
   



##########
core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/container/ContainerManager.scala:
##########
@@ -360,6 +337,100 @@ object ContainerManager {
    */
   def rng(mod: Int): Int = ThreadLocalRandom.current().nextInt(mod)
 
+  // Partition messages that can use warmed containers.
+  // return: (list of messages that cannot use warmed containers, list of 
messages that can take advantage of warmed containers)
+  protected[container] def filterWarmedCreations(warmedContainers: Set[String],
+                                                 inProgressWarmedContainers: 
TrieMap[String, String],
+                                                 invokers: List[InvokerHealth],
+                                                 msgs: 
List[ContainerCreationMessage])(
+    implicit logging: Logging): (List[(ContainerCreationMessage, Option[Int], 
Option[String])],
+                                 List[(ContainerCreationMessage, Option[Int], 
Option[String])]) = {
+    val warmedApplied = msgs.map { msg =>
+      val warmedPrefix =
+        containerPrefix(ContainerKeys.warmedPrefix, msg.invocationNamespace, 
msg.action, Some(msg.revision))
+      val container = warmedContainers
+        .filter(!inProgressWarmedContainers.values.toSeq.contains(_))
+        .flatMap { container =>
+          if (container.startsWith(warmedPrefix)) {
+            logging.info(this, s"Choose a warmed container $container")
+
+            // this is required to exclude already chosen invokers
+            inProgressWarmedContainers.update(msg.creationId.asString, 
container)

Review Comment:
   This looks like a side effect, but `inProgressWarmedContainers` is a local 
variable passed as an argument and only valid in the scope of this method. 



##########
core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/container/ContainerManager.scala:
##########
@@ -248,31 +250,6 @@ class ContainerManager(jobManagerFactory: ActorRefFactory 
=> ActorRef,
     ContainerKeyMeta(revision, invokerId, containerId)
   }
 
-  // Filter out messages which can use warmed container
-  private def filterWarmedCreations(msgs: List[ContainerCreationMessage]) = {
-    msgs.filter { msg =>
-      val warmedPrefix =
-        containerPrefix(ContainerKeys.warmedPrefix, msg.invocationNamespace, 
msg.action, Some(msg.revision))
-      val chosenInvoker = warmedContainers
-        .filter(!inProgressWarmedContainers.values.toSeq.contains(_))
-        .find { container =>
-          if (container.startsWith(warmedPrefix)) {
-            logging.info(this, s"Choose a warmed container $container")
-            inProgressWarmedContainers.update(msg.creationId.asString, 
container)
-            true
-          } else
-            false
-        }
-        .map(_.split("/").takeRight(3).apply(0))
-      if (chosenInvoker.nonEmpty) {
-        creationJobManager ! RegisterCreationJob(msg)
-        sendCreationContainerToInvoker(messagingProducer, 
chosenInvoker.get.toInt, msg)

Review Comment:
   There were side effects in this method like this one.
   This method is supposed to filter warmed creations but it also sent creation 
messages to invokers.
   This kind of side effect makes it harder to test this method, I removed side 
effects.
   



##########
core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/container/ContainerManager.scala:
##########
@@ -360,6 +337,100 @@ object ContainerManager {
    */
   def rng(mod: Int): Int = ThreadLocalRandom.current().nextInt(mod)
 
+  // Partition messages that can use warmed containers.
+  // return: (list of messages that cannot use warmed containers, list of 
messages that can take advantage of warmed containers)
+  protected[container] def filterWarmedCreations(warmedContainers: Set[String],
+                                                 inProgressWarmedContainers: 
TrieMap[String, String],
+                                                 invokers: List[InvokerHealth],
+                                                 msgs: 
List[ContainerCreationMessage])(
+    implicit logging: Logging): (List[(ContainerCreationMessage, Option[Int], 
Option[String])],
+                                 List[(ContainerCreationMessage, Option[Int], 
Option[String])]) = {
+    val warmedApplied = msgs.map { msg =>
+      val warmedPrefix =
+        containerPrefix(ContainerKeys.warmedPrefix, msg.invocationNamespace, 
msg.action, Some(msg.revision))
+      val container = warmedContainers
+        .filter(!inProgressWarmedContainers.values.toSeq.contains(_))
+        .flatMap { container =>
+          if (container.startsWith(warmedPrefix)) {
+            logging.info(this, s"Choose a warmed container $container")
+
+            // this is required to exclude already chosen invokers
+            inProgressWarmedContainers.update(msg.creationId.asString, 
container)
+            Some(container)
+          } else
+            None
+        }
+
+      // chosenInvoker is supposed to have only one item
+      val chosenInvoker = container
+        .map(_.split("/").takeRight(3).apply(0))
+        // filter warmed containers in disabled invokers
+        .filter(
+          invoker =>
+            invokers
+            // filterWarmedCreations method is supposed to receive healthy 
invokers only but this will make sure again only healthy invokers are used.
+              .filter(invoker => invoker.status.isUsable)
+              .map(_.id.instance)
+              .contains(invoker.toInt))
+
+      if (chosenInvoker.nonEmpty) {
+        (msg, Some(chosenInvoker.head.toInt), Some(container.head))

Review Comment:
   Now, this is an example of the return value.
   When there are available warmed creations, it returns a `Tuple3` with the 
original message, the chosen invoker, and the chosen warmed container key.
   
   The warmed container key is required to update this class's member variable, 
`inProgressWarmedContainers`.



##########
core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/container/ContainerManager.scala:
##########
@@ -395,64 +466,80 @@ object ContainerManager {
         val resourcesStrictPolicy = msg.whiskActionMetaData.annotations
           
.getAs[Boolean](Annotations.InvokerResourcesStrictPolicyAnnotationName)
           .getOrElse(true)
-        val isBlackboxInvocation = 
msg.whiskActionMetaData.toExecutableWhiskAction.map(_.exec.pull).getOrElse(false)
+        val isBlackboxInvocation = 
msg.whiskActionMetaData.toExecutableWhiskAction.exists(_.exec.pull)
         if (requiredResources.isEmpty) {
           // only choose managed invokers or blackbox invokers
           val wantedInvokers = if (isBlackboxInvocation) {
-            candidates.filter(c => blackboxInvokers.map(b => 
b.id.instance).contains(c.id.instance)).toSet
+            candidates
+              .filter(
+                c =>
+                  blackboxInvokers
+                    .map(b => b.id.instance)
+                    .contains(c.id.instance) && c.id.userMemory.toMB >= 
msg.whiskActionMetaData.limits.memory.megabytes)
+              .toSet
           } else {
-            candidates.filter(c => managedInvokers.map(m => 
m.id.instance).contains(c.id.instance)).toSet
+            candidates
+              .filter(
+                c =>
+                  managedInvokers
+                    .map(m => m.id.instance)
+                    .contains(c.id.instance) && c.id.userMemory.toMB >= 
msg.whiskActionMetaData.limits.memory.megabytes)
+              .toSet
           }
           val taggedInvokers = candidates.filter(_.id.tags.nonEmpty)
 
           if (wantedInvokers.nonEmpty) {
-            chooseInvokerFromCandidates(wantedInvokers.toList, invokers, 
pairs, msg)
+            val scheduledPair = 
chooseInvokerFromCandidates(wantedInvokers.toList, msg)
+            val updatedInvokers =
+              updateInvokerMemory(scheduledPair.invokerId, 
msg.whiskActionMetaData.limits.memory.megabytes, invokers)
+            (scheduledPair :: pairs, updatedInvokers)
           } else if (taggedInvokers.nonEmpty) { // if not found from the 
wanted invokers, choose tagged invokers then
-            chooseInvokerFromCandidates(taggedInvokers, invokers, pairs, msg)
+            val scheduledPair = chooseInvokerFromCandidates(taggedInvokers, 
msg)
+            val updatedInvokers =
+              updateInvokerMemory(scheduledPair.invokerId, 
msg.whiskActionMetaData.limits.memory.megabytes, invokers)
+            (scheduledPair :: pairs, updatedInvokers)
           } else {
-            sendState(
-              FailedCreationJob(
-                msg.creationId,
-                msg.invocationNamespace,
-                msg.action,
-                msg.revision,
-                NoAvailableInvokersError,
-                s"No available invokers."))
-            (pairs, candidates)
+            val scheduledPair =
+              ScheduledPair(msg, invokerId = None, 
Some(NoAvailableInvokersError), Some(s"No available invokers."))
+            (scheduledPair :: pairs, invokers)
           }
         } else {
           val wantedInvokers = candidates.filter(health => 
requiredResources.toSet.subsetOf(health.id.tags.toSet))
           if (wantedInvokers.nonEmpty) {
-            chooseInvokerFromCandidates(wantedInvokers, invokers, pairs, msg)
+            val scheduledPair = chooseInvokerFromCandidates(wantedInvokers, 
msg)
+            val updatedInvokers =
+              updateInvokerMemory(scheduledPair.invokerId, 
msg.whiskActionMetaData.limits.memory.megabytes, invokers)
+            (scheduledPair :: pairs, updatedInvokers)
           } else if (resourcesStrictPolicy) {
-            sendState(
-              FailedCreationJob(
-                msg.creationId,
-                msg.invocationNamespace,
-                msg.action,
-                msg.revision,
-                NoAvailableResourceInvokersError,
-                s"No available invokers with resources $requiredResources."))
-            (pairs, candidates)
+            val scheduledPair =
+              ScheduledPair(
+                msg,
+                invokerId = None,
+                Some(NoAvailableResourceInvokersError),
+                Some(s"No available invokers with resources 
$requiredResources."))
+            (scheduledPair :: pairs, invokers)
           } else {
             val (noTaggedInvokers, taggedInvokers) = 
candidates.partition(_.id.tags.isEmpty)
             if (noTaggedInvokers.nonEmpty) { // choose no tagged invokers first
-              chooseInvokerFromCandidates(noTaggedInvokers, invokers, pairs, 
msg)
+              val scheduledPair = 
chooseInvokerFromCandidates(noTaggedInvokers, msg)
+              val updatedInvokers =
+                updateInvokerMemory(scheduledPair.invokerId, 
msg.whiskActionMetaData.limits.memory.megabytes, invokers)
+              (scheduledPair :: pairs, updatedInvokers)
             } else {
               val leftInvokers =
                 taggedInvokers.filterNot(health => 
requiredResources.toSet.subsetOf(health.id.tags.toSet))
-              if (leftInvokers.nonEmpty)
-                chooseInvokerFromCandidates(leftInvokers, invokers, pairs, msg)
-              else {
-                sendState(
-                  FailedCreationJob(
-                    msg.creationId,
-                    msg.invocationNamespace,
-                    msg.action,
-                    msg.revision,
-                    NoAvailableInvokersError,
-                    s"No available invokers."))
-                (pairs, candidates)
+              if (leftInvokers.nonEmpty) {
+                val scheduledPair = chooseInvokerFromCandidates(leftInvokers, 
msg)
+                val updatedInvokers =
+                  updateInvokerMemory(
+                    scheduledPair.invokerId,
+                    msg.whiskActionMetaData.limits.memory.megabytes,
+                    invokers)
+                (scheduledPair :: pairs, updatedInvokers)

Review Comment:
   While iterating over all container creation messages with `foldLeft`, we 
need to keep updating the memory of the given invoker set whenever a message is 
scheduled.
   So this is the return value of the `foldLeft`: the scheduled pairs together 
with the invokers whose resources have been updated.
   



##########
core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/container/ContainerManager.scala:
##########
@@ -157,43 +139,64 @@ class ContainerManager(jobManagerFactory: ActorRefFactory 
=> ActorRef,
     case _ =>
   }
 
-  private def createContainer(msgs: List[ContainerCreationMessage],
-                              memory: ByteSize,
-                              invocationNamespace: String): Unit = {
+  private def createContainer(msgs: List[ContainerCreationMessage], memory: 
ByteSize, invocationNamespace: String)(

Review Comment:
   Now, this method only has side effects.
   Since the internal core logic (which is extracted into methods) has no side 
effects, it is testable, and the side effects are isolated to this method.
   



##########
tests/src/test/scala/org/apache/openwhisk/core/scheduler/container/test/ContainerManagerTests.scala:
##########
@@ -1155,6 +1171,285 @@ class ContainerManagerTests
       case _                                                                  
=> false
     }
   }
+
+  it should "choose an invoker from candidates" in {
+    val candidates = List(
+      InvokerHealth(InvokerInstanceId(0, userMemory = 128 MB), Healthy),
+      InvokerHealth(InvokerInstanceId(1, userMemory = 128 MB), Healthy),
+      InvokerHealth(InvokerInstanceId(2, userMemory = 256 MB), Healthy),
+    )
+    val msg = ContainerCreationMessage(
+      TransactionId.testing,
+      testInvocationNamespace,
+      FullyQualifiedEntityName(EntityPath("ns1"), EntityName(testAction)),
+      testRevision,
+      actionMetadata,
+      testsid,
+      schedulerHost,
+      rpcPort)
+
+    // no matter how many time we schedule the msg, it should always choose 
invoker2.
+    (1 to 10).foreach { _ =>
+      val newPairs = ContainerManager.chooseInvokerFromCandidates(candidates, 
msg)
+      newPairs.invokerId shouldBe Some(InvokerInstanceId(2, userMemory = 256 
MB))
+    }
+  }
+
+  it should "not choose an invoker when there is no candidate with enough 
memory" in {
+    val candidates = List(
+      InvokerHealth(InvokerInstanceId(0, userMemory = 128 MB), Healthy),
+      InvokerHealth(InvokerInstanceId(1, userMemory = 128 MB), Healthy),
+      InvokerHealth(InvokerInstanceId(2, userMemory = 128 MB), Healthy),
+    )
+    val msg = ContainerCreationMessage(
+      TransactionId.testing,
+      testInvocationNamespace,
+      FullyQualifiedEntityName(EntityPath("ns1"), EntityName(testAction)),
+      testRevision,
+      actionMetadata,
+      testsid,
+      schedulerHost,
+      rpcPort)
+
+    // no matter how many time we schedule the msg, no invoker should be 
assigned.
+    (1 to 10).foreach { _ =>
+      val newPairs = ContainerManager.chooseInvokerFromCandidates(candidates, 
msg)
+      newPairs.invokerId shouldBe None
+    }
+  }
+
+  it should "not choose an invoker when there is no candidate" in {
+    val candidates = List()
+    val msg = ContainerCreationMessage(
+      TransactionId.testing,
+      testInvocationNamespace,
+      FullyQualifiedEntityName(EntityPath("ns1"), EntityName(testAction)),
+      testRevision,
+      actionMetadata,
+      testsid,
+      schedulerHost,
+      rpcPort)
+
+    val newPairs = ContainerManager.chooseInvokerFromCandidates(candidates, 
msg)
+    newPairs.invokerId shouldBe None
+  }
+
+  it should "update invoker memory" in {
+    val invokers = List(
+      InvokerHealth(InvokerInstanceId(0, userMemory = 1024 MB), Healthy),
+      InvokerHealth(InvokerInstanceId(1, userMemory = 1024 MB), Healthy),
+      InvokerHealth(InvokerInstanceId(2, userMemory = 1024 MB), Healthy),
+    )
+    val expected = List(
+      InvokerHealth(InvokerInstanceId(0, userMemory = 1024 MB), Healthy),
+      InvokerHealth(InvokerInstanceId(1, userMemory = 768 MB), Healthy),
+      InvokerHealth(InvokerInstanceId(2, userMemory = 1024 MB), Healthy),
+    )
+    val requiredMemory = 256.MB.toMB
+    val invokerId = Some(InvokerInstanceId(1, userMemory = 1024 MB))
+
+    val updatedInvokers = ContainerManager.updateInvokerMemory(invokerId, 
requiredMemory, invokers)
+
+    updatedInvokers shouldBe expected
+  }
+
+  it should "not update invoker memory when no invoker is assigned" in {
+    val invokers = List(
+      InvokerHealth(InvokerInstanceId(0, userMemory = 1024 MB), Healthy),
+      InvokerHealth(InvokerInstanceId(1, userMemory = 1024 MB), Healthy),
+      InvokerHealth(InvokerInstanceId(2, userMemory = 1024 MB), Healthy),
+    )
+    val requiredMemory = 256.MB.toMB
+
+    val updatedInvokers = ContainerManager.updateInvokerMemory(None, 
requiredMemory, invokers)
+
+    updatedInvokers shouldBe invokers
+  }
+
+  it should "drop an invoker with less memory than MIN_MEMORY" in {
+    val invokers = List(
+      InvokerHealth(InvokerInstanceId(0, userMemory = 1024 MB), Healthy),
+      InvokerHealth(InvokerInstanceId(1, userMemory = 320 MB), Healthy),
+      InvokerHealth(InvokerInstanceId(2, userMemory = 1024 MB), Healthy),
+    )
+    val expected = List(
+      InvokerHealth(InvokerInstanceId(0, userMemory = 1024 MB), Healthy),
+      InvokerHealth(InvokerInstanceId(2, userMemory = 1024 MB), Healthy),
+    )
+    val requiredMemory = 256.MB.toMB
+    val invokerId = Some(InvokerInstanceId(1, userMemory = 320 MB))
+
+    val updatedInvokers = ContainerManager.updateInvokerMemory(invokerId, 
requiredMemory, invokers)
+
+    updatedInvokers shouldBe expected
+  }
+
+  it should "filter warmed creations when there is no warmed container" in {
+
+    val warmedContainers = Set.empty[String]
+    val inProgressWarmedContainers = TrieMap.empty[String, String]
+
+    val msg1 =
+      ContainerCreationMessage(
+        TransactionId.testing,
+        testInvocationNamespace,
+        FullyQualifiedEntityName(EntityPath("ns1"), EntityName(testAction)),
+        testRevision,
+        actionMetadata,
+        testsid,
+        schedulerHost,
+        rpcPort)
+    val msg2 =
+      ContainerCreationMessage(
+        TransactionId.testing,
+        testInvocationNamespace,
+        FullyQualifiedEntityName(EntityPath("ns3"), EntityName(testAction)),
+        testRevision,
+        actionMetadata,
+        testsid,
+        schedulerHost,
+        rpcPort)
+    val msg3 =
+      ContainerCreationMessage(
+        TransactionId.testing,
+        testInvocationNamespace,
+        FullyQualifiedEntityName(EntityPath("ns3"), EntityName(testAction)),
+        testRevision,
+        actionMetadata,
+        testsid,
+        schedulerHost,
+        rpcPort)
+
+    val msgs = List(msg1, msg2, msg3)
+
+    val (coldCreations, warmedCreations) =
+      ContainerManager.filterWarmedCreations(warmedContainers, 
inProgressWarmedContainers, invokers, msgs)
+
+    warmedCreations.isEmpty shouldBe true
+    coldCreations.size shouldBe 3
+  }
+
+  it should "filter warmed creations when there are warmed containers" in {
+    val warmedContainers = Set(
+      ContainerKeys.warmedContainers(
+        testInvocationNamespace,
+        testfqn,
+        testRevision,
+        InvokerInstanceId(0, userMemory = 0.bytes),
+        ContainerId("fake")),
+      ContainerKeys.warmedContainers(
+        testInvocationNamespace,
+        testfqn.copy(name = EntityName("test-action-2")),
+        testRevision,
+        InvokerInstanceId(1, userMemory = 0.bytes),
+        ContainerId("fake")))
+    val inProgressWarmedContainers = TrieMap.empty[String, String]
+
+    val msg1 =
+      ContainerCreationMessage(
+        TransactionId.testing,
+        testInvocationNamespace,
+        testfqn,
+        testRevision,
+        actionMetadata,
+        testsid,
+        schedulerHost,
+        rpcPort)
+    val msg2 =
+      ContainerCreationMessage(
+        TransactionId.testing,
+        testInvocationNamespace,
+        testfqn.copy(name = EntityName("test-action-2")),
+        testRevision,
+        actionMetadata,
+        testsid,
+        schedulerHost,
+        rpcPort)
+    val msg3 =
+      ContainerCreationMessage(
+        TransactionId.testing,
+        testInvocationNamespace,
+        testfqn,
+        testRevision,
+        actionMetadata,
+        testsid,
+        schedulerHost,
+        rpcPort)
+
+    val msgs = List(msg1, msg2, msg3)
+
+    val (coldCreations, warmedCreations) =
+      ContainerManager.filterWarmedCreations(warmedContainers, 
inProgressWarmedContainers, invokers, msgs)

Review Comment:
   We can add unit tests for `filterWarmedCreations` now.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@openwhisk.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to