This is an automated email from the ASF dual-hosted git repository. seonghyun pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/openwhisk.git
The following commit(s) were added to refs/heads/master by this push: new 9005a083c Add fake clock for test code (#5304) 9005a083c is described below commit 9005a083cb58735031f36262bb93f580044cad9e Author: Seonghyun Oh <seonghyu...@gmail.com> AuthorDate: Tue Aug 9 11:04:44 2022 +0900 Add fake clock for test code (#5304) * Add fake clock for test code * Add test code for state timeout * Add test case for transaction _ => Flushing * Add StateTimeout test for Flushing state --- .../org/apache/openwhisk/common/time/Clock.scala | 28 +++ .../core/scheduler/queue/MemoryQueue.scala | 46 ++-- .../queue/test/MemoryQueueFlowTests.scala | 105 +++++++--- .../scheduler/queue/test/MemoryQueueTests.scala | 231 +++++++++++++++++---- 4 files changed, 324 insertions(+), 86 deletions(-) diff --git a/common/scala/src/main/scala/org/apache/openwhisk/common/time/Clock.scala b/common/scala/src/main/scala/org/apache/openwhisk/common/time/Clock.scala new file mode 100644 index 000000000..8acc2e135 --- /dev/null +++ b/common/scala/src/main/scala/org/apache/openwhisk/common/time/Clock.scala @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.openwhisk.common.time + +import java.time.Instant + +trait Clock { + def now(): Instant +} + +object SystemClock extends Clock { + def now() = Instant.now() +} diff --git a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/MemoryQueue.scala b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/MemoryQueue.scala index 312e9ecff..436b53d05 100644 --- a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/MemoryQueue.scala +++ b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/MemoryQueue.scala @@ -21,6 +21,7 @@ import akka.actor.Status.{Failure => FailureMessage} import akka.actor.{ActorRef, ActorSystem, Cancellable, FSM, Props, Stash} import akka.util.Timeout import org.apache.openwhisk.common._ +import org.apache.openwhisk.common.time.{Clock, SystemClock} import org.apache.openwhisk.core.ConfigKeys import org.apache.openwhisk.core.ack.ActiveAck import org.apache.openwhisk.core.connector.ContainerCreationError.ZeroNamespaceLimit @@ -121,13 +122,14 @@ class MemoryQueue(private val etcdClient: EtcdClient, ack: ActiveAck, store: (TransactionId, WhiskActivation, UserContext) => Future[Any], getUserLimit: String => Future[Int], - checkToDropStaleActivation: (Queue[TimeSeriesActivationEntry], + checkToDropStaleActivation: (Clock, + Queue[TimeSeriesActivationEntry], Long, String, WhiskActionMetaData, MemoryQueueState, ActorRef) => Unit, - queueConfig: QueueConfig)(implicit logging: Logging) + queueConfig: QueueConfig)(implicit logging: Logging, clock: Clock) extends FSM[MemoryQueueState, MemoryQueueData] with Stash { @@ -342,7 +344,7 @@ class MemoryQueue(private val etcdClient: EtcdClient, msg.transid) val whiskError = isWhiskError(data.error) if (whiskError) - queue = queue.enqueue(TimeSeriesActivationEntry(Instant.now, msg)) + queue = queue.enqueue(TimeSeriesActivationEntry(clock.now(), msg)) else completeErrorActivation(msg, data.reason, whiskError) stay() using data.copy(activeDuringFlush = true) @@ -351,8 +353,12 @@ class MemoryQueue(private val etcdClient: EtcdClient, // Instead, StateTimeout message will be sent by a timer. case Event(StateTimeout | DropOld, data: FlushingData) => logging.info(this, s"[$invocationNamespace:$action:$stateName] Received StateTimeout, drop stale messages.") - queue = - MemoryQueue.dropOld(queue, Duration.ofMillis(actionRetentionTimeout), data.reason, completeErrorActivation) + queue = MemoryQueue.dropOld( + clock, + queue, + Duration.ofMillis(actionRetentionTimeout), + data.reason, + completeErrorActivation) if (data.activeDuringFlush || queue.nonEmpty) stay using data.copy(activeDuringFlush = false) else @@ -540,7 +546,7 @@ class MemoryQueue(private val etcdClient: EtcdClient, case Event(DropOld, _) => if (queue.nonEmpty && Duration - .between(queue.head.timestamp, Instant.now) + .between(queue.head.timestamp, clock.now()) .compareTo(Duration.ofMillis(actionRetentionTimeout)) < 0) { logging.error( this, @@ -550,6 +556,7 @@ class MemoryQueue(private val etcdClient: EtcdClient, s"[$invocationNamespace:$action:$stateName] the head stale message: ${queue.head.msg.activationId}") } queue = MemoryQueue.dropOld( + clock, queue, Duration.ofMillis(actionRetentionTimeout), s"Activation processing is not initiated for $actionRetentionTimeout ms", @@ -706,6 +713,7 @@ class MemoryQueue(private val etcdClient: EtcdClient, NoData() } } + private def cleanUpWatcher(): Unit = { watchedKeys.foreach { key => watcherService ! UnwatchEndpoint(key, isPrefix = true, watcherName) @@ -883,7 +891,14 @@ class MemoryQueue(private val etcdClient: EtcdClient, // these schedulers will run forever and stop when the memory queue stops private def startMonitoring(): (ActorRef, ActorRef) = { val droppingScheduler = Scheduler.scheduleWaitAtLeast(schedulingConfig.dropInterval) { () => - checkToDropStaleActivation(queue, actionRetentionTimeout, invocationNamespace, actionMetaData, stateName, self) + checkToDropStaleActivation( + clock, + queue, + actionRetentionTimeout, + invocationNamespace, + actionMetaData, + stateName, + self) Future.successful(()) } @@ -930,7 +945,7 @@ class MemoryQueue(private val etcdClient: EtcdClient, @tailrec private def getStaleActivationNum(count: Int, queue: Queue[TimeSeriesActivationEntry]): Int = { if (queue.isEmpty || Duration - .between(queue.head.timestamp, Instant.now) + .between(queue.head.timestamp, clock.now()) .compareTo(StaleDuration) < 0) count else getStaleActivationNum(count + 1, queue.tail) @@ -988,7 +1003,7 @@ class MemoryQueue(private val etcdClient: EtcdClient, stay } .getOrElse { - queue = queue.enqueue(TimeSeriesActivationEntry(Instant.now, msg)) + queue = queue.enqueue(TimeSeriesActivationEntry(clock.now(), msg)) in.decrementAndGet() tryEnableActionThrottling() } @@ -1051,7 +1066,7 @@ class MemoryQueue(private val etcdClient: EtcdClient, /** Generates an activation with zero runtime. Usually used for error cases */ private def generateFallbackActivation(msg: ActivationMessage, response: ActivationResponse): WhiskActivation = { - val now = Instant.now + val now = clock.now() val causedBy = if (msg.causedBySequence) { Some(Parameters(WhiskActivation.causedByAnnotation, JsString(Exec.SEQUENCE))) } else None @@ -1101,6 +1116,7 @@ object MemoryQueue { ack: ActiveAck, store: (TransactionId, WhiskActivation, UserContext) => Future[Any], getUserLimit: String => Future[Int])(implicit logging: Logging): Props = { + implicit val clock: Clock = SystemClock Props( new MemoryQueue( etcdClient, @@ -1126,19 +1142,21 @@ object MemoryQueue { @tailrec def dropOld( + clock: Clock, queue: Queue[TimeSeriesActivationEntry], retention: Duration, reason: String, completeErrorActivation: (ActivationMessage, String, Boolean) => Future[Any]): Queue[TimeSeriesActivationEntry] = { - if (queue.isEmpty || Duration.between(queue.head.timestamp, Instant.now).compareTo(retention) < 0) + if (queue.isEmpty || Duration.between(queue.head.timestamp, clock.now()).compareTo(retention) < 0) queue else { completeErrorActivation(queue.head.msg, reason, true) - dropOld(queue.tail, retention, reason, completeErrorActivation) + dropOld(clock, queue.tail, retention, reason, completeErrorActivation) } } - def checkToDropStaleActivation(queue: Queue[TimeSeriesActivationEntry], + def checkToDropStaleActivation(clock: Clock, + queue: Queue[TimeSeriesActivationEntry], maxRetentionMs: Long, invocationNamespace: String, actionMetaData: WhiskActionMetaData, @@ -1150,7 +1168,7 @@ object MemoryQueue { s"[$invocationNamespace:$action:$stateName] use the given retention timeout: $maxRetentionMs for this action kind: ${actionMetaData.exec.kind}.") if (queue.nonEmpty && Duration - .between(queue.head.timestamp, Instant.now) + .between(queue.head.timestamp, clock.now()) .compareTo(Duration.ofMillis(maxRetentionMs)) >= 0) { logging.info( this, diff --git a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueFlowTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueFlowTests.scala index 52568327f..e68de5b58 100644 --- a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueFlowTests.scala +++ b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueFlowTests.scala @@ -22,6 +22,7 @@ import akka.actor.FSM.{CurrentState, StateTimeout, SubscribeTransitionCallBack, import akka.testkit.{TestActor, TestFSMRef, TestProbe} import com.sksamuel.elastic4s.http.{search => _} import org.apache.openwhisk.common.GracefulShutdown +import org.apache.openwhisk.common.time.{Clock, SystemClock} import org.apache.openwhisk.core.connector.ContainerCreationError.{NonExecutableActionError, WhiskError} import org.apache.openwhisk.core.connector.ContainerCreationMessage import org.apache.openwhisk.core.entity._ @@ -50,6 +51,17 @@ import scala.concurrent.Future import scala.concurrent.duration.{DurationInt, FiniteDuration, MILLISECONDS} import scala.language.postfixOps +class FakeClock extends Clock { + var instant: Instant = Instant.now() + def now() = instant + def set(now: Instant): Unit = { + instant = now + } + def plusSeconds(secondsToAdd: Long): Unit = { + instant = instant.plusSeconds(secondsToAdd) + } +} + @RunWith(classOf[JUnitRunner]) class MemoryQueueFlowTests extends MemoryQueueTestsFixture @@ -75,6 +87,7 @@ class MemoryQueueFlowTests behavior of "MemoryQueueFlow" it should "normally be created and handle an activation and became idle an finally removed" in { + implicit val clock = SystemClock val mockEtcdClient = mock[EtcdClient] val parent = TestProbe() val watcher = TestProbe() @@ -151,11 +164,11 @@ class MemoryQueueFlowTests container.send(fsm, getActivation(false)) container.expectMsg(ActivationResponse(Left(NoActivationMessage()))) - Thread.sleep(idleGrace.toMillis) + fsm ! StateTimeout probe.expectMsg(Transition(fsm, Running, Idle)) - Thread.sleep(stopGrace.toMillis) + fsm ! StateTimeout expectDataCleanUp(watcher, dataMgmtService) @@ -168,6 +181,7 @@ class MemoryQueueFlowTests } it should "became Idle and Running again if a message arrives" in { + implicit val clock = SystemClock val mockEtcdClient = mock[EtcdClient] val parent = TestProbe() val watcher = TestProbe() @@ -235,7 +249,7 @@ class MemoryQueueFlowTests container.send(fsm, getActivation(false)) container.expectMsg(ActivationResponse(Left(NoActivationMessage()))) - Thread.sleep(idleGrace.toMillis) + fsm ! StateTimeout probe.expectMsg(Transition(fsm, Running, Idle)) @@ -243,7 +257,7 @@ class MemoryQueueFlowTests probe.expectMsg(Transition(fsm, Idle, Running)) - Thread.sleep(idleGrace.toMillis) + fsm ! StateTimeout // since there is one message, it should not be Idle probe.expectNoMessage() @@ -264,11 +278,11 @@ class MemoryQueueFlowTests container.send(fsm, getActivation(false)) container.expectMsg(ActivationResponse(Left(NoActivationMessage()))) - Thread.sleep(idleGrace.toMillis) + fsm ! StateTimeout probe.expectMsg(Transition(fsm, Running, Idle)) - Thread.sleep(stopGrace.toMillis) + fsm ! StateTimeout parent.expectMsg(queueRemovedMsg) probe.expectMsg(Transition(fsm, Idle, Removed)) @@ -281,6 +295,7 @@ class MemoryQueueFlowTests } it should "go to the NamespaceThrottled state dropping messages when it can't create an initial container" in { + implicit val clock = SystemClock val mockEtcdClient = mock[EtcdClient] val parent = TestProbe() val watcher = TestProbe() @@ -367,6 +382,7 @@ class MemoryQueueFlowTests } it should "go to the NamespaceThrottled state without dropping messages and get back to the Running container" in { + implicit val clock = SystemClock val mockEtcdClient = mock[EtcdClient] val parent = TestProbe() val watcher = TestProbe() @@ -471,12 +487,12 @@ class MemoryQueueFlowTests container.send(fsm, getActivation(false, "testContainerId2")) container.expectMsg(ActivationResponse(Left(NoActivationMessage()))) - Thread.sleep(idleGrace.toMillis) + fsm ! StateTimeout // all subsequent procedures are same with the Running case probe.expectMsg(Transition(fsm, Running, Idle)) - Thread.sleep(stopGrace.toMillis) + fsm ! StateTimeout parent.expectMsg(queueRemovedMsg) probe.expectMsg(Transition(fsm, Idle, Removed)) @@ -489,6 +505,7 @@ class MemoryQueueFlowTests } it should "go to the ActionThrottled state when there are too many stale activations including transition to NamespaceThrottling" in { + implicit val clock = SystemClock val mockEtcdClient = mock[EtcdClient] val parent = TestProbe() val watcher = TestProbe() @@ -623,10 +640,10 @@ class MemoryQueueFlowTests probe.expectMsg(Transition(fsm, NamespaceThrottled, Running)) // normal termination process - Thread.sleep(idleGrace.toMillis) + fsm ! StateTimeout probe.expectMsg(Transition(fsm, Running, Idle)) - Thread.sleep(stopGrace.toMillis) + fsm ! StateTimeout parent.expectMsg(queueRemovedMsg) probe.expectMsg(Transition(fsm, Idle, Removed)) @@ -637,6 +654,7 @@ class MemoryQueueFlowTests } it should "be Flushing when the limit is 0 and restarted back to Running state when the limit is increased" in { + implicit val clock = new FakeClock val mockEtcdClient = mock[EtcdClient] val parent = TestProbe() val watcher = TestProbe() @@ -689,10 +707,12 @@ class MemoryQueueFlowTests expectInitialData(watcher, dataMgmtService) probe.expectMsg(Transition(fsm, Uninitialized, Running)) + clock.plusSeconds(FiniteDuration(retentionTimeout, MILLISECONDS).toSeconds) + probe.expectMsg(Transition(fsm, Running, Flushing)) // activation received in Flushing state won't be flushed immediately if Flushing state is caused by a whisk error - Thread.sleep(flushGrace.toMillis) fsm ! messages(1) + fsm ! StateTimeout awaitAssert({ ackedMessageCount shouldBe 1 @@ -700,7 +720,7 @@ class MemoryQueueFlowTests storedMessageCount shouldBe 1 lastAckedActivationResult.response.result shouldBe Some(JsObject("error" -> JsString(namespaceLimitUnderZero))) fsm.underlyingActor.queue.length shouldBe 1 - }, FiniteDuration(retentionTimeout, MILLISECONDS)) + }, 5.seconds) // limit is increased by an operator limit = 10 @@ -730,11 +750,11 @@ class MemoryQueueFlowTests fsm.underlyingActor.namespaceContainerCount.existingContainerNumByNamespace -= 1 // normal termination process - Thread.sleep(idleGrace.toMillis) + fsm ! StateTimeout probe.expectMsg(Transition(fsm, Running, Idle)) - Thread.sleep(stopGrace.toMillis) + fsm ! StateTimeout parent.expectMsg(queueRemovedMsg) probe.expectMsg(Transition(fsm, Idle, Removed)) @@ -747,6 +767,7 @@ class MemoryQueueFlowTests } it should "be Flushing when the limit is 0 and be terminated without recovering" in { + implicit val clock = new FakeClock val mockEtcdClient = mock[EtcdClient] val parent = TestProbe() val watcher = TestProbe() @@ -801,7 +822,8 @@ class MemoryQueueFlowTests fsm ! messages(1) // activation received in Flushing state won't be flushed immediately if Flushing state is caused by a whisk error - Thread.sleep(flushGrace.toMillis) + clock.plusSeconds((queueConfig.maxRetentionMs) / 1000) + fsm ! DropOld awaitAssert({ ackedMessageCount shouldBe 2 @@ -809,9 +831,10 @@ class MemoryQueueFlowTests storedMessageCount shouldBe 2 lastAckedActivationResult.response.result shouldBe Some(JsObject("error" -> JsString(namespaceLimitUnderZero))) fsm.underlyingActor.queue.length shouldBe 0 - }, FiniteDuration(retentionTimeout, MILLISECONDS)) + }, 5.seconds) // In this case data clean up happens first. + fsm ! StateTimeout expectDataCleanUp(watcher, dataMgmtService) probe.expectMsg(Transition(fsm, Flushing, Removed)) @@ -822,6 +845,7 @@ class MemoryQueueFlowTests } it should "be the Flushing state when a whisk error happens" in { + implicit val clock = new FakeClock val mockEtcdClient = mock[EtcdClient] val parent = TestProbe() val watcher = TestProbe() @@ -897,7 +921,8 @@ class MemoryQueueFlowTests fsm ! messages(1) - Thread.sleep(flushGrace.toMillis) + clock.plusSeconds(FiniteDuration(retentionTimeout, MILLISECONDS).toSeconds) + fsm ! StateTimeout awaitAssert({ ackedMessageCount shouldBe 2 @@ -907,7 +932,8 @@ class MemoryQueueFlowTests fsm.underlyingActor.queue.length shouldBe 0 }, FiniteDuration(retentionTimeout, MILLISECONDS)) - Thread.sleep(flushGrace.toMillis * 2) + clock.plusSeconds(flushGrace.toSeconds * 2) + fsm ! StateTimeout parent.expectMsg(queueRemovedMsg) probe.expectMsg(Transition(fsm, Flushing, Removed)) @@ -920,6 +946,7 @@ class MemoryQueueFlowTests } it should "be the Flushing state when a whisk error happens and be recovered when a container is created" in { + implicit val clock = new FakeClock val mockEtcdClient = mock[EtcdClient] val parent = TestProbe() val watcher = TestProbe() @@ -975,12 +1002,12 @@ class MemoryQueueFlowTests fsm ! FailedCreationJob(id, testInvocationNamespace, fqn, revision, WhiskError, "whisk error") } + clock.plusSeconds(FiniteDuration(retentionTimeout, MILLISECONDS).toSeconds) probe.expectMsg(Transition(fsm, Running, Flushing)) - Thread.sleep(1000) fsm ! messages(1) // activation received in Flushing state won't be flushed immediately if Flushing state is caused by a whisk error - Thread.sleep(flushGrace.toMillis) + fsm ! StateTimeout awaitAssert({ ackedMessageCount shouldBe 1 @@ -1005,11 +1032,11 @@ class MemoryQueueFlowTests container.send(fsm, getActivation(false)) container.expectMsg(ActivationResponse(Left(NoActivationMessage()))) - Thread.sleep(idleGrace.toMillis) - + fsm.underlyingActor.creationIds = Set.empty[String] + fsm ! StateTimeout probe.expectMsg(Transition(fsm, Running, Idle)) - Thread.sleep(stopGrace.toMillis) + fsm ! StateTimeout parent.expectMsg(queueRemovedMsg) probe.expectMsg(Transition(fsm, Idle, Removed)) @@ -1022,6 +1049,7 @@ class MemoryQueueFlowTests } it should "be the Flushing state when a developer error happens" in { + implicit val clock = new FakeClock val mockEtcdClient = mock[EtcdClient] val parent = TestProbe() val watcher = TestProbe() @@ -1113,7 +1141,10 @@ class MemoryQueueFlowTests fsm.underlyingActor.queue.length shouldBe 0 } - Thread.sleep(flushGrace.toMillis * 2) + // simulate timeout 2 times + clock.plusSeconds(flushGrace.toSeconds * 2) + fsm ! StateTimeout + fsm ! StateTimeout expectDataCleanUp(watcher, dataMgmtService) parent.expectMsg(queueRemovedMsg) @@ -1125,6 +1156,7 @@ class MemoryQueueFlowTests } it should "be gracefully terminated when it receives a GracefulShutDown message" in { + implicit val clock = SystemClock val mockEtcdClient = mock[EtcdClient] val parent = TestProbe() val watcher = TestProbe() @@ -1211,12 +1243,12 @@ class MemoryQueueFlowTests fsm ! QueueRemovedCompleted // if there is a message, it should not terminate - Thread.sleep(gracefulShutdownTimeout.toMillis) + fsm ! StateTimeout container.send(fsm, getActivation()) container.expectMsg(ActivationResponse(Right(messages(2)))) - Thread.sleep(gracefulShutdownTimeout.toMillis) + fsm ! StateTimeout // it doesn't need to check if all containers are timeout as same version of a new queue will be created in another scheduler. @@ -1235,6 +1267,7 @@ class MemoryQueueFlowTests val allStates = List(Running, Idle, Flushing, ActionThrottled, NamespaceThrottled, Removing, Removed) allStates.foreach { state => + implicit val clock = SystemClock val mockEtcdClient = mock[EtcdClient] val parent = TestProbe() val watcher = TestProbe() @@ -1319,6 +1352,7 @@ class MemoryQueueFlowTests val allStates = List(Running, Idle, Flushing, ActionThrottled, NamespaceThrottled, Removing, Removed) allStates.foreach { state => + implicit val clock = SystemClock val mockEtcdClient = mock[EtcdClient] val parent = TestProbe() val watcher = TestProbe() @@ -1393,7 +1427,7 @@ class MemoryQueueFlowTests probe.expectMsg(Transition(fsm, state, Removed)) // queue manager could not respond to the memory queue. - Thread.sleep(stopGrace.toMillis) + fsm ! StateTimeout // the queue is supposed to send queueRemovedMsg once again and stops itself. parent.expectMsg(queueRemovedMsg) @@ -1408,6 +1442,7 @@ class MemoryQueueFlowTests List(Running, Idle, ActionThrottled, NamespaceThrottled, Flushing, Removing, Removed) allStates.foreach { state => + implicit val clock = SystemClock val mockEtcdClient = mock[EtcdClient] val parent = TestProbe() val watcher = TestProbe() @@ -1509,7 +1544,7 @@ class MemoryQueueFlowTests parent.expectMsg(message) // queue should be terminated after gracefulShutdownTimeout - Thread.sleep(gracefulShutdownTimeout.toMillis) + fsm ! StateTimeout // clean up actors only because etcd data is being used by a new queue watcher.expectMsgAllOf( @@ -1548,14 +1583,15 @@ class MemoryQueueFlowTests probe.expectMsg(Transition(fsm, state, Removed)) fsm ! QueueRemovedCompleted - - Thread.sleep(gracefulShutdownTimeout.toMillis) + fsm ! StateTimeout watcher.expectMsgAllOf( UnwatchEndpoint(inProgressContainerKey, isPrefix = true, watcherName), UnwatchEndpoint(existingContainerKey, isPrefix = true, watcherName), UnwatchEndpoint(leaderKey, isPrefix = false, watcherName)) + probe.expectTerminated(fsm, 10.seconds) + case _ => parent.expectMsg(staleQueueRemovedMsg) parent.expectMsg(message) @@ -1584,6 +1620,7 @@ class MemoryQueueFlowTests List(Running, Idle, ActionThrottled, NamespaceThrottled, Flushing, Removing, Removed) allStates.foreach { state => + implicit val clock = SystemClock val mockEtcdClient = mock[EtcdClient] val parent = TestProbe() val watcher = TestProbe() @@ -1676,13 +1713,13 @@ class MemoryQueueFlowTests // queue will be gracefully shutdown. case Removing => // queue should not be terminated as there is an activation - Thread.sleep(gracefulShutdownTimeout.toMillis) + fsm ! StateTimeout container.send(fsm, getActivation()) container.expectMsg(ActivationResponse(Right(message))) // queue should not be terminated as there is an activation - Thread.sleep(gracefulShutdownTimeout.toMillis) + fsm ! StateTimeout // clean up actors only because etcd data is being used by a new queue watcher.expectMsgAllOf( @@ -1721,12 +1758,12 @@ class MemoryQueueFlowTests fsm ! QueueRemovedCompleted // queue should not be terminated as there is an activation - Thread.sleep(gracefulShutdownTimeout.toMillis) + fsm ! StateTimeout container.send(fsm, getActivation()) container.expectMsg(ActivationResponse(Right(message))) - Thread.sleep(gracefulShutdownTimeout.toMillis) + fsm ! StateTimeout watcher.expectMsgAllOf( UnwatchEndpoint(inProgressContainerKey, isPrefix = true, watcherName), diff --git a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueTests.scala index fd2ac7363..e64262827 100644 --- a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueTests.scala +++ b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueTests.scala @@ -33,6 +33,7 @@ import com.ibm.etcd.client.kv.WatchUpdate import com.ibm.etcd.client.{EtcdClient => Client} import com.sksamuel.elastic4s.http.ElasticClient import common.StreamLogging +import org.apache.openwhisk.common.time.SystemClock import org.apache.openwhisk.common.{GracefulShutdown, TransactionId} import org.apache.openwhisk.core.ack.ActiveAck import org.apache.openwhisk.core.connector._ @@ -135,7 +136,131 @@ class MemoryQueueTests behavior of "MemoryQueue" + it should "send StateTimeout message when state timeout" in { + implicit val clock = SystemClock + val mockEtcdClient = mock[EtcdClient] + val prove = TestProbe() + val watcher = TestProbe() + val parent = TestProbe() + + expectDurationChecking(mockEsClient, testInvocationNamespace) + + val queueConfigWithShortTimeout = queueConfig.copy( + idleGrace = 10.milliseconds, + stopGrace = 10.milliseconds, + gracefulShutdownTimeout = 10.milliseconds) + + val fsm = + TestFSMRef( + new MemoryQueue( + mockEtcdClient, + durationChecker, + fqn, + mockMessaging(), + schedulingConfig, + testInvocationNamespace, + revision, + endpoints, + actionMetadata, + prove.ref, + watcher.ref, + TestProbe().ref, + TestProbe().ref, + schedulerId, + ack, + store, + getUserLimit, + checkToDropStaleActivation, + queueConfigWithShortTimeout), + parent.ref, + "MemoryQueue") + + registerCallback(fsm) + fsm ! Start + expectMsg(Transition(fsm, Uninitialized, Running)) + + // Test stateTimeout for when(Running, stateTimeout = queueConfig.idleGrace) + fsm.isStateTimerActive shouldBe true + Thread.sleep(queueConfigWithShortTimeout.idleGrace.toMillis) + + expectMsg(Transition(fsm, Running, Idle)) + + // Test stateTimeout for when(Idle, stateTimeout = queueConfig.stopGrace) + fsm.isStateTimerActive shouldBe true + Thread.sleep(queueConfigWithShortTimeout.stopGrace.toMillis) + expectMsg(Transition(fsm, Idle, Removed)) + + // Test stateTimeout for when(Removed, stateTimeout = queueConfig.gracefulShutdownTimeout) + fsm.isStateTimerActive shouldBe true + Thread.sleep(queueConfigWithShortTimeout.gracefulShutdownTimeout.toMillis) + parent.expectMsg(queueRemovedMsg) + } + + it should "start startTimerWithFixedDelay(name=StopQueue) on Transition _ => Flushing" in { + implicit val clock = SystemClock + val mockEtcdClient = mock[EtcdClient] + val prove = TestProbe() + val watcher = TestProbe() + val parent = TestProbe() + + expectDurationChecking(mockEsClient, testInvocationNamespace) + + val queueConfigWithShortTimeout = + queueConfig.copy( + idleGrace = 10.seconds, + stopGrace = 10.milliseconds, + gracefulShutdownTimeout = 10.milliseconds, + flushGrace = 10.milliseconds) + + val fsm = + TestFSMRef( + new MemoryQueue( + mockEtcdClient, + durationChecker, + fqn, + mockMessaging(), + schedulingConfig, + testInvocationNamespace, + revision, + endpoints, + actionMetadata, + prove.ref, + watcher.ref, + TestProbe().ref, + TestProbe().ref, + schedulerId, + ack, + store, + getUserLimit, + checkToDropStaleActivation, + queueConfigWithShortTimeout), + parent.ref, + "MemoryQueue") + + registerCallback(fsm) + fsm ! Start + expectMsg(Transition(fsm, Uninitialized, Running)) + + fsm ! FailedCreationJob( + testCreationId, + message.user.namespace.name.asString, + message.action, + message.revision, + ContainerCreationError.NoAvailableInvokersError, + "no available invokers") + + // Test case _ -> Flushing => startTimerWithFixedDelay("StopQueue", StateTimeout, queueConfig.flushGrace) + // state Running -> Flushing + expectMsg(Transition(fsm, Running, Flushing)) + fsm.isTimerActive("StopQueue") shouldBe true + + // wait for flushGrace time, StopQueue timer should send StateTimeout + Thread.sleep(queueConfigWithShortTimeout.flushGrace.toMillis) + expectMsg(Transition(fsm, Flushing, Removed)) + } + it should "register the endpoint when initializing" in { + implicit val clock = SystemClock val mockEtcdClient = mock[EtcdClient] val prove = TestProbe() val watcher = TestProbe() @@ -192,6 +317,7 @@ class MemoryQueueTests } it should "go to Flushing state if any error happens when the queue is depreacted" in { + implicit val clock = SystemClock val mockEtcdClient = mock[EtcdClient] val prove = TestProbe() val watcher = TestProbe() @@ -252,6 +378,7 @@ class MemoryQueueTests } it should "go to the Running state without storing any data if it receives VersionUpdated" in { + implicit val clock = SystemClock val mockEtcdClient = mock[EtcdClient] val prove = TestProbe() val watcher = TestProbe() @@ -308,6 +435,7 @@ class MemoryQueueTests } it should "remove the queue when timeout occurs" in { + implicit val clock = SystemClock val mockEtcdClient = mock[EtcdClient] val parent = TestProbe() val dataManagementService = TestProbe() @@ -380,6 +508,7 @@ class MemoryQueueTests } it should "back to Running state when got new ActivationMessage when in Idle State" in { + implicit val clock = SystemClock val mockEtcdClient = mock[EtcdClient] val prove = TestProbe() val watcher = TestProbe() @@ -459,6 +588,7 @@ class MemoryQueueTests } it should "back to Running state when got new ActivationMessage when in Removed State" in { + implicit val clock = SystemClock val mockEtcdClient = mock[EtcdClient] val prove = TestProbe() val watcher = TestProbe() @@ -530,6 +660,7 @@ class MemoryQueueTests } it should "store the received ActivationMessage in the queue" in { + implicit val clock = SystemClock val mockEtcdClient = new MockEtcdClient(client, isLeader = true) val prove = TestProbe() @@ -571,6 +702,7 @@ class MemoryQueueTests } it should "send a ActivationMessage in response to GetActivation" in { + implicit val clock = SystemClock val mockEtcdClient = mock[EtcdClient] val probe = TestProbe() val tid = TransactionId(TransactionId.generateTid()) @@ -611,6 +743,7 @@ class MemoryQueueTests } it should "send NoActivationMessage in case there is no message in the queue" in { + implicit val clock = SystemClock val mockEtcdClient = mock[EtcdClient] val probe = TestProbe() val tid = TransactionId(TransactionId.generateTid()) @@ -650,6 +783,7 @@ class MemoryQueueTests } it should "poll for the ActivationMessage in case there is no message in the queue" in { + implicit val clock = SystemClock val mockEtcdClient = mock[EtcdClient] val probe = TestProbe() val tid = TransactionId(TransactionId.generateTid()) @@ -698,6 +832,7 @@ class MemoryQueueTests } it should "not send msg to a deleted container" in { + implicit val clock = SystemClock val mockEtcdClient = mock[EtcdClient] val probe = TestProbe() val tid = TransactionId(TransactionId.generateTid()) @@ -752,6 +887,7 @@ class MemoryQueueTests } it should "send response to request according to the order of container id and warmed flag" in { + implicit val clock = SystemClock val mockEtcdClient = mock[EtcdClient] val probe = TestProbe() val tid = TransactionId(TransactionId.generateTid()) @@ -819,6 +955,7 @@ class MemoryQueueTests } it should "send a container creation request to ContainerManager at initialization time" in { + implicit val clock = SystemClock val mockEtcdClient = mock[EtcdClient] val containerManger = TestProbe() val probe = TestProbe() @@ -888,6 +1025,7 @@ class MemoryQueueTests } it should "complete error activation while received FailedCreationJob and the error is not a whisk error(unrecoverable)" in { + implicit val clock = SystemClock val mockEtcdClient = mock[EtcdClient] val testProbe = TestProbe() val parent = TestProbe() @@ -968,6 +1106,7 @@ class MemoryQueueTests } it should "complete error activation after timeout while received FailedCreationJob and the error is a whisk error(recoverable)" in { + implicit val clock = new FakeClock val mockEtcdClient = mock[EtcdClient] val testProbe = TestProbe() val decisionMaker = TestProbe() @@ -1024,11 +1163,15 @@ class MemoryQueueTests parent.expectNoMessage(5.seconds) // Add 3 more messages. + clock.plusSeconds(5) (1 to expectedCount).foreach(_ => fsm ! message) parent.expectNoMessage(5.seconds) // After 10 seconds(action retention timeout), the first 3 messages are timed out. // It does not get removed as there are still 3 messages in the queue. + clock.plusSeconds(5) + fsm ! DropOld + awaitAssert({ ackedMessageCount shouldBe 3 lastAckedActivationResult.response.result shouldBe Some(JsObject("error" -> JsString("no available invokers"))) @@ -1052,7 +1195,8 @@ class MemoryQueueTests parent.expectMsg(Transition(fsm, Running, Flushing)) // wait for the flush grace, and then all existing activations will be flushed - Thread.sleep(queueConfig.maxBlackboxRetentionMs + queueConfig.flushGrace.toMillis) + clock.plusSeconds((queueConfig.maxBlackboxRetentionMs + queueConfig.flushGrace.toMillis) / 1000) + fsm ! DropOld // The error message is updated from the recent error message of the FailedCreationJob. awaitAssert({ @@ -1072,6 +1216,7 @@ class MemoryQueueTests } it should "send old version activation to queueManager when update action if doesn't exist old version container" in { + implicit val clock = SystemClock val mockEtcdClient = mock[EtcdClient] val probe = TestProbe() val queueManager = TestProbe() @@ -1114,6 +1259,7 @@ class MemoryQueueTests } it should "fetch old version activation by old container when update action" in { + implicit val clock = SystemClock val mockEtcdClient = mock[EtcdClient] val probe = TestProbe() val queueManager = TestProbe() @@ -1159,6 +1305,7 @@ class MemoryQueueTests } it should "complete error activation after blackbox timeout when the action is a blackbox action and received FailedCreationJob with a whisk error(recoverable)" in { + implicit val clock = new FakeClock val mockEtcdClient = mock[EtcdClient] val testProbe = TestProbe() val decisionMaker = TestProbe() @@ -1246,7 +1393,8 @@ class MemoryQueueTests fsm ! message // wait for the flush grace, and then some existing activations will be flushed - Thread.sleep(queueConfig.maxBlackboxRetentionMs + queueConfig.flushGrace.toMillis) + clock.plusSeconds((queueConfig.maxBlackboxRetentionMs + queueConfig.flushGrace.toMillis) / 1000) + fsm ! DropOld (1 to expectedCount).foreach(_ => probe.expectMsg(ActivationResponse.whiskError("no available invokers"))) val duration = FiniteDuration(queueConfig.maxBlackboxRetentionMs, MILLISECONDS) + queueConfig.flushGrace @@ -1262,6 +1410,7 @@ class MemoryQueueTests } it should "stop scheduling if the namespace does not exist" in { + implicit val clock = SystemClock val mockEtcdClient = mock[EtcdClient] val getZeroLimit = (_: String) => { Future.failed(NoDocumentException("namespace does not exist")) } val testProbe = TestProbe() @@ -1302,10 +1451,11 @@ class MemoryQueueTests parent.expectMsg(10 seconds, CurrentState(fsm, Uninitialized)) parent.expectMsg(10 seconds, Transition(fsm, Uninitialized, Running)) - Thread.sleep(idleGrace.toMillis) - - parent expectMsg QueueRemoved(testInvocationNamespace, fqn.toDocId.asDocInfo(revision), None) - parent.expectMsg(10 seconds, Transition(fsm, Running, Removing)) + fsm ! StopSchedulingAsOutdated + parent expectMsgAllOf (10 seconds, Transition(fsm, Running, Removing), QueueRemoved( + testInvocationNamespace, + fqn.toDocId.asDocInfo(revision), + None)) fsm ! QueueRemovedCompleted parent.expectMsg(10 seconds, Transition(fsm, Removing, Removed)) @@ -1314,6 +1464,7 @@ class MemoryQueueTests } it should "throttle the namespace when the limit is already reached" in { + implicit val clock = SystemClock val mockEtcdClient = mock[EtcdClient] val dataManagementService = TestProbe() val probe = TestProbe() @@ -1364,6 +1515,7 @@ class MemoryQueueTests } it should "disable namespace throttling when the capacity become available" in { + implicit val clock = SystemClock val mockEtcdClient = mock[EtcdClient] val dataManagementService = TestProbe() val probe = TestProbe() @@ -1425,6 +1577,7 @@ class MemoryQueueTests } it should "throttle the action when the number of messages reaches the limit" in { + implicit val clock = SystemClock val mockEtcdClient = mock[EtcdClient] val dataManagementService = TestProbe() val probe = TestProbe() @@ -1473,6 +1626,7 @@ class MemoryQueueTests } it should "disable action throttling when the number of messages is under throttling fraction" in { + implicit val clock = SystemClock val mockEtcdClient = mock[EtcdClient] val dataManagementService = TestProbe() val probe = TestProbe() @@ -1531,6 +1685,7 @@ class MemoryQueueTests } it should "update the number of containers based on Watch event" in { + implicit val clock = SystemClock val mockEtcdClient = new MockEtcdClient(client, true) val probe = TestProbe() val watcher = system.actorOf(WatcherService.props(mockEtcdClient)) @@ -1574,7 +1729,7 @@ class MemoryQueueTests val newRevision = DocRevision("2-testRev") memoryQueue.containers.size shouldBe 0 - memoryQueue.creationIds.size shouldBe 0 + memoryQueue.creationIds.count(_.startsWith("testId")) shouldBe 0 memoryQueue.namespaceContainerCount.existingContainerNumByNamespace shouldBe 0 memoryQueue.namespaceContainerCount.inProgressContainerNumByNamespace shouldBe 0 @@ -1611,13 +1766,12 @@ class MemoryQueueTests Some(ContainerId("test-containerId2"))), "test-value") - Thread.sleep(1000) - memoryQueue.containers.size shouldBe 1 - // the monit actor in memoryQueue may decide to create a container - memoryQueue.creationIds.size should be >= 1 - memoryQueue.creationIds.size should be <= 2 - memoryQueue.namespaceContainerCount.existingContainerNumByNamespace shouldBe 2 - memoryQueue.namespaceContainerCount.inProgressContainerNumByNamespace shouldBe 2 + awaitAssert({ + memoryQueue.containers.size shouldBe 1 // ['test-containerId1'] + memoryQueue.creationIds.count(_.startsWith("testId")) shouldBe 1 // ['testId1'] + memoryQueue.namespaceContainerCount.existingContainerNumByNamespace shouldBe 2 + memoryQueue.namespaceContainerCount.inProgressContainerNumByNamespace shouldBe 2 + }, 5.seconds) mockEtcdClient.publishEvents( EventType.PUT, @@ -1650,12 +1804,12 @@ class MemoryQueueTests Some(ContainerId("test-containerId4"))), "test-value") - Thread.sleep(1000) - memoryQueue.containers.size shouldBe 2 - memoryQueue.creationIds.size should be >= 2 - memoryQueue.creationIds.size should be <= 3 - memoryQueue.namespaceContainerCount.existingContainerNumByNamespace shouldBe 4 - memoryQueue.namespaceContainerCount.inProgressContainerNumByNamespace shouldBe 4 + awaitAssert({ + memoryQueue.containers.size shouldBe 2 // ['test-containerId1', 'test-containerId3'] + memoryQueue.creationIds.count(_.startsWith("testId")) shouldBe 2 // ['testId1', 'testId3'] + memoryQueue.namespaceContainerCount.existingContainerNumByNamespace shouldBe 4 + memoryQueue.namespaceContainerCount.inProgressContainerNumByNamespace shouldBe 4 + }, 5.seconds) mockEtcdClient.publishEvents( EventType.DELETE, @@ -1678,12 +1832,12 @@ class MemoryQueueTests inProgressContainer(testInvocationNamespace, newFqn, newRevision, schedulerId, CreationId("testId4")), "test-value") - Thread.sleep(1000) - memoryQueue.containers.size shouldBe 2 - memoryQueue.creationIds.size should be >= 0 - memoryQueue.creationIds.size should be <= 1 - memoryQueue.namespaceContainerCount.inProgressContainerNumByNamespace shouldBe 0 - memoryQueue.namespaceContainerCount.existingContainerNumByNamespace shouldBe 4 + awaitAssert({ + memoryQueue.containers.size shouldBe 2 // ['test-containerId1', 'test-containerId3'] + memoryQueue.creationIds.count(_.startsWith("testId")) shouldBe 0 + memoryQueue.namespaceContainerCount.inProgressContainerNumByNamespace shouldBe 0 + memoryQueue.namespaceContainerCount.existingContainerNumByNamespace shouldBe 4 + }, 5.seconds) mockEtcdClient.publishEvents( EventType.DELETE, @@ -1726,15 +1880,12 @@ class MemoryQueueTests Some(ContainerId("test-containerId4"))), "test-value") - memoryQueue.creationIds.size should be >= 0 - memoryQueue.creationIds.size should be <= 1 - - Thread.sleep(1000) - memoryQueue.containers.size shouldBe 0 - memoryQueue.creationIds.size should be >= 1 // if there is no container, the queue tries to create one container - memoryQueue.creationIds.size should be <= 2 - memoryQueue.namespaceContainerCount.inProgressContainerNumByNamespace shouldBe 0 - memoryQueue.namespaceContainerCount.existingContainerNumByNamespace shouldBe 0 + awaitAssert({ + memoryQueue.containers.size shouldBe 0 + memoryQueue.creationIds.count(_.startsWith("testId")) shouldBe 0 + memoryQueue.namespaceContainerCount.inProgressContainerNumByNamespace shouldBe 0 + memoryQueue.namespaceContainerCount.existingContainerNumByNamespace shouldBe 0 + }, 5.seconds) } private def getData(states: List[MemoryQueueState]) = { @@ -1749,6 +1900,7 @@ class MemoryQueueTests (schedulingActors, droppingActors, data) } it should "clean up throttling data when it stops gracefully" in { + implicit val clock = SystemClock val mockEtcdClient = mock[EtcdClient] val dataManagementService = TestProbe() @@ -1815,7 +1967,8 @@ class MemoryQueueTests it should "drop the old activation from the queue" in { var queue = Queue.empty[TimeSeriesActivationEntry] - val now = Instant.now + val clock = new FakeClock + val now = clock.now() val records = List( TimeSeriesActivationEntry(Instant.ofEpochMilli(now.toEpochMilli + 1000), message), TimeSeriesActivationEntry(Instant.ofEpochMilli(now.toEpochMilli + 2000), message), @@ -1826,10 +1979,10 @@ class MemoryQueueTests ) records.foreach(record => queue = queue.enqueue(record)) - - Thread.sleep(5000) + clock.plusSeconds(5) queue = MemoryQueue.dropOld( + clock, queue, java.time.Duration.ofMillis(1000), "activation processing is not initiated for 1000 ms", @@ -1843,6 +1996,7 @@ class MemoryQueueTests noException should be thrownBy { queue = MemoryQueue.dropOld( + SystemClock, queue, java.time.Duration.ofMillis(1000), "activation processing is not initiated for 1000 ms", @@ -1853,6 +2007,7 @@ class MemoryQueueTests behavior of "duration checker" it should "check the duration once" in { + implicit val clock = SystemClock val mockEtcdClient = mock[EtcdClient] val dataManagementService = TestProbe()