This is an automated email from the ASF dual-hosted git repository.

rabbah pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new 2c33e81  Replace parallel collections with future based concurrency in 
tests. (#4841)
2c33e81 is described below

commit 2c33e81df3e623a77afa17afdf3590f06a8d3b9d
Author: Markus Thömmes <markusthoem...@me.com>
AuthorDate: Thu Feb 27 14:02:51 2020 +0100

    Replace parallel collections with future based concurrency in tests. (#4841)
    
    Scala 2.13 puts parallel collections into a separate module that's not 
compatible with Scala 2.12. To avoid having to work around things and to keep 
cross-compilation compatibility this just exchanges the approach for 
concurrency in tests to not use parallel collections at all.
---
 .../src/test/scala/common/ConcurrencyHelpers.scala | 30 ++++++++++
 tests/src/test/scala/limits/ThrottleTests.scala    | 19 +++---
 .../openwhisk/common/ForcibleSemaphoreTests.scala  | 11 +++-
 .../openwhisk/common/NestedSemaphoreTests.scala    | 25 +++++---
 .../openwhisk/common/ResizableSemaphoreTests.scala | 17 ++++--
 .../core/controller/test/ActionsApiTests.scala     |  2 +-
 .../core/limits/MaxActionDurationTests.scala       | 70 ++++++++++------------
 .../test/ShardingContainerPoolBalancerTests.scala  |  6 +-
 8 files changed, 115 insertions(+), 65 deletions(-)

diff --git a/tests/src/test/scala/common/ConcurrencyHelpers.scala 
b/tests/src/test/scala/common/ConcurrencyHelpers.scala
new file mode 100644
index 0000000..39fb1f5
--- /dev/null
+++ b/tests/src/test/scala/common/ConcurrencyHelpers.scala
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package common
+
+import scala.concurrent.duration._
+import scala.concurrent.{Await, ExecutionContext, Future}
+
+trait ConcurrencyHelpers {
+  def concurrently[T](times: Int, timeout: FiniteDuration)(op: => T)(implicit 
ec: ExecutionContext): Iterable[T] =
+    Await.result(Future.sequence((1 to times).map(_ => Future(op))), timeout)
+
+  def concurrently[B, T](over: Iterable[B], timeout: FiniteDuration)(op: B => 
T)(
+    implicit ec: ExecutionContext): Iterable[T] =
+    Await.result(Future.sequence(over.map(v => Future(op(v)))), timeout)
+}
diff --git a/tests/src/test/scala/limits/ThrottleTests.scala 
b/tests/src/test/scala/limits/ThrottleTests.scala
index ee67e3c..7b9e00f 100644
--- a/tests/src/test/scala/limits/ThrottleTests.scala
+++ b/tests/src/test/scala/limits/ThrottleTests.scala
@@ -21,9 +21,7 @@ import java.time.Instant
 
 import akka.http.scaladsl.model.StatusCodes.TooManyRequests
 
-import scala.collection.parallel.immutable.ParSeq
-import scala.concurrent.Future
-import scala.concurrent.Promise
+import scala.concurrent.{Await, Future, Promise}
 import scala.concurrent.duration._
 import org.junit.runner.RunWith
 import org.scalatest.BeforeAndAfterAll
@@ -102,10 +100,15 @@ class ThrottleTests
    *
    * @param results the sequence of results from invocations or firings
    */
-  def waitForActivations(results: ParSeq[RunResult]) = results.foreach { 
result =>
-    if (result.exitCode == SUCCESS_EXIT) {
-      withActivation(wsk.activation, result, totalWait = 5.minutes)(identity)
+  def waitForActivations(results: Seq[RunResult]) = {
+    val done = results.map { result =>
+      if (result.exitCode == SUCCESS_EXIT) {
+        Future(withActivation(wsk.activation, result, totalWait = 5.minutes)(_ 
=> ()))
+      } else {
+        Future.successful(())
+      }
     }
+    Await.result(Future.sequence(done), 5.minutes)
   }
 
   /**
@@ -201,7 +204,7 @@ class ThrottleTests
     // wait for the activations last, if these fail, the throttle should be 
settled
     // and this gives the activations time to complete and may avoid 
unnecessarily polling
     println("waiting for activations to complete")
-    waitForActivations(results.par)
+    waitForActivations(results)
   }
 
   it should "throttle multiple activations of one trigger" in 
withAssetCleaner(wskprops) { (wp, assetHelper) =>
@@ -286,7 +289,7 @@ class ThrottleTests
     // wait for the activations last, giving the activations time to complete 
and
     // may avoid unnecessarily polling; if these fail, the throttle may not be 
settled
     println("waiting for activations to complete")
-    waitForActivations(combinedResults.par)
+    waitForActivations(combinedResults)
   }
 }
 
diff --git 
a/tests/src/test/scala/org/apache/openwhisk/common/ForcibleSemaphoreTests.scala 
b/tests/src/test/scala/org/apache/openwhisk/common/ForcibleSemaphoreTests.scala
index cb4939d..756521f 100644
--- 
a/tests/src/test/scala/org/apache/openwhisk/common/ForcibleSemaphoreTests.scala
+++ 
b/tests/src/test/scala/org/apache/openwhisk/common/ForcibleSemaphoreTests.scala
@@ -17,12 +17,19 @@
 
 package org.apache.openwhisk.common
 
+import common.ConcurrencyHelpers
+import org.apache.openwhisk.utils.ExecutionContextFactory
 import org.junit.runner.RunWith
 import org.scalatest.{FlatSpec, Matchers}
 import org.scalatest.junit.JUnitRunner
 
+import scala.concurrent.duration.DurationInt
+
 @RunWith(classOf[JUnitRunner])
-class ForcibleSemaphoreTests extends FlatSpec with Matchers {
+class ForcibleSemaphoreTests extends FlatSpec with Matchers with 
ConcurrencyHelpers {
+  // use an infinite thread pool to allow for maximum concurrency
+  implicit val executionContext = 
ExecutionContextFactory.makeCachedThreadPoolExecutionContext()
+
   behavior of "ForcableSemaphore"
 
   it should "not allow to acquire, force or release negative amounts of 
permits" in {
@@ -79,7 +86,7 @@ class ForcibleSemaphoreTests extends FlatSpec with Matchers {
     (0 until 100).foreach { _ =>
       val s = new ForcibleSemaphore(32)
       // try to acquire more permits than allowed in parallel
-      val acquires = (0 until 64).par.map(_ => s.tryAcquire()).seq
+      val acquires = concurrently(64, 1.minute)(s.tryAcquire())
 
       val result = Seq.fill(32)(true) ++ Seq.fill(32)(false)
       acquires should contain theSameElementsAs result
diff --git 
a/tests/src/test/scala/org/apache/openwhisk/common/NestedSemaphoreTests.scala 
b/tests/src/test/scala/org/apache/openwhisk/common/NestedSemaphoreTests.scala
index e87c734..b421d03 100644
--- 
a/tests/src/test/scala/org/apache/openwhisk/common/NestedSemaphoreTests.scala
+++ 
b/tests/src/test/scala/org/apache/openwhisk/common/NestedSemaphoreTests.scala
@@ -17,13 +17,21 @@
 
 package org.apache.openwhisk.common
 
+import common.ConcurrencyHelpers
+import org.apache.openwhisk.utils.ExecutionContextFactory
 import org.junit.runner.RunWith
 import org.scalatest.FlatSpec
 import org.scalatest.Matchers
 import org.scalatest.junit.JUnitRunner
 
+import scala.concurrent.duration.DurationInt
+
 @RunWith(classOf[JUnitRunner])
-class NestedSemaphoreTests extends FlatSpec with Matchers {
+class NestedSemaphoreTests extends FlatSpec with Matchers with 
ConcurrencyHelpers {
+  // use an infinite thread pool to allow for maximum concurrency
+  implicit val executionContext = 
ExecutionContextFactory.makeCachedThreadPoolExecutionContext()
+  val acquireTimeout = 1.minute
+
   behavior of "NestedSemaphore"
 
   it should "allow acquire of concurrency permits before acquire of memory 
permits" in {
@@ -34,16 +42,17 @@ class NestedSemaphoreTests extends FlatSpec with Matchers {
     val actionConcurrency = 5
     val actionMemory = 3
     //use all concurrency on a single slot
-    (1 to 5).par.map { i =>
-      s.tryAcquireConcurrent(actionId, actionConcurrency, actionMemory) 
shouldBe true
-    }
+    concurrently(5, acquireTimeout) {
+      s.tryAcquireConcurrent(actionId, actionConcurrency, actionMemory)
+    } should contain only true
     s.availablePermits shouldBe 20 - 3 //we used a single container (memory == 
3)
     s.concurrentState(actionId).availablePermits shouldBe 0
 
     //use up all the remaining memory (17) and concurrency slots (17 / 3 * 5 = 
25)
-    (1 to 25).par.map { i =>
-      s.tryAcquireConcurrent(actionId, actionConcurrency, actionMemory) 
shouldBe true
-    }
+    concurrently(25, acquireTimeout) {
+      s.tryAcquireConcurrent(actionId, actionConcurrency, actionMemory)
+    } should contain only true
+
     s.availablePermits shouldBe 2 //we used 18 (20/3 = 6, 6*3=18)
     s.concurrentState(actionId).availablePermits shouldBe 0
     s.tryAcquireConcurrent("action1", actionConcurrency, actionMemory) 
shouldBe false
@@ -55,7 +64,7 @@ class NestedSemaphoreTests extends FlatSpec with Matchers {
     (0 until 100).foreach { _ =>
       val s = new NestedSemaphore(32)
       // try to acquire more permits than allowed in parallel
-      val acquires = (0 until 64).par.map(_ => s.tryAcquire()).seq
+      val acquires = concurrently(64, acquireTimeout)(s.tryAcquire())
 
       val result = Seq.fill(32)(true) ++ Seq.fill(32)(false)
       acquires should contain theSameElementsAs result
diff --git 
a/tests/src/test/scala/org/apache/openwhisk/common/ResizableSemaphoreTests.scala
 
b/tests/src/test/scala/org/apache/openwhisk/common/ResizableSemaphoreTests.scala
index 19048c4..4115c0e 100644
--- 
a/tests/src/test/scala/org/apache/openwhisk/common/ResizableSemaphoreTests.scala
+++ 
b/tests/src/test/scala/org/apache/openwhisk/common/ResizableSemaphoreTests.scala
@@ -17,13 +17,21 @@
 
 package org.apache.openwhisk.common
 
+import common.ConcurrencyHelpers
+import org.apache.openwhisk.utils.ExecutionContextFactory
 import org.junit.runner.RunWith
 import org.scalatest.FlatSpec
 import org.scalatest.Matchers
 import org.scalatest.junit.JUnitRunner
 
+import scala.concurrent.duration.DurationInt
+
 @RunWith(classOf[JUnitRunner])
-class ResizableSemaphoreTests extends FlatSpec with Matchers {
+class ResizableSemaphoreTests extends FlatSpec with Matchers with 
ConcurrencyHelpers {
+  // use an infinite thread pool to allow for maximum concurrency
+  implicit val executionContext = 
ExecutionContextFactory.makeCachedThreadPoolExecutionContext()
+  val acquireTimeout = 1.minute
+
   behavior of "ResizableSemaphore"
 
   it should "not allow to acquire, force or release negative amounts of 
permits" in {
@@ -163,7 +171,7 @@ class ResizableSemaphoreTests extends FlatSpec with 
Matchers {
     (0 until 100).foreach { _ =>
       val s = new ResizableSemaphore(32, 35)
       // try to acquire more permits than allowed in parallel
-      val acquires = (0 until 64).par.map(_ => s.tryAcquire()).seq
+      val acquires = concurrently(64, acquireTimeout)(s.tryAcquire())
 
       val result = Seq.fill(32)(true) ++ Seq.fill(32)(false)
       acquires should contain theSameElementsAs result
@@ -173,11 +181,10 @@ class ResizableSemaphoreTests extends FlatSpec with 
Matchers {
   it should "release permits even under concurrent load" in {
     val s = new ResizableSemaphore(32, 35)
     // try to acquire more permits than allowed in parallel
-    val acquires = (0 until 64).par.map(_ => s.tryAcquire()).seq
+    concurrently(64, acquireTimeout)(s.tryAcquire())
+    concurrently(32, acquireTimeout)(s.release(1, true))
 
-    (0 until 32).par.map(_ => s.release(1, true))
     s.counter shouldBe 0
-
   }
 
 }
diff --git 
a/tests/src/test/scala/org/apache/openwhisk/core/controller/test/ActionsApiTests.scala
 
b/tests/src/test/scala/org/apache/openwhisk/core/controller/test/ActionsApiTests.scala
index 9a6075e..9a1a96e 100644
--- 
a/tests/src/test/scala/org/apache/openwhisk/core/controller/test/ActionsApiTests.scala
+++ 
b/tests/src/test/scala/org/apache/openwhisk/core/controller/test/ActionsApiTests.scala
@@ -1161,7 +1161,7 @@ class ActionsApiTests extends ControllerTestCommon with 
WhiskActionsApi {
       action.publish,
       action.annotations ++ systemAnnotations(kind))
 
-    (0 until 5).par.map { i =>
+    (0 until 5).map { i =>
       Get(s"$collectionPath/$name") ~> Route.seal(routes(creds)(transid())) ~> 
check {
         status should be(OK)
         val response = responseAs[WhiskAction]
diff --git 
a/tests/src/test/scala/org/apache/openwhisk/core/limits/MaxActionDurationTests.scala
 
b/tests/src/test/scala/org/apache/openwhisk/core/limits/MaxActionDurationTests.scala
index 4995b0c..167bcc3 100644
--- 
a/tests/src/test/scala/org/apache/openwhisk/core/limits/MaxActionDurationTests.scala
+++ 
b/tests/src/test/scala/org/apache/openwhisk/core/limits/MaxActionDurationTests.scala
@@ -18,18 +18,12 @@
 package org.apache.openwhisk.core.limits
 
 import java.io.File
-import scala.concurrent.duration.DurationInt
 
+import scala.concurrent.duration.DurationInt
 import org.junit.runner.RunWith
 import org.scalatest.junit.JUnitRunner
-
-import common.TestHelpers
-import common.TestUtils
+import common.{ConcurrencyHelpers, TestHelpers, TestUtils, WskActorSystem, 
WskProps, WskTestHelpers}
 import common.rest.WskRestOperations
-import common.WskProps
-import common.WskTestHelpers
-import common.WskActorSystem
-
 import org.apache.openwhisk.core.entity._
 import spray.json.DefaultJsonProtocol._
 import spray.json._
@@ -41,7 +35,7 @@ import org.scalatest.tagobjects.Slow
  * Tests for action duration limits. These tests require a deployed backend.
  */
 @RunWith(classOf[JUnitRunner])
-class MaxActionDurationTests extends TestHelpers with WskTestHelpers with 
WskActorSystem {
+class MaxActionDurationTests extends TestHelpers with WskTestHelpers with 
WskActorSystem with ConcurrencyHelpers {
 
   implicit val wskprops = WskProps()
   val wsk = new WskRestOperations
@@ -65,41 +59,41 @@ class MaxActionDurationTests extends TestHelpers with 
WskTestHelpers with WskAct
   "node-, python, and java-action" should s"run up to the max allowed duration 
(${TimeLimit.MAX_DURATION})" taggedAs (Slow) in withAssetCleaner(
     wskprops) { (wp, assetHelper) =>
     // When you add more runtimes, keep in mind, how many actions can be 
processed in parallel by the Invokers!
-    Map("node" -> "helloDeadline.js", "python" -> "sleep.py", "java" -> 
"sleep.jar")
+    val runtimes = Map("node" -> "helloDeadline.js", "python" -> "sleep.py", 
"java" -> "sleep.jar")
       .filter {
         case (_, name) =>
           new File(TestUtils.getTestActionFilename(name)).exists()
       }
-      .par
-      .map {
-        case (k, name) =>
-          println(s"Testing action kind '${k}' with action '${name}'")
-          assetHelper.withCleaner(wsk.action, name) { (action, _) =>
-            val main = if (k == "java") Some("Sleep") else None
-            action.create(
-              name,
-              Some(TestUtils.getTestActionFilename(name)),
-              timeout = Some(TimeLimit.MAX_DURATION),
-              main = main)
-          }
 
-          val run = wsk.action.invoke(
+    concurrently(runtimes.toSeq, TimeLimit.MAX_DURATION + 2.minutes) {
+      case (k, name) =>
+        println(s"Testing action kind '${k}' with action '${name}'")
+        assetHelper.withCleaner(wsk.action, name) { (action, _) =>
+          val main = if (k == "java") Some("Sleep") else None
+          action.create(
             name,
-            Map("forceHang" -> true.toJson, "sleepTimeInMs" -> 
(TimeLimit.MAX_DURATION + 30.seconds).toMillis.toJson))
-          withActivation(
-            wsk.activation,
-            run,
-            initialWait = 1.minute,
-            pollPeriod = 1.minute,
-            totalWait = TimeLimit.MAX_DURATION + 2.minutes) { activation =>
-            withClue("Activation result not as expected:") {
-              activation.response.status shouldBe 
ActivationResponse.messageForCode(ActivationResponse.DeveloperError)
-              activation.response.result shouldBe Some(
-                JsObject("error" -> 
Messages.timedoutActivation(TimeLimit.MAX_DURATION, init = false).toJson))
-              activation.duration.toInt should be >= 
TimeLimit.MAX_DURATION.toMillis.toInt
-            }
+            Some(TestUtils.getTestActionFilename(name)),
+            timeout = Some(TimeLimit.MAX_DURATION),
+            main = main)
+        }
+
+        val run = wsk.action.invoke(
+          name,
+          Map("forceHang" -> true.toJson, "sleepTimeInMs" -> 
(TimeLimit.MAX_DURATION + 30.seconds).toMillis.toJson))
+
+        withActivation(
+          wsk.activation,
+          run,
+          initialWait = 1.minute,
+          pollPeriod = 1.minute,
+          totalWait = TimeLimit.MAX_DURATION + 2.minutes) { activation =>
+          withClue("Activation result not as expected:") {
+            activation.response.status shouldBe 
ActivationResponse.messageForCode(ActivationResponse.DeveloperError)
+            activation.response.result shouldBe Some(
+              JsObject("error" -> 
Messages.timedoutActivation(TimeLimit.MAX_DURATION, init = false).toJson))
+            activation.duration.toInt should be >= 
TimeLimit.MAX_DURATION.toMillis.toInt
           }
-          () // explicitly map to Unit
-      }
+        }
+    }
   }
 }
diff --git 
a/tests/src/test/scala/org/apache/openwhisk/core/loadBalancer/test/ShardingContainerPoolBalancerTests.scala
 
b/tests/src/test/scala/org/apache/openwhisk/core/loadBalancer/test/ShardingContainerPoolBalancerTests.scala
index 031e51a..dab21aa 100644
--- 
a/tests/src/test/scala/org/apache/openwhisk/core/loadBalancer/test/ShardingContainerPoolBalancerTests.scala
+++ 
b/tests/src/test/scala/org/apache/openwhisk/core/loadBalancer/test/ShardingContainerPoolBalancerTests.scala
@@ -497,7 +497,7 @@ class ShardingContainerPoolBalancerTests
     val stepSize = stepSizes(hash % stepSizes.size)
     val uuid = UUID()
     //initiate activation
-    val published = (0 until numActivations).par.map { _ =>
+    val published = (0 until numActivations).map { _ =>
       val aid = ActivationId.generate()
       val msg = ActivationMessage(
         TransactionId.testing,
@@ -545,12 +545,12 @@ class ShardingContainerPoolBalancerTests
     }
 
     //complete all
-    val acks = ids.par.map { aid =>
+    val acks = ids.map { aid =>
       val invoker = balancer.activationSlots(aid).invokerName
       completeActivation(invoker, balancer, aid)
     }
 
-    Await.ready(Future.sequence(acks.toList), 10.seconds)
+    Await.ready(Future.sequence(acks), 10.seconds)
 
     //verify invokers go back to unused state
     invokers.foreach { i =>

Reply via email to