Yeah, but I'm willing to prefer correctness over performance in this case, because a month ago I was pretty sure that it was almost impossible to implement a non-blocking/asynchronous ActorPublisher. I don't remember the reasons now, but there were some real issues preventing it. There are a few threads in the akka-user group discussing it.
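
For comparison, here is a minimal sketch of the idea behind the pipeTo suggestion quoted below. It uses only the Scala standard library so that it runs without Akka. All names here (`MiniActor`, `ScrollResult`, `ScrollFailed`, `pipeToSelf`) are made up for illustration, and a single-threaded executor stands in for the actor's mailbox. In Akka itself the hand-off would be `future.pipeTo(self)` from `akka.pattern.pipe`, with the result handled as an ordinary message in `receive`.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.util.{Failure, Success}

// Hypothetical messages carrying the outcome of one asynchronous call.
sealed trait Msg
final case class ScrollResult(scrollId: String) extends Msg
final case class ScrollFailed(cause: Throwable) extends Msg

// Stand-in for an actor: every message is handled on one dedicated thread.
final class MiniActor {
  private val executor = Executors.newSingleThreadExecutor()
  private val mailbox  = ExecutionContext.fromExecutor(executor)

  // Private state, only ever touched from the mailbox thread.
  private var lastScrollId = "initial"
  private val finished     = Promise[String]()

  private def receive(msg: Msg): Unit = msg match {
    case ScrollResult(sid) =>
      lastScrollId = sid
      finished.trySuccess(lastScrollId)
    case ScrollFailed(ex) =>
      finished.tryFailure(ex)
  }

  private def send(msg: Msg): Unit =
    mailbox.execute(new Runnable { def run(): Unit = receive(msg) })

  // Imitation of pipeTo: the callback does nothing except enqueue a message,
  // so no actor state is read or written from the Future's thread.
  def pipeToSelf(f: Future[String])(implicit ec: ExecutionContext): Unit =
    f.onComplete {
      case Success(sid) => send(ScrollResult(sid))
      case Failure(ex)  => send(ScrollFailed(ex))
    }

  def result: Future[String] = finished.future
  def shutdown(): Unit = executor.shutdown()
}
```

The point of the arrangement is that the Future's callback never closes over mutable state; it only sends a message, and the state change happens when that message is processed in turn.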
On Monday, May 25, 2015 at 1:12:03 PM UTC+2, √ wrote:
>
> Blocking is never ideal :)
>
> A non-blocking alternative might be to have it use pipeTo and send the
> actor a message and then react to that message.
>
> On Mon, May 25, 2015 at 1:08 PM, Jakub Liska <liska...@gmail.com> wrote:
>
>> Good thinking :-) Blocking the "scroll" async method right away seems to
>> be ideal. Thank you
>>
>> On Monday, May 25, 2015 at 11:55:50 AM UTC+2, √ wrote:
>>
>>> On Mon, May 25, 2015 at 11:45 AM, Jakub Liska <liska...@gmail.com>
>>> wrote:
>>>
>>>> 1) But if you share ExecutionContext with the actor (both using
>>>> dispatcher's thread) then there cannot be a concurrent execution - it's
>>>> just a question of whether "Future#onComplete" callback executes before
>>>> receive partial function returns
>>>>
>>> The dispatcher has N threads, and it can (and will) execute the actor
>>> concurrently with other things, which in this case is the onComplete
>>> callback.
>>>
>>>> 2) I guess you are right, that even when using dispatcher's thread the
>>>> "Future#onComplete" is probably executed after actor's 'receive' function
>>>> returns, so even though it is done so by the same thread, it can happen
>>>> after another Request is processed in the mean time because it was queued
>>>> prior to "Future#onComplete" callback...
>>>>
>>>> I think that the solution is to substitute "Future#onComplete" with
>>>> "Future#map" and handling only Future error in "Future#recover" ... I'll
>>>> try that out... Thank you
>>>>
>>> Substituting those won't solve the issue of closing over, and calling,
>>> methods on the actor from another thread.
>>> Since what you are doing is blocking anyway, why not block first, and
>>> then execute the logic in the actor itself?
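
A minimal sketch of that last suggestion ("block first, then execute the logic in the actor itself"), using only the standard library. `BlockFirst`, `Outcome` and `handleOutcome` are hypothetical stand-ins, not Akka API. Inside the publisher, the three branches would presumably map to calling `onComplete()` and `context.stop(self)`, updating `lastScrollId`, or calling `onError(ex)`, all from within `receive` rather than from a Future callback.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.util.{Failure, Success, Try}

object BlockFirst {
  // Hypothetical summary of what the actor would do next.
  sealed trait Outcome
  case object Completed extends Outcome
  final case class Continue(scrollId: String) extends Outcome
  final case class Failed(cause: Throwable) extends Outcome

  // Wait for the value first, then decide on the calling thread.
  // Try also captures a timeout thrown by Await.result.
  def handleOutcome(f: Future[Option[String]], timeout: FiniteDuration): Outcome =
    Try(Await.result(f, timeout)) match {
      case Success(None)      => Completed
      case Success(Some(sid)) => Continue(sid)
      case Failure(ex)        => Failed(ex)
    }
}
```

Because the match runs after `Await.result` returns, it executes on the thread that called it, which in the actor would be the thread currently running `receive`. The `onNext(recs)` call inside `flatMap` in the original code also runs on the Future's thread, so it would likely need the same treatment.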
>>>
>>>>
>>>> On Monday, May 25, 2015 at 11:26:09 AM UTC+2, √ wrote:
>>>>>
>>>>> Hi Jakub,
>>>>>
>>>>> Starting to read your email I definitely thought there must be
>>>>> something mysterious at work!
>>>>>
>>>>> From what I can tell, there are a couple of compounding things here:
>>>>>
>>>>> 1) future.onComplete will be executed on another thread than the
>>>>> actor, or "concurrently with the actor", this means that you can't close
>>>>> over the actor and call methods on it from another thread, see:
>>>>> http://doc.akka.io/docs/akka/2.3.11/additional/faq.html
>>>>>
>>>>> 2) when you call `Await` on the Future, you're only going to await it
>>>>> having a value, not await its callbacks to finish executing.
>>>>> So:
>>>>>
>>>>> 1. val f = someFuture(…)
>>>>> 2. f.onComplete { … }
>>>>> 3. Await.result(f, …)
>>>>>
>>>>> When line 3 executes, onComplete could have already executed, is
>>>>> (con)currently being executed or will be executed.
>>>>>
>>>>> Does that make sense?
>>>>>
>>>>> On Mon, May 25, 2015 at 11:03 AM, Jakub Liska <liska...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> in other words :
>>>>>>
>>>>>> def receive: Receive = {
>>>>>>   case Request(demand) if totalDemand > 0 && demand > 0 && isActive =>
>>>>>>
>>>>>>     // can it happen that another Request message comes before this
>>>>>>     // partial function returns (while this one is being processed) ?
>>>>>>
>>>>>> }
>>>>>>
>>>>>>
>>>>>> I have an asynchronous ActorPublisher that is scanning an ElasticSearch
>>>>>> index, but I'm calling "await" at the end, so it is basically blocking :
>>>>>>
>>>>>> private var lastScrollId: String = _
>>>>>>
>>>>>> def receive: Receive = {
>>>>>>   case Request(demand) if totalDemand > 0 && demand > 0 && isActive =>
>>>>>>     def pushRecursively(n: Long, scrollId: String): Future[Option[String]] = {
>>>>>>       require(scrollId != null && scrollId.nonEmpty, "Scroll id must be present!")
>>>>>>       scroll(scrollId) flatMap {
>>>>>>         case (sid, recs) if recs.isEmpty => // empty hits means end of scanning/scrolling
>>>>>>           Future.successful(Option.empty)
>>>>>>         case (sid, recs) =>
>>>>>>           onNext(recs)
>>>>>>           if (n > 1)
>>>>>>             pushRecursively(n-1, sid)
>>>>>>           else
>>>>>>             Future.successful(Option(sid))
>>>>>>       }
>>>>>>     }
>>>>>>
>>>>>>     val f = pushRecursively(Math.min(demand, totalDemand), lastScrollId)
>>>>>>     f onComplete {
>>>>>>       case Failure(ex) =>
>>>>>>         log.error(ex, "Unexpected ScanSource error")
>>>>>>         onError(ex)
>>>>>>         context.stop(self)
>>>>>>       case Success(sidOpt) => sidOpt match {
>>>>>>         case None =>
>>>>>>           log.info("ScanSource just completed...")
>>>>>>           if (isCompleted)
>>>>>>             log.warning("ScanSource already completed, I cannot figure out why this occurs!")
>>>>>>           else {
>>>>>>             onComplete()
>>>>>>             context.stop(self)
>>>>>>           }
>>>>>>         case Some(sid) =>
>>>>>>           lastScrollId = sid
>>>>>>       }
>>>>>>     }
>>>>>>     f.await(600.seconds)
>>>>>>
>>>>>>   case Cancel =>
>>>>>>     context.stop(self)
>>>>>> }
>>>>>>
>>>>>> But as you can see, there is "log.warning" saying that onComplete()
>>>>>> was already called, which can happen only if ActorPublisher wasn't
>>>>>> Requested sequentially.
>>>>>>
>>>>>> I think this implementation is correct and valid even though it is
>>>>>> blocking actor's dispatcher thread. But I really cannot figure out how
>>>>>> it can be "completed" twice...
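
The double completion described above is consistent with point 2 of Viktor's reply: awaiting a Future waits for its value, not for its callbacks. A small standard-library sketch that makes this visible by holding the callback open with a latch. The names (`AwaitVsCallback`, `awaitReturnedBeforeCallback`) are made up for illustration, and the latch is only there to make the ordering deterministic.

```scala
import java.util.concurrent.CountDownLatch
import java.util.concurrent.atomic.AtomicBoolean
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object AwaitVsCallback {
  // Returns true when Await.result came back while the callback was still unfinished.
  def awaitReturnedBeforeCallback(): Boolean = {
    val release      = new CountDownLatch(1)
    val callbackDone = new AtomicBoolean(false)

    val f = Future(42)
    f.onComplete { _ =>
      release.await()          // keep the callback pending until released below
      callbackDone.set(true)
    }

    Await.result(f, 5.seconds) // waits for the value only, not for the callback
    val returnedEarly = !callbackDone.get()
    release.countDown()        // let the callback finish
    returnedEarly
  }
}
```

In the publisher this would mean that `receive` can return, and the next `Request` can be processed, while the previous `onComplete` callback is still pending, which is one way to end up in the `isCompleted` branch.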
>>>>>>
>>>>>> --
>>>>>> >>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>> >>>>>>>>>> Check the FAQ:
>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>> >>>>>>>>>> Search the archives:
>>>>>> https://groups.google.com/group/akka-user
>>>>>> ---
>>>>>> You received this message because you are subscribed to the Google
>>>>>> Groups "Akka User List" group.
>>>>>> To unsubscribe from this group and stop receiving emails from it,
>>>>>> send an email to akka-user+...@googlegroups.com.
>>>>>> To post to this group, send email to akka...@googlegroups.com.
>>>>>> Visit this group at http://groups.google.com/group/akka-user.
>>>>>> For more options, visit https://groups.google.com/d/optout.
>>>>>
>>>>> --
>>>>> Cheers,
>>>>> √
>>>
>>> --
>>> Cheers,
>>> √
>
> --
> Cheers,
> √