This is an automated email from the ASF dual-hosted git repository. rabbah pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/openwhisk.git
The following commit(s) were added to refs/heads/master by this push: new 2c33e81 Replace parallel collections with future based concurrency in tests. (#4841) 2c33e81 is described below commit 2c33e81df3e623a77afa17afdf3590f06a8d3b9d Author: Markus Thömmes <markusthoem...@me.com> AuthorDate: Thu Feb 27 14:02:51 2020 +0100 Replace parallel collections with future based concurrency in tests. (#4841) Scala 2.13 puts parallel collections into a separate module that's not compatible with Scala 2.12. To avoid having to work around things and to keep cross-compilation compatibility this just exchanges the approach for concurrency in tests to not use parallel collections at all. --- .../src/test/scala/common/ConcurrencyHelpers.scala | 30 ++++++++++ tests/src/test/scala/limits/ThrottleTests.scala | 19 +++--- .../openwhisk/common/ForcibleSemaphoreTests.scala | 11 +++- .../openwhisk/common/NestedSemaphoreTests.scala | 25 +++++--- .../openwhisk/common/ResizableSemaphoreTests.scala | 17 ++++-- .../core/controller/test/ActionsApiTests.scala | 2 +- .../core/limits/MaxActionDurationTests.scala | 70 ++++++++++------------ .../test/ShardingContainerPoolBalancerTests.scala | 6 +- 8 files changed, 115 insertions(+), 65 deletions(-) diff --git a/tests/src/test/scala/common/ConcurrencyHelpers.scala b/tests/src/test/scala/common/ConcurrencyHelpers.scala new file mode 100644 index 0000000..39fb1f5 --- /dev/null +++ b/tests/src/test/scala/common/ConcurrencyHelpers.scala @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package common + +import scala.concurrent.duration._ +import scala.concurrent.{Await, ExecutionContext, Future} + +trait ConcurrencyHelpers { + def concurrently[T](times: Int, timeout: FiniteDuration)(op: => T)(implicit ec: ExecutionContext): Iterable[T] = + Await.result(Future.sequence((1 to times).map(_ => Future(op))), timeout) + + def concurrently[B, T](over: Iterable[B], timeout: FiniteDuration)(op: B => T)( + implicit ec: ExecutionContext): Iterable[T] = + Await.result(Future.sequence(over.map(v => Future(op(v)))), timeout) +} diff --git a/tests/src/test/scala/limits/ThrottleTests.scala b/tests/src/test/scala/limits/ThrottleTests.scala index ee67e3c..7b9e00f 100644 --- a/tests/src/test/scala/limits/ThrottleTests.scala +++ b/tests/src/test/scala/limits/ThrottleTests.scala @@ -21,9 +21,7 @@ import java.time.Instant import akka.http.scaladsl.model.StatusCodes.TooManyRequests -import scala.collection.parallel.immutable.ParSeq -import scala.concurrent.Future -import scala.concurrent.Promise +import scala.concurrent.{Await, Future, Promise} import scala.concurrent.duration._ import org.junit.runner.RunWith import org.scalatest.BeforeAndAfterAll @@ -102,10 +100,15 @@ class ThrottleTests * * @param results the sequence of results from invocations or firings */ - def waitForActivations(results: ParSeq[RunResult]) = results.foreach { result => - if (result.exitCode == SUCCESS_EXIT) { - withActivation(wsk.activation, result, totalWait = 5.minutes)(identity) + def waitForActivations(results: Seq[RunResult]) = { + val done = results.map { result => + if (result.exitCode == SUCCESS_EXIT) { + Future(withActivation(wsk.activation, result, totalWait = 5.minutes)(_ => ())) + } else { + Future.successful(()) + } } + Await.result(Future.sequence(done), 5.minutes) } /** @@ -201,7 +204,7 @@ class ThrottleTests // wait for the activations last, if these fail, the throttle should be settled // and this gives the activations time to complete and may avoid unnecessarily polling println("waiting for activations to complete") - waitForActivations(results.par) + waitForActivations(results) } it should "throttle multiple activations of one trigger" in withAssetCleaner(wskprops) { (wp, assetHelper) => @@ -286,7 +289,7 @@ class ThrottleTests // wait for the activations last, giving the activations time to complete and // may avoid unnecessarily polling; if these fail, the throttle may not be settled println("waiting for activations to complete") - waitForActivations(combinedResults.par) + waitForActivations(combinedResults) } } diff --git a/tests/src/test/scala/org/apache/openwhisk/common/ForcibleSemaphoreTests.scala b/tests/src/test/scala/org/apache/openwhisk/common/ForcibleSemaphoreTests.scala index cb4939d..756521f 100644 --- a/tests/src/test/scala/org/apache/openwhisk/common/ForcibleSemaphoreTests.scala +++ b/tests/src/test/scala/org/apache/openwhisk/common/ForcibleSemaphoreTests.scala @@ -17,12 +17,19 @@ package org.apache.openwhisk.common +import common.ConcurrencyHelpers +import org.apache.openwhisk.utils.ExecutionContextFactory import org.junit.runner.RunWith import org.scalatest.{FlatSpec, Matchers} import org.scalatest.junit.JUnitRunner +import scala.concurrent.duration.DurationInt + @RunWith(classOf[JUnitRunner]) -class ForcibleSemaphoreTests extends FlatSpec with Matchers { +class ForcibleSemaphoreTests extends FlatSpec with Matchers with ConcurrencyHelpers { + // use an infinite thread pool to allow for maximum concurrency + implicit val executionContext = ExecutionContextFactory.makeCachedThreadPoolExecutionContext() + behavior of "ForcableSemaphore" it should "not allow to acquire, force or release negative amounts of permits" in { @@ -79,7 +86,7 @@ class ForcibleSemaphoreTests extends FlatSpec with Matchers { (0 until 100).foreach { _ => val s = new ForcibleSemaphore(32) // try to acquire more permits than allowed in parallel - val acquires = (0 until 64).par.map(_ => s.tryAcquire()).seq + val acquires = concurrently(64, 1.minute)(s.tryAcquire()) val result = Seq.fill(32)(true) ++ Seq.fill(32)(false) acquires should contain theSameElementsAs result diff --git a/tests/src/test/scala/org/apache/openwhisk/common/NestedSemaphoreTests.scala b/tests/src/test/scala/org/apache/openwhisk/common/NestedSemaphoreTests.scala index e87c734..b421d03 100644 --- a/tests/src/test/scala/org/apache/openwhisk/common/NestedSemaphoreTests.scala +++ b/tests/src/test/scala/org/apache/openwhisk/common/NestedSemaphoreTests.scala @@ -17,13 +17,21 @@ package org.apache.openwhisk.common +import common.ConcurrencyHelpers +import org.apache.openwhisk.utils.ExecutionContextFactory import org.junit.runner.RunWith import org.scalatest.FlatSpec import org.scalatest.Matchers import org.scalatest.junit.JUnitRunner +import scala.concurrent.duration.DurationInt + @RunWith(classOf[JUnitRunner]) -class NestedSemaphoreTests extends FlatSpec with Matchers { +class NestedSemaphoreTests extends FlatSpec with Matchers with ConcurrencyHelpers { + // use an infinite thread pool to allow for maximum concurrency + implicit val executionContext = ExecutionContextFactory.makeCachedThreadPoolExecutionContext() + val acquireTimeout = 1.minute + behavior of "NestedSemaphore" it should "allow acquire of concurrency permits before acquire of memory permits" in { @@ -34,16 +42,17 @@ class NestedSemaphoreTests extends FlatSpec with Matchers { val actionConcurrency = 5 val actionMemory = 3 //use all concurrency on a single slot - (1 to 5).par.map { i => - s.tryAcquireConcurrent(actionId, actionConcurrency, actionMemory) shouldBe true - } + concurrently(5, acquireTimeout) { + s.tryAcquireConcurrent(actionId, actionConcurrency, actionMemory) + } should contain only true s.availablePermits shouldBe 20 - 3 //we used a single container (memory == 3) s.concurrentState(actionId).availablePermits shouldBe 0 //use up all the remaining memory (17) and concurrency slots (17 / 3 * 5 = 25) - (1 to 25).par.map { i => - s.tryAcquireConcurrent(actionId, actionConcurrency, actionMemory) shouldBe true - } + concurrently(25, acquireTimeout) { + s.tryAcquireConcurrent(actionId, actionConcurrency, actionMemory) + } should contain only true + s.availablePermits shouldBe 2 //we used 18 (20/3 = 6, 6*3=18) s.concurrentState(actionId).availablePermits shouldBe 0 s.tryAcquireConcurrent("action1", actionConcurrency, actionMemory) shouldBe false @@ -55,7 +64,7 @@ class NestedSemaphoreTests extends FlatSpec with Matchers { (0 until 100).foreach { _ => val s = new NestedSemaphore(32) // try to acquire more permits than allowed in parallel - val acquires = (0 until 64).par.map(_ => s.tryAcquire()).seq + val acquires = concurrently(64, acquireTimeout)(s.tryAcquire()) val result = Seq.fill(32)(true) ++ Seq.fill(32)(false) acquires should contain theSameElementsAs result diff --git a/tests/src/test/scala/org/apache/openwhisk/common/ResizableSemaphoreTests.scala b/tests/src/test/scala/org/apache/openwhisk/common/ResizableSemaphoreTests.scala index 19048c4..4115c0e 100644 --- a/tests/src/test/scala/org/apache/openwhisk/common/ResizableSemaphoreTests.scala +++ b/tests/src/test/scala/org/apache/openwhisk/common/ResizableSemaphoreTests.scala @@ -17,13 +17,21 @@ package org.apache.openwhisk.common +import common.ConcurrencyHelpers +import org.apache.openwhisk.utils.ExecutionContextFactory import org.junit.runner.RunWith import org.scalatest.FlatSpec import org.scalatest.Matchers import org.scalatest.junit.JUnitRunner +import scala.concurrent.duration.DurationInt + @RunWith(classOf[JUnitRunner]) -class ResizableSemaphoreTests extends FlatSpec with Matchers { +class ResizableSemaphoreTests extends FlatSpec with Matchers with ConcurrencyHelpers { + // use an infinite thread pool to allow for maximum concurrency + implicit val executionContext = ExecutionContextFactory.makeCachedThreadPoolExecutionContext() + val acquireTimeout = 1.minute + behavior of "ResizableSemaphore" it should "not allow to acquire, force or release negative amounts of permits" in { @@ -163,7 +171,7 @@ class ResizableSemaphoreTests extends FlatSpec with Matchers { (0 until 100).foreach { _ => val s = new ResizableSemaphore(32, 35) // try to acquire more permits than allowed in parallel - val acquires = (0 until 64).par.map(_ => s.tryAcquire()).seq + val acquires = concurrently(64, acquireTimeout)(s.tryAcquire()) val result = Seq.fill(32)(true) ++ Seq.fill(32)(false) acquires should contain theSameElementsAs result @@ -173,11 +181,10 @@ class ResizableSemaphoreTests extends FlatSpec with Matchers { it should "release permits even under concurrent load" in { val s = new ResizableSemaphore(32, 35) // try to acquire more permits than allowed in parallel - val acquires = (0 until 64).par.map(_ => s.tryAcquire()).seq + concurrently(64, acquireTimeout)(s.tryAcquire()) + concurrently(32, acquireTimeout)(s.release(1, true)) - (0 until 32).par.map(_ => s.release(1, true)) s.counter shouldBe 0 - } } diff --git a/tests/src/test/scala/org/apache/openwhisk/core/controller/test/ActionsApiTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/controller/test/ActionsApiTests.scala index 9a6075e..9a1a96e 100644 --- a/tests/src/test/scala/org/apache/openwhisk/core/controller/test/ActionsApiTests.scala +++ b/tests/src/test/scala/org/apache/openwhisk/core/controller/test/ActionsApiTests.scala @@ -1161,7 +1161,7 @@ class ActionsApiTests extends ControllerTestCommon with WhiskActionsApi { action.publish, action.annotations ++ systemAnnotations(kind)) - (0 until 5).par.map { i => + (0 until 5).map { i => Get(s"$collectionPath/$name") ~> Route.seal(routes(creds)(transid())) ~> check { status should be(OK) val response = responseAs[WhiskAction] diff --git a/tests/src/test/scala/org/apache/openwhisk/core/limits/MaxActionDurationTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/limits/MaxActionDurationTests.scala index 4995b0c..167bcc3 100644 --- a/tests/src/test/scala/org/apache/openwhisk/core/limits/MaxActionDurationTests.scala +++ b/tests/src/test/scala/org/apache/openwhisk/core/limits/MaxActionDurationTests.scala @@ -18,18 +18,12 @@ package org.apache.openwhisk.core.limits import java.io.File -import scala.concurrent.duration.DurationInt +import scala.concurrent.duration.DurationInt import org.junit.runner.RunWith import org.scalatest.junit.JUnitRunner - -import common.TestHelpers -import common.TestUtils +import common.{ConcurrencyHelpers, TestHelpers, TestUtils, WskActorSystem, WskProps, WskTestHelpers} import common.rest.WskRestOperations -import common.WskProps -import common.WskTestHelpers -import common.WskActorSystem - import org.apache.openwhisk.core.entity._ import spray.json.DefaultJsonProtocol._ import spray.json._ @@ -41,7 +35,7 @@ import org.scalatest.tagobjects.Slow * Tests for action duration limits. These tests require a deployed backend. */ @RunWith(classOf[JUnitRunner]) -class MaxActionDurationTests extends TestHelpers with WskTestHelpers with WskActorSystem { +class MaxActionDurationTests extends TestHelpers with WskTestHelpers with WskActorSystem with ConcurrencyHelpers { implicit val wskprops = WskProps() val wsk = new WskRestOperations @@ -65,41 +59,41 @@ class MaxActionDurationTests extends TestHelpers with WskTestHelpers with WskAct "node-, python, and java-action" should s"run up to the max allowed duration (${TimeLimit.MAX_DURATION})" taggedAs (Slow) in withAssetCleaner( wskprops) { (wp, assetHelper) => // When you add more runtimes, keep in mind, how many actions can be processed in parallel by the Invokers! - Map("node" -> "helloDeadline.js", "python" -> "sleep.py", "java" -> "sleep.jar") + val runtimes = Map("node" -> "helloDeadline.js", "python" -> "sleep.py", "java" -> "sleep.jar") .filter { case (_, name) => new File(TestUtils.getTestActionFilename(name)).exists() } - .par - .map { - case (k, name) => - println(s"Testing action kind '${k}' with action '${name}'") - assetHelper.withCleaner(wsk.action, name) { (action, _) => - val main = if (k == "java") Some("Sleep") else None - action.create( - name, - Some(TestUtils.getTestActionFilename(name)), - timeout = Some(TimeLimit.MAX_DURATION), - main = main) - } - val run = wsk.action.invoke( + concurrently(runtimes.toSeq, TimeLimit.MAX_DURATION + 2.minutes) { + case (k, name) => + println(s"Testing action kind '${k}' with action '${name}'") + assetHelper.withCleaner(wsk.action, name) { (action, _) => + val main = if (k == "java") Some("Sleep") else None + action.create( name, - Map("forceHang" -> true.toJson, "sleepTimeInMs" -> (TimeLimit.MAX_DURATION + 30.seconds).toMillis.toJson)) - withActivation( - wsk.activation, - run, - initialWait = 1.minute, - pollPeriod = 1.minute, - totalWait = TimeLimit.MAX_DURATION + 2.minutes) { activation => - withClue("Activation result not as expected:") { - activation.response.status shouldBe ActivationResponse.messageForCode(ActivationResponse.DeveloperError) - activation.response.result shouldBe Some( - JsObject("error" -> Messages.timedoutActivation(TimeLimit.MAX_DURATION, init = false).toJson)) - activation.duration.toInt should be >= TimeLimit.MAX_DURATION.toMillis.toInt - } + Some(TestUtils.getTestActionFilename(name)), + timeout = Some(TimeLimit.MAX_DURATION), + main = main) + } + + val run = wsk.action.invoke( + name, + Map("forceHang" -> true.toJson, "sleepTimeInMs" -> (TimeLimit.MAX_DURATION + 30.seconds).toMillis.toJson)) + + withActivation( + wsk.activation, + run, + initialWait = 1.minute, + pollPeriod = 1.minute, + totalWait = TimeLimit.MAX_DURATION + 2.minutes) { activation => + withClue("Activation result not as expected:") { + activation.response.status shouldBe ActivationResponse.messageForCode(ActivationResponse.DeveloperError) + activation.response.result shouldBe Some( + JsObject("error" -> Messages.timedoutActivation(TimeLimit.MAX_DURATION, init = false).toJson)) + activation.duration.toInt should be >= TimeLimit.MAX_DURATION.toMillis.toInt } - () // explicitly map to Unit - } + } + } } } diff --git a/tests/src/test/scala/org/apache/openwhisk/core/loadBalancer/test/ShardingContainerPoolBalancerTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/loadBalancer/test/ShardingContainerPoolBalancerTests.scala index 031e51a..dab21aa 100644 --- a/tests/src/test/scala/org/apache/openwhisk/core/loadBalancer/test/ShardingContainerPoolBalancerTests.scala +++ b/tests/src/test/scala/org/apache/openwhisk/core/loadBalancer/test/ShardingContainerPoolBalancerTests.scala @@ -497,7 +497,7 @@ class ShardingContainerPoolBalancerTests val stepSize = stepSizes(hash % stepSizes.size) val uuid = UUID() //initiate activation - val published = (0 until numActivations).par.map { _ => + val published = (0 until numActivations).map { _ => val aid = ActivationId.generate() val msg = ActivationMessage( TransactionId.testing, @@ -545,12 +545,12 @@ class ShardingContainerPoolBalancerTests } //complete all - val acks = ids.par.map { aid => + val acks = ids.map { aid => val invoker = balancer.activationSlots(aid).invokerName completeActivation(invoker, balancer, aid) } - Await.ready(Future.sequence(acks.toList), 10.seconds) + Await.ready(Future.sequence(acks), 10.seconds) //verify invokers go back to unused state invokers.foreach { i =>