This is an automated email from the ASF dual-hosted git repository.

bdoyle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new 07c920249 optimize scheduling decision when there are stale activations (#5344)
07c920249 is described below

commit 07c920249d0a0db5fe3bc603add73e410f40dddd
Author: Brendan Doyle <bdoyle0...@gmail.com>
AuthorDate: Mon Oct 31 22:21:27 2022 -0700

    optimize scheduling decision when there are stale activations (#5344)

    * optimize scheduling decision when there are stale activations

    * further optimization

    * scalafmt

    * add new test cases

    Co-authored-by: Brendan Doyle <brend...@qualtrics.com>
---
 .../scheduler/queue/SchedulingDecisionMaker.scala  | 51 ++++++++++++----------
 .../queue/test/SchedulingDecisionMakerTests.scala  | 48 ++++++++++++++++++++
 2 files changed, 75 insertions(+), 24 deletions(-)

diff --git a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/SchedulingDecisionMaker.scala b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/SchedulingDecisionMaker.scala
index 0e1f4ffa8..d6ae8f63e 100644
--- a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/SchedulingDecisionMaker.scala
+++ b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/SchedulingDecisionMaker.scala
@@ -135,41 +135,44 @@ class SchedulingDecisionMaker(
               staleActivationNum,
               0.0,
               Running)
-
-          case (Running, Some(duration)) if staleActivationNum > 0 =>
-            // we can safely get the value as we already checked the existence
-            val containerThroughput = staleThreshold / duration
-            val num = ceiling(availableMsg.toDouble / containerThroughput)
-            // if it tries to create more containers than existing messages, we just create shortage
-            val actualNum = (if (num > availableMsg) availableMsg else num) - inProgress
-            addServersIfPossible(
-              existing,
-              inProgress,
-              containerThroughput,
-              availableMsg,
-              capacity,
-              actualNum,
-              staleActivationNum,
-              duration,
-              Running)
-
           // need more containers and a message is already processed
           case (Running, Some(duration)) =>
             // we can safely get the value as we already checked the existence
             val containerThroughput = staleThreshold / duration
             val expectedTps = containerThroughput * (existing + inProgress)
+            val availableNonStaleActivations = availableMsg - staleActivationNum
+
+            var staleContainerProvision = 0
+            if (staleActivationNum > 0) {
+              val num = ceiling(staleActivationNum.toDouble / containerThroughput)
+              // if it tries to create more containers than existing messages, we just create shortage
+              staleContainerProvision = (if (num > staleActivationNum) staleActivationNum else num) - inProgress
+            }
 
-            if (availableMsg >= expectedTps && existing + inProgress < availableMsg) {
-              val num = ceiling((availableMsg / containerThroughput) - existing - inProgress)
+            if (availableNonStaleActivations >= expectedTps && existing + inProgress < availableNonStaleActivations) {
+              val num = ceiling((availableNonStaleActivations / containerThroughput) - existing - inProgress)
               // if it tries to create more containers than existing messages, we just create shortage
-              val actualNum = if (num + totalContainers > availableMsg) availableMsg - totalContainers else num
+              val actualNum =
+                if (num + totalContainers > availableNonStaleActivations) availableNonStaleActivations - totalContainers
+                else num
+              addServersIfPossible(
+                existing,
+                inProgress,
+                containerThroughput,
+                availableMsg,
+                capacity,
+                actualNum + staleContainerProvision,
+                staleActivationNum,
+                duration,
+                Running)
+            } else if (staleContainerProvision > 0) {
               addServersIfPossible(
                 existing,
                 inProgress,
                 containerThroughput,
                 availableMsg,
                 capacity,
-                actualNum,
+                staleContainerProvision,
                 staleActivationNum,
                 duration,
                 Running)
@@ -184,9 +187,9 @@ class SchedulingDecisionMaker(
           case (Removing, Some(duration)) if staleActivationNum > 0 =>
             // we can safely get the value as we already checked the existence
             val containerThroughput = staleThreshold / duration
-            val num = ceiling(availableMsg.toDouble / containerThroughput)
+            val num = ceiling(staleActivationNum.toDouble / containerThroughput)
             // if it tries to create more containers than existing messages, we just create shortage
-            val actualNum = (if (num > availableMsg) availableMsg else num) - inProgress
+            val actualNum = (if (num > staleActivationNum) staleActivationNum else num) - inProgress
             addServersIfPossible(
               existing,
               inProgress,
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/SchedulingDecisionMakerTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/SchedulingDecisionMakerTests.scala
index 72adc8be2..ce1c86682 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/SchedulingDecisionMakerTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/SchedulingDecisionMakerTests.scala
@@ -683,6 +683,54 @@ class SchedulingDecisionMakerTests
     testProbe.expectMsg(DecisionResults(AddContainer, 2))
   }
 
+  it should "add more containers when there are stale messages and non-stale messages and both message classes need more containers" in {
+    val decisionMaker = system.actorOf(SchedulingDecisionMaker.props(testNamespace, action, schedulingConfig))
+    val testProbe = TestProbe()
+
+    val msg = QueueSnapshot(
+      initialized = true,
+      incomingMsgCount = new AtomicInteger(0),
+      currentMsgCount = 5,
+      existingContainerCount = 2,
+      inProgressContainerCount = 0,
+      staleActivationNum = 2,
+      existingContainerCountInNamespace = 2,
+      inProgressContainerCountInNamespace = 0,
+      averageDuration = Some(1000), // the average duration exists
+      limit = 10,
+      stateName = Running,
+      recipient = testProbe.ref)
+
+    decisionMaker ! msg
+
+    //should add two for the stale messages and one to increase tps of non-stale available messages
+    testProbe.expectMsg(DecisionResults(AddContainer, 3))
+  }
+
+  it should "add more containers when there are stale messages and non-stale messages have needed tps" in {
+    val decisionMaker = system.actorOf(SchedulingDecisionMaker.props(testNamespace, action, schedulingConfig))
+    val testProbe = TestProbe()
+
+    val msg = QueueSnapshot(
+      initialized = true,
+      incomingMsgCount = new AtomicInteger(0),
+      currentMsgCount = 5,
+      existingContainerCount = 2,
+      inProgressContainerCount = 0,
+      staleActivationNum = 2,
+      existingContainerCountInNamespace = 2,
+      inProgressContainerCountInNamespace = 0,
+      averageDuration = Some(50), // the average duration gives container throughput of 2
+      limit = 10,
+      stateName = Running,
+      recipient = testProbe.ref)
+
+    decisionMaker ! msg
+
+    //should add one additional container for stale messages and non-stale messages still meet tps
+    testProbe.expectMsg(DecisionResults(AddContainer, 1))
+  }
+
   it should "enable namespace throttling while adding more container when there are stale messages even in the GracefulShuttingDown" in {
     val decisionMaker = system.actorOf(SchedulingDecisionMaker.props(testNamespace, action, schedulingConfig))
     val testProbe = TestProbe()