This is an automated email from the ASF dual-hosted git repository.

bdoyle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new 20a7b1c6a add config to fail async scheduler throttles as whisk errors (#5305)
20a7b1c6a is described below

commit 20a7b1c6ad3c57b583484c1fc12a069da42ad847
Author: Brendan Doyle <bdoyle0...@gmail.com>
AuthorDate: Thu Sep 1 14:08:16 2022 -0700

    add config to fail async scheduler throttles as whisk errors (#5305)
    
    * add config to fail async scheduler throttles as whisk errors
    
    * fix tests
    
    Co-authored-by: Brendan Doyle <brend...@qualtrics.com>
---
 core/scheduler/src/main/resources/application.conf            |  1 +
 .../apache/openwhisk/core/scheduler/queue/MemoryQueue.scala   | 11 ++++++-----
 .../core/scheduler/queue/test/MemoryQueueFlowTests.scala      |  2 +-
 .../core/scheduler/queue/test/MemoryQueueTests.scala          |  8 ++++----
 .../core/scheduler/queue/test/MemoryQueueTestsFixture.scala   |  2 +-
 5 files changed, 13 insertions(+), 11 deletions(-)

diff --git a/core/scheduler/src/main/resources/application.conf b/core/scheduler/src/main/resources/application.conf
index e73f764c1..23ca734bb 100644
--- a/core/scheduler/src/main/resources/application.conf
+++ b/core/scheduler/src/main/resources/application.conf
@@ -79,6 +79,7 @@ whisk {
       max-blackbox-retention-ms = "300000"
       throttling-fraction = "0.9"
       duration-buffer-size = "10"
+      fail-throttle-as-whisk-error = "false"
     }
     queue-manager {
       max-scheduling-time = "20 seconds"
diff --git a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/MemoryQueue.scala b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/MemoryQueue.scala
index 436b53d05..45fa2d3a2 100644
--- a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/MemoryQueue.scala
+++ b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/MemoryQueue.scala
@@ -227,7 +227,7 @@ class MemoryQueue(private val etcdClient: EtcdClient,
       enableNamespaceThrottling()
 
       if (dropMsg)
-        completeAllActivations(tooManyConcurrentRequests, isWhiskError = false)
+        completeAllActivations(tooManyConcurrentRequests, isWhiskError = queueConfig.failThrottleAsWhiskError)
       goto(NamespaceThrottled) using ThrottledData(data.schedulerActor, data.droppingActor)
 
     case Event(StateTimeout, data: RunningData) =>
@@ -269,7 +269,7 @@ class MemoryQueue(private val etcdClient: EtcdClient,
   when(NamespaceThrottled) {
     case Event(msg: ActivationMessage, _: ThrottledData) =>
       if (containers.size + creationIds.size == 0) {
-        completeErrorActivation(msg, tooManyConcurrentRequests, isWhiskError = false)
+        completeErrorActivation(msg, tooManyConcurrentRequests, isWhiskError = queueConfig.failThrottleAsWhiskError)
       } else {
         handleActivationMessage(msg)
       }
@@ -285,7 +285,7 @@ class MemoryQueue(private val etcdClient: EtcdClient,
   when(ActionThrottled) {
     // since there are already too many activation messages, it drops the new messages
     case Event(msg: ActivationMessage, ThrottledData(_, _)) =>
-      completeErrorActivation(msg, tooManyConcurrentRequests, isWhiskError = false)
+      completeErrorActivation(msg, tooManyConcurrentRequests, isWhiskError = queueConfig.failThrottleAsWhiskError)
       stay
   }
 
@@ -823,7 +823,7 @@ class MemoryQueue(private val etcdClient: EtcdClient,
       CompletionMessage(activation.transid, activationResponse, instance)
     }
 
-    if (!isWhiskError && message == tooManyConcurrentRequests) {
+    if (message == tooManyConcurrentRequests) {
       val metric = Metric("ConcurrentRateLimit", 1)
       UserEvents.send(
         messagingProducer,
@@ -1208,7 +1208,8 @@ case class QueueConfig(idleGrace: FiniteDuration,
                        maxRetentionMs: Long,
                        maxBlackboxRetentionMs: Long,
                        throttlingFraction: Double,
-                       durationBufferSize: Int)
+                       durationBufferSize: Int,
+                       failThrottleAsWhiskError: Boolean)
 
 case class BufferedRequest(containerId: String, promise: Promise[Either[MemoryQueueError, ActivationMessage]])
 case object DropOld
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueFlowTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueFlowTests.scala
index e68de5b58..1cb2a5cda 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueFlowTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueFlowTests.scala
@@ -517,7 +517,7 @@ class MemoryQueueFlowTests
 
     // max retention size is 10 and throttling fraction is 0.8
     // queue will be action throttled at 10 messages and disabled action throttling at 8 messages
-    val queueConfig = QueueConfig(5 seconds, 10 seconds, 10 seconds, 5 seconds, 10, 5000, 10000, 0.8, 10)
+    val queueConfig = QueueConfig(5 seconds, 10 seconds, 10 seconds, 5 seconds, 10, 5000, 10000, 0.8, 10, false)
 
     // limit is 1
     val getUserLimit = (_: String) => Future.successful(1)
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueTests.scala
index e64262827..711847269 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueTests.scala
@@ -1116,7 +1116,7 @@ class MemoryQueueTests
 
     expectDurationChecking(mockEsClient, testInvocationNamespace)
 
-    val queueConfig = QueueConfig(5 seconds, 10 seconds, 10 seconds, 5 seconds, 10, 10000, 20000, 0.9, 10)
+    val queueConfig = QueueConfig(5 seconds, 10 seconds, 10 seconds, 5 seconds, 10, 10000, 20000, 0.9, 10, false)
 
     val fsm =
       TestFSMRef(
@@ -1342,7 +1342,7 @@ class MemoryQueueTests
 
     expectDurationChecking(mockEsClient, testInvocationNamespace)
 
-    val queueConfig = QueueConfig(5 seconds, 10 seconds, 10 seconds, 5 seconds, 10, 10000, 20000, 0.9, 10)
+    val queueConfig = QueueConfig(5 seconds, 10 seconds, 10 seconds, 5 seconds, 10, 10000, 20000, 0.9, 10, false)
 
     val fsm =
       TestFSMRef(
@@ -1585,7 +1585,7 @@ class MemoryQueueTests
     // it always induces the throttling
     val getZeroLimit = (_: String) => { Future.successful(2) }
 
-    val queueConfig = QueueConfig(5 seconds, 10 seconds, 10 seconds, 5 seconds, 1, 5000, 10000, 0.9, 10)
+    val queueConfig = QueueConfig(5 seconds, 10 seconds, 10 seconds, 5 seconds, 1, 5000, 10000, 0.9, 10, false)
 
     expectDurationChecking(mockEsClient, testInvocationNamespace)
 
@@ -1632,7 +1632,7 @@ class MemoryQueueTests
     val probe = TestProbe()
     val parent = TestProbe()
 
-    val queueConfig = QueueConfig(5 seconds, 10 seconds, 10 seconds, 5 seconds, 10, 5000, 10000, 0.9, 10)
+    val queueConfig = QueueConfig(5 seconds, 10 seconds, 10 seconds, 5 seconds, 10, 5000, 10000, 0.9, 10, false)
     val msgRetentionSize = queueConfig.maxRetentionSize
 
     val tid = TransactionId(TransactionId.generateTid())
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueTestsFixture.scala b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueTestsFixture.scala
index c7a3ff095..090ce3d6e 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueTestsFixture.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueTestsFixture.scala
@@ -154,7 +154,7 @@ class MemoryQueueTestsFixture
   val actionThrottlingKey = ThrottlingKeys.action(testInvocationNamespace, fqn.copy(version = None))
 
   // queue variables
-  val queueConfig = QueueConfig(5 seconds, 10 seconds, 5 seconds, 5 seconds, 10, 10000, 20000, 0.9, 10)
+  val queueConfig = QueueConfig(5 seconds, 10 seconds, 5 seconds, 5 seconds, 10, 10000, 20000, 0.9, 10, false)
   val idleGrace = queueConfig.idleGrace
   val stopGrace = queueConfig.stopGrace
   val flushGrace = queueConfig.flushGrace

Reply via email to