This is an automated email from the ASF dual-hosted git repository. rabbah pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git
The following commit(s) were added to refs/heads/master by this push: new 1293647 fix memory drag in PrimitiveActions, and clean up withAlternateAfterTimeout (#3129) 1293647 is described below commit 1293647f96829df4c0ea90e815dc2b6fc664de11 Author: Nick Mitchell <star...@users.noreply.github.com> AuthorDate: Tue Jan 2 17:40:27 2018 -0500 fix memory drag in PrimitiveActions, and clean up withAlternateAfterTimeout (#3129) --- .../whisk/utils/ExecutionContextFactory.scala | 51 ++++++++++++++++------ 1 file changed, 38 insertions(+), 13 deletions(-) diff --git a/common/scala/src/main/scala/whisk/utils/ExecutionContextFactory.scala b/common/scala/src/main/scala/whisk/utils/ExecutionContextFactory.scala index 88764d3..ea025cb 100644 --- a/common/scala/src/main/scala/whisk/utils/ExecutionContextFactory.scala +++ b/common/scala/src/main/scala/whisk/utils/ExecutionContextFactory.scala @@ -23,38 +23,63 @@ import scala.concurrent.ExecutionContext import scala.concurrent.Future import scala.concurrent.Promise import scala.concurrent.duration.FiniteDuration -import scala.util.Try +import scala.util.control.NonFatal import akka.actor.ActorSystem -import akka.pattern.{after => expire} +import akka.actor.Cancellable +import akka.actor.Scheduler object ExecutionContextFactory { - // Future.firstCompletedOf has a memory drag bug - // https://stackoverflow.com/questions/36420697/about-future-firstcompletedof-and-garbage-collect-mechanism - def firstCompletedOf[T](futures: TraversableOnce[Future[T]])(implicit executor: ExecutionContext): Future[T] = { + private type CancellableFuture[T] = (Cancellable, Future[T]) + + /** + * akka.pattern.after has a memory drag issue: it opaquely + * schedules an actor which consequently results in drag for the + * timeout duration + * + */ + def expire[T](duration: FiniteDuration, using: Scheduler)(value: ⇒ Future[T])( + implicit ec: ExecutionContext): CancellableFuture[T] = { val p = Promise[T]() - val pref = new java.util.concurrent.atomic.AtomicReference(p) - val completeFirst: Try[T] => Unit = { result: Try[T] => - val promise = pref.getAndSet(null) - if (promise != null) { - promise.tryComplete(result) + val cancellable = using.scheduleOnce(duration) { + p completeWith { + try value + catch { case NonFatal(t) ⇒ Future.failed(t) } } } - futures foreach { _ onComplete completeFirst } + (cancellable, p.future) + } + + /** + * Return the first of the two given futures to complete; if f1 + * finishes first, we will cancel f2 + * + */ + def firstCompletedOf2[T](f1: Future[T], f2Cancellable: CancellableFuture[T])( + implicit executor: ExecutionContext): Future[T] = { + val p = Promise[T]() + val (f2Killswitch, f2) = f2Cancellable + + f1.onComplete { result => + p.tryComplete(result) + f2Killswitch.cancel() + } + f2.onComplete(p.tryComplete) + p.future } implicit class FutureExtensions[T](f: Future[T]) { def withTimeout(timeout: FiniteDuration, msg: => Throwable)(implicit system: ActorSystem): Future[T] = { implicit val ec = system.dispatcher - firstCompletedOf(Seq(f, expire(timeout, system.scheduler)(Future.failed(msg)))) + firstCompletedOf2(f, expire(timeout, system.scheduler)(Future.failed(msg))) } def withAlternativeAfterTimeout(timeout: FiniteDuration, alt: => Future[T])( implicit system: ActorSystem): Future[T] = { implicit val ec = system.dispatcher - firstCompletedOf(Seq(f, expire(timeout, system.scheduler)(alt))) + firstCompletedOf2(f, expire(timeout, system.scheduler)(alt)) } } -- To stop receiving notification emails like this one, please contact ['"commits@openwhisk.apache.org" <commits@openwhisk.apache.org>'].