This is an automated email from the ASF dual-hosted git repository.

bdoyle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new 07c920249 optimize scheduling decision when there are stale activations (#5344)
07c920249 is described below

commit 07c920249d0a0db5fe3bc603add73e410f40dddd
Author: Brendan Doyle <bdoyle0...@gmail.com>
AuthorDate: Mon Oct 31 22:21:27 2022 -0700

    optimize scheduling decision when there are stale activations (#5344)
    
    * optimize scheduling decision when there are stale activations
    
    * further optimization
    
    * scalafmt
    
    * add new test cases
    
    Co-authored-by: Brendan Doyle <brend...@qualtrics.com>
---
 .../scheduler/queue/SchedulingDecisionMaker.scala  | 51 ++++++++++++----------
 .../queue/test/SchedulingDecisionMakerTests.scala  | 48 ++++++++++++++++++++
 2 files changed, 75 insertions(+), 24 deletions(-)

diff --git a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/SchedulingDecisionMaker.scala b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/SchedulingDecisionMaker.scala
index 0e1f4ffa8..d6ae8f63e 100644
--- a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/SchedulingDecisionMaker.scala
+++ b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/SchedulingDecisionMaker.scala
@@ -135,41 +135,44 @@ class SchedulingDecisionMaker(
               staleActivationNum,
               0.0,
               Running)
-
-          case (Running, Some(duration)) if staleActivationNum > 0 =>
-            // we can safely get the value as we already checked the existence
-            val containerThroughput = staleThreshold / duration
-            val num = ceiling(availableMsg.toDouble / containerThroughput)
-            // if it tries to create more containers than existing messages, 
we just create shortage
-            val actualNum = (if (num > availableMsg) availableMsg else num) - 
inProgress
-            addServersIfPossible(
-              existing,
-              inProgress,
-              containerThroughput,
-              availableMsg,
-              capacity,
-              actualNum,
-              staleActivationNum,
-              duration,
-              Running)
-
           // need more containers and a message is already processed
           case (Running, Some(duration)) =>
             // we can safely get the value as we already checked the existence
             val containerThroughput = staleThreshold / duration
             val expectedTps = containerThroughput * (existing + inProgress)
+            val availableNonStaleActivations = availableMsg - 
staleActivationNum
+
+            var staleContainerProvision = 0
+            if (staleActivationNum > 0) {
+              val num = ceiling(staleActivationNum.toDouble / 
containerThroughput)
+              // if it tries to create more containers than existing messages, 
we just create shortage
+              staleContainerProvision = (if (num > staleActivationNum) 
staleActivationNum else num) - inProgress
+            }
 
-            if (availableMsg >= expectedTps && existing + inProgress < 
availableMsg) {
-              val num = ceiling((availableMsg / containerThroughput) - 
existing - inProgress)
+            if (availableNonStaleActivations >= expectedTps && existing + 
inProgress < availableNonStaleActivations) {
+              val num = ceiling((availableNonStaleActivations / 
containerThroughput) - existing - inProgress)
               // if it tries to create more containers than existing messages, 
we just create shortage
-              val actualNum = if (num + totalContainers > availableMsg) 
availableMsg - totalContainers else num
+              val actualNum =
+                if (num + totalContainers > availableNonStaleActivations) 
availableNonStaleActivations - totalContainers
+                else num
+              addServersIfPossible(
+                existing,
+                inProgress,
+                containerThroughput,
+                availableMsg,
+                capacity,
+                actualNum + staleContainerProvision,
+                staleActivationNum,
+                duration,
+                Running)
+            } else if (staleContainerProvision > 0) {
               addServersIfPossible(
                 existing,
                 inProgress,
                 containerThroughput,
                 availableMsg,
                 capacity,
-                actualNum,
+                staleContainerProvision,
                 staleActivationNum,
                 duration,
                 Running)
@@ -184,9 +187,9 @@ class SchedulingDecisionMaker(
           case (Removing, Some(duration)) if staleActivationNum > 0 =>
             // we can safely get the value as we already checked the existence
             val containerThroughput = staleThreshold / duration
-            val num = ceiling(availableMsg.toDouble / containerThroughput)
+            val num = ceiling(staleActivationNum.toDouble / 
containerThroughput)
             // if it tries to create more containers than existing messages, 
we just create shortage
-            val actualNum = (if (num > availableMsg) availableMsg else num) - 
inProgress
+            val actualNum = (if (num > staleActivationNum) staleActivationNum 
else num) - inProgress
             addServersIfPossible(
               existing,
               inProgress,
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/SchedulingDecisionMakerTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/SchedulingDecisionMakerTests.scala
index 72adc8be2..ce1c86682 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/SchedulingDecisionMakerTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/SchedulingDecisionMakerTests.scala
@@ -683,6 +683,54 @@ class SchedulingDecisionMakerTests
     testProbe.expectMsg(DecisionResults(AddContainer, 2))
   }
 
+  it should "add more containers when there are stale messages and non-stale 
messages and both message classes need more containers" in {
+    val decisionMaker = 
system.actorOf(SchedulingDecisionMaker.props(testNamespace, action, 
schedulingConfig))
+    val testProbe = TestProbe()
+
+    val msg = QueueSnapshot(
+      initialized = true,
+      incomingMsgCount = new AtomicInteger(0),
+      currentMsgCount = 5,
+      existingContainerCount = 2,
+      inProgressContainerCount = 0,
+      staleActivationNum = 2,
+      existingContainerCountInNamespace = 2,
+      inProgressContainerCountInNamespace = 0,
+      averageDuration = Some(1000), // the average duration exists
+      limit = 10,
+      stateName = Running,
+      recipient = testProbe.ref)
+
+    decisionMaker ! msg
+
+    //should add two for the stale messages and one to increase tps of 
non-stale available messages
+    testProbe.expectMsg(DecisionResults(AddContainer, 3))
+  }
+
+  it should "add more containers when there are stale messages and non-stale 
messages have needed tps" in {
+    val decisionMaker = 
system.actorOf(SchedulingDecisionMaker.props(testNamespace, action, 
schedulingConfig))
+    val testProbe = TestProbe()
+
+    val msg = QueueSnapshot(
+      initialized = true,
+      incomingMsgCount = new AtomicInteger(0),
+      currentMsgCount = 5,
+      existingContainerCount = 2,
+      inProgressContainerCount = 0,
+      staleActivationNum = 2,
+      existingContainerCountInNamespace = 2,
+      inProgressContainerCountInNamespace = 0,
+      averageDuration = Some(50), // the average duration gives container 
throughput of 2
+      limit = 10,
+      stateName = Running,
+      recipient = testProbe.ref)
+
+    decisionMaker ! msg
+
+    //should add one additional container for stale messages and non-stale 
messages still meet tps
+    testProbe.expectMsg(DecisionResults(AddContainer, 1))
+  }
+
   it should "enable namespace throttling while adding more container when 
there are stale messages even in the GracefulShuttingDown" in {
     val decisionMaker = 
system.actorOf(SchedulingDecisionMaker.props(testNamespace, action, 
schedulingConfig))
     val testProbe = TestProbe()

Reply via email to