Re: [akka-user] Ask a lot of Actors inside an Iteration in an Akka-Scheduler and wait for reply of everyone results in a stop of actor-system and webserver
Hi Simon, as Johan said, you shouldn't use `get` to wait for the result of future. This just synchronously blocks the thread from doing any other useful work. Instead, you can asynchronously handle the result of the future once it is available. Because it is so common, we have a pattern for this situation: You can use `pipeTo` to send the result of a future calculation back to your own actor and then handle it once it is there. If you structure your processing code in this way, everything is asynchronous and no threads need to blocked while waiting for other work to be finished. You can read more about it here: https://doc.akka.io/docs/akka/current/java/actors.html#ask-send-and-receive-future To do that you need to structure your `onReceive` method in a way that allows handling different kinds of messages. In the code you posted you seem to ignore the message that was send to it? Another, completely different way to structure this kind of code, is not to use any actor at all but by just writing a method using the `CompletableFuture` combinators that returns a `CompletableFuture` in the end. Then the user of the method can decide when and how to deal with the result. Johannes On Friday, September 15, 2017 at 10:34:17 AM UTC+2, Simon Jacobs wrote: > > > Hey Johan, > > thanks for your reply! > > Calling .get on a CompletableFuture blocks the thread until the future is > completed, don't do that. > > That's the point. I need to call .get because otherwise the actor stops > working. I also tried to collect the stages and wait then for the Response: > > > CompletableFuture.allOf(stages.toArray(new > CompletableFuture[stages.size()])).toCompletableFuture().get() > > > But also this results only in TimeoutExceptions (no matter what timeout is > given). The actors simply do nothing if I do not call .get. And that is the > point, that I do not understand. > > Best regards > Simon > > > Am Dienstag, 12. September 2017 15:33:35 UTC+2 schrieb Akka Team: >> >> Calling .get on a CompletableFuture blocks the thread until the future is >> completed, don't do that. User.find.findPagedList also looks suspiciously >> like something blocking. >> See this section of the docs for more info: >> http://doc.akka.io/docs/akka/current/java/dispatchers.html#problem-blocking-on-default-dispatcher >> >> -- >> Johan >> Akka Team >> >> >> On Fri, Sep 8, 2017 at 10:00 AM, 'Simon Jacobs' via Akka User List < >> akka...@googlegroups.com> wrote: >> >>> Hy there, >>> >>> I have an akka scheduler to reindex some data in the search-engine and >>> to resize the images of every user. >>> >>> It will be called directly by >>> >>> system.scheduler().scheduleOnce(FiniteDuration.create(Long.valueOf(1), >>> TimeUnit.SECONDS), reindexActor, "", system.dispatcher(), null); >>> >>> The (flatted to one method for better readability) actor: >>> >>>@Override >>> public void onReceive(Object msg) throws Exception >>> { >>> long start = System.currentTimeMillis(); >>> logger.info("Start reindexing of all User"); >>> >>> if (indexing) { >>> logger.info("Already reindexing. Skip"); >>> return; >>> } >>> >>> try { >>> int page = 0; >>> PagedList users; >>> >>> do { >>> users = User.find.findPagedList(page, COUNT_OF_ROWS >>> ); >>> List active = users.getList().stream().filter(g >>> -> g.isActive()).collect(Collectors.toList()); >>> >>> List> stages = new >>> ArrayList>(COUNT_OF_ROWS); >>> for (User user : active) { >>> ActorRef userActor = system.actorOf(Props.create >>> (DependencyInjector.class, injector, UpdateRedisActor.class)); >>> userActor.tell(new UpdateRedisActor.Index(user. >>> getId()), null); >>> >>> if (user.hasProfilePicture()) { >>> /** >>> * get the image as FilePart (imitate http >>> upload) >>> */ >>> File image = new File(user.getProfilePicture >>> ().getFileName()); >>> FilePart filePart = new FilePart< >>> Object>("", user.getFirstname(), "image/jpg", image); >>> >>> String randomFileName = UUID.randomUUID(). >>> toString(); >>> >>> /** >>> * Create new actor >>> */ >>> ActorRef imgActor = system.actorOf(Props. >>> create(DependencyInjector.class, injector, ImageActor.class)); >>> >>> /** >>>
Re: [akka-user] Ask a lot of Actors inside an Iteration in an Akka-Scheduler and wait for reply of everyone results in a stop of actor-system and webserver
Hey Johan, thanks for your reply! Calling .get on a CompletableFuture blocks the thread until the future is completed, don't do that. That's the point. I need to call .get because otherwise the actor stops working. I also tried to collect the stages and wait then for the Response: CompletableFuture.allOf(stages.toArray(new CompletableFuture [stages.size()])).toCompletableFuture().get() But also this results only in TimeoutExceptions (no matter what timeout is given). The actors simply do nothing if I do not call .get. And that is the point, that I do not understand. Best regards Simon Am Dienstag, 12. September 2017 15:33:35 UTC+2 schrieb Akka Team: > > Calling .get on a CompletableFuture blocks the thread until the future is > completed, don't do that. User.find.findPagedList also looks suspiciously > like something blocking. > See this section of the docs for more info: > http://doc.akka.io/docs/akka/current/java/dispatchers.html#problem-blocking-on-default-dispatcher > > -- > Johan > Akka Team > > > On Fri, Sep 8, 2017 at 10:00 AM, 'Simon Jacobs' via Akka User List < > akka...@googlegroups.com > wrote: > >> Hy there, >> >> I have an akka scheduler to reindex some data in the search-engine and to >> resize the images of every user. >> >> It will be called directly by >> >> system.scheduler().scheduleOnce(FiniteDuration.create(Long.valueOf(1), >> TimeUnit.SECONDS), reindexActor, "", system.dispatcher(), null); >> >> The (flatted to one method for better readability) actor: >> >>@Override >> public void onReceive(Object msg) throws Exception >> { >> long start = System.currentTimeMillis(); >> logger.info("Start reindexing of all User"); >> >> if (indexing) { >> logger.info("Already reindexing. Skip"); >> return; >> } >> >> try { >> int page = 0; >> PagedList users; >> >> do { >> users = User.find.findPagedList(page, COUNT_OF_ROWS); >> List active = users.getList().stream().filter(g >> -> g.isActive()).collect(Collectors.toList()); >> >> List> stages = new >> ArrayList>(COUNT_OF_ROWS); >> for (User user : active) { >> ActorRef userActor = system.actorOf(Props.create( >> DependencyInjector.class, injector, UpdateRedisActor.class)); >> userActor.tell(new UpdateRedisActor.Index(user. >> getId()), null); >> >> if (user.hasProfilePicture()) { >> /** >> * get the image as FilePart (imitate http >> upload) >> */ >> File image = new File(user.getProfilePicture >> ().getFileName()); >> FilePart filePart = new FilePart< >> Object>("", user.getFirstname(), "image/jpg", image); >> >> String randomFileName = UUID.randomUUID(). >> toString(); >> >> /** >> * Create new actor >> */ >> ActorRef imgActor = system.actorOf(Props. >> create(DependencyInjector.class, injector, ImageActor.class)); >> >> /** >> * And ask it >> */ >> CompletionStage stage = >> FutureConverters >> .toJava(ask(imgActor, new ImageActor. >> Request(filePart, randomFileName, imageResizeOptionFactory.getValues()), >> ImageBroker.TEN_SECONDS * 20)) >> .thenApply((response) -> ((ImageActor >> .Response) response)) >> .exceptionally((e) -> { >> Logger.error("Error while >> creating picture [" + randomFileName + "]", e); >> return new ImageActor.Response( >> 500); >> }); >> >> /** >> * Collect stages >> */ >> stages.add(stage.toCompletableFuture(). >> thenApplyAsync(response -> { >> user.updateAndSetProfileImage(response. >> fileName, response.filePath); >> user.save(); >> Logger.info("User " + user.getId() + " >> saved with new image"); >> return response; >> })); >> } >> } >>
Re: [akka-user] Ask a lot of Actors inside an Iteration in an Akka-Scheduler and wait for reply of everyone results in a stop of actor-system and webserver
Calling .get on a CompletableFuture blocks the thread until the future is completed, don't do that. User.find.findPagedList also looks suspiciously like something blocking. See this section of the docs for more info: http://doc.akka.io/docs/akka/current/java/dispatchers.html#problem-blocking-on-default-dispatcher -- Johan Akka Team On Fri, Sep 8, 2017 at 10:00 AM, 'Simon Jacobs' via Akka User List < akka-user@googlegroups.com> wrote: > Hy there, > > I have an akka scheduler to reindex some data in the search-engine and to > resize the images of every user. > > It will be called directly by > > system.scheduler().scheduleOnce(FiniteDuration.create(Long.valueOf(1), > TimeUnit.SECONDS), reindexActor, "", system.dispatcher(), null); > > The (flatted to one method for better readability) actor: > >@Override > public void onReceive(Object msg) throws Exception > { > long start = System.currentTimeMillis(); > logger.info("Start reindexing of all User"); > > if (indexing) { > logger.info("Already reindexing. Skip"); > return; > } > > try { > int page = 0; > PagedList users; > > do { > users = User.find.findPagedList(page, COUNT_OF_ROWS); > List active = users.getList().stream().filter(g > -> g.isActive()).collect(Collectors.toList()); > > List> stages = new ArrayList > >(COUNT_OF_ROWS); > for (User user : active) { > ActorRef userActor = system.actorOf(Props.create( > DependencyInjector.class, injector, UpdateRedisActor.class)); > userActor.tell(new UpdateRedisActor.Index(user.ge > tId()), null); > > if (user.hasProfilePicture()) { > /** > * get the image as FilePart (imitate http > upload) > */ > File image = new File(user.getProfilePicture > ().getFileName()); > FilePart filePart = new FilePart< > Object>("", user.getFirstname(), "image/jpg", image); > > String randomFileName = UUID.randomUUID(). > toString(); > > /** > * Create new actor > */ > ActorRef imgActor = system.actorOf(Props. > create(DependencyInjector.class, injector, ImageActor.class)); > > /** > * And ask it > */ > CompletionStage stage = > FutureConverters > .toJava(ask(imgActor, new ImageActor. > Request(filePart, randomFileName, imageResizeOptionFactory.getValues()), > ImageBroker.TEN_SECONDS * 20)) > .thenApply((response) -> ((ImageActor. > Response) response)) > .exceptionally((e) -> { > Logger.error("Error while > creating picture [" + randomFileName + "]", e); > return new ImageActor.Response(500 > ); > }); > > /** > * Collect stages > */ > stages.add(stage.toCompletableFuture(). > thenApplyAsync(response -> { > user.updateAndSetProfileImage(response. > fileName, response.filePath); > user.save(); > Logger.info("User " + user.getId() + " > saved with new image"); > return response; > })); > } > } > /** > * Wait every iteration for the actors > */ > CompletableFuture.allOf(stages.toArray(new > CompletableFuture[stages.size()])).toCompletableFuture().get(); > } > while (users.hasNext()); > } > catch (Exception e) { > logger.error("Fatal error while reindexing: ", e); > } > finally { > logger.info("Finish reindexing of all user in " + (System. > currentTimeMillis() - start) + "ms"); //does not appear in logfile. > indexing = false; > } > } > > } > > My expectation was that the scheduler fire-and-forget to the redis actor > and then asks the image-actor. Collect the result from the image actor and > update the user with the new image. Of course the iteration ends before all > images were resized. > > What happened is that the scheduler fire-and-forget to the redis actor > (works!) an