Re: [akka-user] Ask a lot of Actors inside an Iteration in an Akka-Scheduler and wait for reply of everyone results in a stop of actor-system and webserver

2017-09-28 Thread johannes . rudolph
Hi Simon,

as Johan said, you shouldn't use `get` to wait for the result of a future.
That synchronously blocks the thread and keeps it from doing any other
useful work. Instead, you can handle the result of the future
asynchronously once it is available. Because this is so common, we have a
pattern for it: you can use `pipeTo` to send the result of a future
calculation back to your own actor and then handle it once it arrives. If
you structure your processing code this way, everything is asynchronous and
no threads need to be blocked while waiting for other work to finish. You
can read more about it here:
https://doc.akka.io/docs/akka/current/java/actors.html#ask-send-and-receive-future

To do that, you need to structure your `onReceive` method so that it can
handle different kinds of messages. In the code you posted, you seem to
ignore the message that was sent to the actor?

Another, completely different way to structure this kind of code is to not
use any actor at all and instead write a method that uses the
`CompletableFuture` combinators and returns a `CompletableFuture` in the
end. The caller of the method can then decide when and how to deal with the
result.
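
A minimal sketch of that second approach, using only the JDK. The class
`ResizeAll` and the method `resize` are hypothetical stand-ins for whatever
actually produces one result per image (they are not taken from the posted
code); the point is only that `resizeAll` returns a `CompletableFuture` and
never calls `get` itself.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

final class ResizeAll {

    // Hypothetical stand-in for one asynchronous unit of work
    // (for example, resizing a single image).
    static CompletableFuture<String> resize(String fileName) {
        return CompletableFuture.supplyAsync(() -> "resized-" + fileName);
    }

    // Starts all work items and combines them into a single future.
    // Nothing in this method blocks the calling thread.
    static CompletableFuture<List<String>> resizeAll(List<String> fileNames) {
        List<CompletableFuture<String>> stages = fileNames.stream()
                .map(ResizeAll::resize)
                .collect(Collectors.toList());

        return CompletableFuture
                .allOf(stages.toArray(new CompletableFuture[0]))
                // allOf has completed, so every join() below returns at once.
                .thenApply(done -> stages.stream()
                        .map(CompletableFuture::join)
                        .collect(Collectors.toList()));
    }

    public static void main(String[] args) {
        resizeAll(Arrays.asList("a.jpg", "b.jpg"))
                .thenAccept(System.out::println)
                .join(); // only here so the demo JVM waits before exiting
    }
}
```

The only `join` that can actually wait is the one in `main`, which stands in
for the caller deciding when to deal with the result.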

Johannes


Re: [akka-user] Ask a lot of Actors inside an Iteration in an Akka-Scheduler and wait for reply of everyone results in a stop of actor-system and webserver

2017-09-15 Thread 'Simon Jacobs' via Akka User List

Hey Johan,

thanks for your reply! 

> Calling .get on a CompletableFuture blocks the thread until the future is
> completed, don't do that.

That's the point. I need to call .get because otherwise the actor stops
working. I also tried to collect the stages and then wait for the response:


CompletableFuture.allOf(stages.toArray(new CompletableFuture[stages.size()])).toCompletableFuture().get()


But this also results only in TimeoutExceptions (no matter what timeout is
given). The actors simply do nothing if I do not call .get, and that is the
point I do not understand.

Best regards 
Simon



Re: [akka-user] Ask a lot of Actors inside an Iteration in an Akka-Scheduler and wait for reply of everyone results in a stop of actor-system and webserver

2017-09-12 Thread Akka Team
Calling .get on a CompletableFuture blocks the thread until the future is
completed, don't do that. User.find.findPagedList also looks suspiciously
like something blocking.
See this section of the docs for more info:
http://doc.akka.io/docs/akka/current/java/dispatchers.html#problem-blocking-on-default-dispatcher
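
As a rough plain-JDK illustration of the idea behind that section (keep
blocking calls off the threads that everything else shares), the sketch
below runs a blocking call on its own small thread pool. The names
`BlockingIsolation`, `BLOCKING_POOL` and `loadPage`, and the pool size of 4,
are assumptions made up for this example; in an Akka application the
equivalent would be a separately configured dispatcher, as the linked docs
describe.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class BlockingIsolation {

    // Dedicated pool for blocking work; the size is an arbitrary assumption.
    // Daemon threads so the pool does not keep the JVM alive.
    static final ExecutorService BLOCKING_POOL =
            Executors.newFixedThreadPool(4, runnable -> {
                Thread t = new Thread(runnable, "blocking-io");
                t.setDaemon(true);
                return t;
            });

    // Hypothetical blocking call, standing in for a database query.
    static int loadPage(int page) {
        try {
            Thread.sleep(50);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return page * 10;
    }

    // Wraps the blocking call so that it occupies a thread of the
    // dedicated pool instead of the caller's thread.
    static CompletableFuture<Integer> loadPageAsync(int page) {
        return CompletableFuture.supplyAsync(() -> loadPage(page), BLOCKING_POOL);
    }

    public static void main(String[] args) {
        loadPageAsync(3)
                .thenAccept(rows -> System.out.println("rows: " + rows))
                .join(); // only here so the demo JVM waits before exiting
    }
}
```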

--
Johan
Akka Team


On Fri, Sep 8, 2017 at 10:00 AM, 'Simon Jacobs' via Akka User List <
akka-user@googlegroups.com> wrote:

> Hi there,
>
> I have an akka scheduler to reindex some data in the search-engine and to
> resize the images of every user.
>
> It will be called directly by
>
> system.scheduler().scheduleOnce(FiniteDuration.create(Long.valueOf(1),
> TimeUnit.SECONDS), reindexActor, "", system.dispatcher(), null);
>
> The actor (flattened into one method for better readability):
>
> @Override
> public void onReceive(Object msg) throws Exception
> {
>     long start = System.currentTimeMillis();
>     logger.info("Start reindexing of all User");
>
>     if (indexing) {
>         logger.info("Already reindexing. Skip");
>         return;
>     }
>
>     try {
>         int page = 0;
>         PagedList<User> users;
>
>         do {
>             users = User.find.findPagedList(page, COUNT_OF_ROWS);
>             List<User> active = users.getList().stream()
>                 .filter(g -> g.isActive())
>                 .collect(Collectors.toList());
>
>             List<CompletableFuture<ImageActor.Response>> stages =
>                 new ArrayList<CompletableFuture<ImageActor.Response>>(COUNT_OF_ROWS);
>             for (User user : active) {
>                 ActorRef userActor = system.actorOf(Props.create(
>                     DependencyInjector.class, injector, UpdateRedisActor.class));
>                 userActor.tell(new UpdateRedisActor.Index(user.getId()), null);
>
>                 if (user.hasProfilePicture()) {
>                     /**
>                      * get the image as FilePart (imitate http upload)
>                      */
>                     File image = new File(user.getProfilePicture().getFileName());
>                     FilePart<Object> filePart = new FilePart<Object>(
>                         "", user.getFirstname(), "image/jpg", image);
>
>                     String randomFileName = UUID.randomUUID().toString();
>
>                     /**
>                      * Create new actor
>                      */
>                     ActorRef imgActor = system.actorOf(Props.create(
>                         DependencyInjector.class, injector, ImageActor.class));
>
>                     /**
>                      * And ask it
>                      */
>                     CompletionStage<ImageActor.Response> stage = FutureConverters
>                         .toJava(ask(imgActor,
>                             new ImageActor.Request(filePart, randomFileName,
>                                 imageResizeOptionFactory.getValues()),
>                             ImageBroker.TEN_SECONDS * 20))
>                         .thenApply((response) -> ((ImageActor.Response) response))
>                         .exceptionally((e) -> {
>                             Logger.error("Error while creating picture ["
>                                 + randomFileName + "]", e);
>                             return new ImageActor.Response(500);
>                         });
>
>                     /**
>                      * Collect stages
>                      */
>                     stages.add(stage.toCompletableFuture().thenApplyAsync(response -> {
>                         user.updateAndSetProfileImage(response.fileName, response.filePath);
>                         user.save();
>                         Logger.info("User " + user.getId() + " saved with new image");
>                         return response;
>                     }));
>                 }
>             }
>             /**
>              * Wait every iteration for the actors
>              */
>             CompletableFuture.allOf(stages.toArray(
>                 new CompletableFuture[stages.size()])).toCompletableFuture().get();
>         }
>         while (users.hasNext());
>     }
>     catch (Exception e) {
>         logger.error("Fatal error while reindexing: ", e);
>     }
>     finally {
>         logger.info("Finish reindexing of all user in "
>             + (System.currentTimeMillis() - start) + "ms"); //does not appear in logfile.
>         indexing = false;
>     }
> }
>
> }
>
> My expectation was that the scheduler sends a fire-and-forget message to
> the Redis actor and then asks the image actor, collects the result from the
> image actor, and updates the user with the new image. Of course, the
> iteration ends before all images have been resized.
>
> What happened is that the scheduler fire-and-forget to the redis actor
> (works!) an