This is an automated email from the ASF dual-hosted git repository. bdoyle pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/openwhisk.git
The following commit(s) were added to refs/heads/master by this push: new 20a7b1c6a add config to fail async scheduler throttles as whisk errors (#5305) 20a7b1c6a is described below commit 20a7b1c6ad3c57b583484c1fc12a069da42ad847 Author: Brendan Doyle <bdoyle0...@gmail.com> AuthorDate: Thu Sep 1 14:08:16 2022 -0700 add config to fail async scheduler throttles as whisk errors (#5305) * add config to fail async scheduler throttles as whisk errors * fix tests Co-authored-by: Brendan Doyle <brend...@qualtrics.com> --- core/scheduler/src/main/resources/application.conf | 1 + .../apache/openwhisk/core/scheduler/queue/MemoryQueue.scala | 11 ++++++----- .../core/scheduler/queue/test/MemoryQueueFlowTests.scala | 2 +- .../core/scheduler/queue/test/MemoryQueueTests.scala | 8 ++++---- .../core/scheduler/queue/test/MemoryQueueTestsFixture.scala | 2 +- 5 files changed, 13 insertions(+), 11 deletions(-) diff --git a/core/scheduler/src/main/resources/application.conf b/core/scheduler/src/main/resources/application.conf index e73f764c1..23ca734bb 100644 --- a/core/scheduler/src/main/resources/application.conf +++ b/core/scheduler/src/main/resources/application.conf @@ -79,6 +79,7 @@ whisk { max-blackbox-retention-ms = "300000" throttling-fraction = "0.9" duration-buffer-size = "10" + fail-throttle-as-whisk-error = "false" } queue-manager { max-scheduling-time = "20 seconds" diff --git a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/MemoryQueue.scala b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/MemoryQueue.scala index 436b53d05..45fa2d3a2 100644 --- a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/MemoryQueue.scala +++ b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/MemoryQueue.scala @@ -227,7 +227,7 @@ class MemoryQueue(private val etcdClient: EtcdClient, enableNamespaceThrottling() if (dropMsg) - completeAllActivations(tooManyConcurrentRequests, isWhiskError = false) + completeAllActivations(tooManyConcurrentRequests, isWhiskError = queueConfig.failThrottleAsWhiskError) goto(NamespaceThrottled) using ThrottledData(data.schedulerActor, data.droppingActor) case Event(StateTimeout, data: RunningData) => @@ -269,7 +269,7 @@ class MemoryQueue(private val etcdClient: EtcdClient, when(NamespaceThrottled) { case Event(msg: ActivationMessage, _: ThrottledData) => if (containers.size + creationIds.size == 0) { - completeErrorActivation(msg, tooManyConcurrentRequests, isWhiskError = false) + completeErrorActivation(msg, tooManyConcurrentRequests, isWhiskError = queueConfig.failThrottleAsWhiskError) } else { handleActivationMessage(msg) } @@ -285,7 +285,7 @@ class MemoryQueue(private val etcdClient: EtcdClient, when(ActionThrottled) { // since there are already too many activation messages, it drops the new messages case Event(msg: ActivationMessage, ThrottledData(_, _)) => - completeErrorActivation(msg, tooManyConcurrentRequests, isWhiskError = false) + completeErrorActivation(msg, tooManyConcurrentRequests, isWhiskError = queueConfig.failThrottleAsWhiskError) stay } @@ -823,7 +823,7 @@ class MemoryQueue(private val etcdClient: EtcdClient, CompletionMessage(activation.transid, activationResponse, instance) } - if (!isWhiskError && message == tooManyConcurrentRequests) { + if (message == tooManyConcurrentRequests) { val metric = Metric("ConcurrentRateLimit", 1) UserEvents.send( messagingProducer, @@ -1208,7 +1208,8 @@ case class QueueConfig(idleGrace: FiniteDuration, maxRetentionMs: Long, maxBlackboxRetentionMs: Long, throttlingFraction: Double, - durationBufferSize: Int) + durationBufferSize: Int, + failThrottleAsWhiskError: Boolean) case class BufferedRequest(containerId: String, promise: Promise[Either[MemoryQueueError, ActivationMessage]]) case object DropOld diff --git a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueFlowTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueFlowTests.scala index e68de5b58..1cb2a5cda 100644 --- a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueFlowTests.scala +++ b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueFlowTests.scala @@ -517,7 +517,7 @@ class MemoryQueueFlowTests // max retention size is 10 and throttling fraction is 0.8 // queue will be action throttled at 10 messages and disabled action throttling at 8 messages - val queueConfig = QueueConfig(5 seconds, 10 seconds, 10 seconds, 5 seconds, 10, 5000, 10000, 0.8, 10) + val queueConfig = QueueConfig(5 seconds, 10 seconds, 10 seconds, 5 seconds, 10, 5000, 10000, 0.8, 10, false) // limit is 1 val getUserLimit = (_: String) => Future.successful(1) diff --git a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueTests.scala index e64262827..711847269 100644 --- a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueTests.scala +++ b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueTests.scala @@ -1116,7 +1116,7 @@ class MemoryQueueTests expectDurationChecking(mockEsClient, testInvocationNamespace) - val queueConfig = QueueConfig(5 seconds, 10 seconds, 10 seconds, 5 seconds, 10, 10000, 20000, 0.9, 10) + val queueConfig = QueueConfig(5 seconds, 10 seconds, 10 seconds, 5 seconds, 10, 10000, 20000, 0.9, 10, false) val fsm = TestFSMRef( @@ -1342,7 +1342,7 @@ class MemoryQueueTests expectDurationChecking(mockEsClient, testInvocationNamespace) - val queueConfig = QueueConfig(5 seconds, 10 seconds, 10 seconds, 5 seconds, 10, 10000, 20000, 0.9, 10) + val queueConfig = QueueConfig(5 seconds, 10 seconds, 10 seconds, 5 seconds, 10, 10000, 20000, 0.9, 10, false) val fsm = TestFSMRef( @@ -1585,7 +1585,7 @@ class MemoryQueueTests // it always induces the throttling val getZeroLimit = (_: String) => { Future.successful(2) } - val queueConfig = QueueConfig(5 seconds, 10 seconds, 10 seconds, 5 seconds, 1, 5000, 10000, 0.9, 10) + val queueConfig = QueueConfig(5 seconds, 10 seconds, 10 seconds, 5 seconds, 1, 5000, 10000, 0.9, 10, false) expectDurationChecking(mockEsClient, testInvocationNamespace) @@ -1632,7 +1632,7 @@ class MemoryQueueTests val probe = TestProbe() val parent = TestProbe() - val queueConfig = QueueConfig(5 seconds, 10 seconds, 10 seconds, 5 seconds, 10, 5000, 10000, 0.9, 10) + val queueConfig = QueueConfig(5 seconds, 10 seconds, 10 seconds, 5 seconds, 10, 5000, 10000, 0.9, 10, false) val msgRetentionSize = queueConfig.maxRetentionSize val tid = TransactionId(TransactionId.generateTid()) diff --git a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueTestsFixture.scala b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueTestsFixture.scala index c7a3ff095..090ce3d6e 100644 --- a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueTestsFixture.scala +++ b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueTestsFixture.scala @@ -154,7 +154,7 @@ class MemoryQueueTestsFixture val actionThrottlingKey = ThrottlingKeys.action(testInvocationNamespace, fqn.copy(version = None)) // queue variables - val queueConfig = QueueConfig(5 seconds, 10 seconds, 5 seconds, 5 seconds, 10, 10000, 20000, 0.9, 10) + val queueConfig = QueueConfig(5 seconds, 10 seconds, 5 seconds, 5 seconds, 10, 10000, 20000, 0.9, 10, false) val idleGrace = queueConfig.idleGrace val stopGrace = queueConfig.stopGrace val flushGrace = queueConfig.flushGrace