First, let me say that even tough I program in Java for a living, I have 
only ever used Akka with Scala. I'm actually surprised by the amount of 
syntactic noise in Akka with Java variant!

I doubt the "*ask* sends messages to only one updater even though I use 
right actorRef" part:

2016-03-08 18:10:33.949  INFO 69545 --- 
[akka-system-akka.actor.default-dispatcher-4] actors.RequestRouter     : 
Using updater for *B12* - Actor[akka://akka-system/user/*B12*#-1049418234].
2016-03-08 18:10:33.949  INFO 69545 --- 
[akka-system-akka.actor.default-dispatcher-20] actors.Updater    : 
Actor[akka://akka-system/user/*B1*#857515287] : Received from 
Actor[akka://akka-system/deadLetters] new update request *'B12*'

Apparently the message that you intended to send to updater B12 ended up in 
updater's B1 mailbox! It looks like the value of updater variable changed 
between log.info and ask invocation. It looks like you where closing over a 
 mutable variable that gets modified concurrently by different threads, but 
since updater is a local variable in the closure passed to mapAsync, this 
really shouldn't been happening... Weird.

I guess that's all I can say without digging in with a debugger :) Good 
luck!
Rafał

W dniu wtorek, 8 marca 2016 18:41:27 UTC+1 użytkownik paweł kamiński 
napisał:
>
> one more thing that is not working for me right now
>
> this is a code handling incoming connection and requests
>
> receive(ReceiveBuilder
>         .match(IncomingConnection.class, connection -> {
>             log.info("Established new connection from {}.", 
> connection.remoteAddress());
>             Flow<HttpRequest, HttpResponse, NotUsed> handler = 
> handleUpdateRequest(materializer);
>             connection.handleWith(handler, materializer);
>         })
>
>
> and the handleUpdateRequest method
>
>
> private Flow<HttpRequest, HttpResponse, NotUsed> 
> handleUpdateRequest(ActorMaterializer materializer) {
>         return Flow
>                 .of(HttpRequest.class)
>                 .mapAsync(1, request -> {
>                     log.debug("Handling request. {}", request.getUri());
>
>                     String mime = request.getHeader(Accept.class)
>                             .map(HttpHeader::value)
>                             .orElse("application/json");
>
>                     Query query = uri.query();
>                     Optional<String> idOp = query.get("id");
>                     if (!idOp.isPresent()) {
>                         return createResponse(StatusCodes.BAD_REQUEST);
>                     }
>
>                     String id = idOp.get();
>
>                     ActorRef updater = getUpdater(id, materializer.system());
>
>                     log.info("Using updater for {} - {}.", id, updater);
>
>
>  return asResponseFromScalaFuture(ask(updater, bidderId, new Timeout(
> RESPONSE_WAIT_DURATION)));
>
>                 });
>     }
>
>
> public Updater() {
>     HashSet<ActorRef> refs = Sets.newHashSet();
>     receive(ReceiveBuilder
>             .match(String.class, request -> {
>                 log.info("{} : Received from {} new update request {}.", 
> self(), sender(), request);
>                context().setReceiveTimeout(Duration.apply(3, 
> TimeUnit.SECONDS));
>                 refs.add(sender());
>             })
>             .match(ReceiveTimeout.class, timeout -> {
>                 log.info("{} : Sending response.", self());
>                 refs.stream()
>                         .forEach(ref -> {
>                             log.info("{} : Sending response to {}.", self(), 
> ref);
>                             ref.tell(createFakeResponse(), self());
>                         });
>                 refs.clear();
>             })
>             .build());
> }
>
>
> everything works when there is only one connection. request is sent to 
> updater (actorRef) and response is converted from scala future back to java 
> future and httpResponse is passed back to flow.
>
> but when there is more than one connection I can see in logs that only one 
> actor (updater) is responding, while investigating I noticed that *ask* 
> sends messages to only one updater even though I use right actorRef and 
> there is unique updaters for each connection.
>
> 2016-03-08 18:10:33.949 DEBUG 69545 --- 
> [akka-system-akka.actor.default-dispatcher-9] actors.RequestRouter     : 
> Handling request. http://localhost/budgets?id=B1
> 2016-03-08 18:10:33.949  INFO 69545 --- 
> [akka-system-akka.actor.default-dispatcher-9] actors.RequestRouter     : 
> Using updater for B1 - Actor[akka://akka-system/user/B1#857515287].
> 2016-03-08 18:10:33.949 DEBUG 69545 --- 
> [akka-system-akka.actor.default-dispatcher-4] actors.RequestRouter     : 
> Handling request. http://localhost/budgets?id=B12
> 2016-03-08 18:10:33.949  INFO 69545 --- 
> [akka-system-akka.actor.default-dispatcher-14] actors.Updater    : 
> Actor[akka://akka-system/user/B1#857515287] : Received from 
> Actor[akka://akka-system/temp/$r0] new update request 'B1'
> 2016-03-08 18:10:33.949  INFO 69545 --- 
> [akka-system-akka.actor.default-dispatcher-4] actors.RequestRouter     : 
> Using updater for B12 - Actor[akka://akka-system/user/B12#-1049418234].
> 2016-03-08 18:10:33.949  INFO 69545 --- 
> [akka-system-akka.actor.default-dispatcher-20] actors.Updater    : 
> Actor[akka://akka-system/user/B1#857515287] : Received from 
> Actor[akka://akka-system/*deadLetters*] new update request 'B12'
>
> [INFO] [03/08/2016 18:34:28.709] 
> [akka-system-akka.actor.default-dispatcher-5] [akka://akka-system/user/B1] 
> Message [akka.actor.ReceiveTimeout$] from 
> Actor[akka://akka-system/deadLetters] to 
> Actor[akka://akka-system/user/B1#385306581] was not delivered. [2] dead 
> letters encountered. 
> [INFO] [03/08/2016 18:34:28.709] 
> [akka-system-akka.actor.default-dispatcher-5] [akka://akka-system/user/B1] 
> Message [akka.actor.ReceiveTimeout$] from 
> Actor[akka://akka-system/deadLetters] to 
> Actor[akka://akka-system/user/B1#385306581] was not delivered. [3] dead 
> letters encountered. 
>
> I wonder why I get message from *deadLetters *and why it is sent to* B1 
> *actor. 
> the result is that computed messages for B12 will be never sent back. also 
> requested timeouts messages are not delivered :/ I ve messed this up :)
>
> On Tuesday, 8 March 2016 14:20:16 UTC+1, paweł kamiński wrote: 
>
>> ok, I was playing around with this yest. there is no way to do that. 
>>
>> only mapAsync is a way to create a flow that pushes elements of the 
>> stream to 3rd party components such as actors. 
>> this is a bit confusing as there is a ActorPublisher that would be good 
>> fit here if only Flow had a construct like Source.actorPublisher() / Sink
>> .actorRef(). this way we could pass Props of publisher which would 
>> transform incoming messages into something else.
>>
>> On Monday, 7 March 2016 11:28:03 UTC+1, paweł kamiński wrote:
>>>
>>> yep, i'm now thinking about duplex flow so I can push incoming messages 
>>> to actorRef sink and then such actor would be a source. 
>>>
>>> I must say that constructing right flow is not an easy task if you do it 
>>> occasionally! :)
>>> maybe I ll share my findings once I get it working
>>>
>>> On Monday, 7 March 2016 10:43:26 UTC+1, Rafał Krzewski wrote:
>>>>
>>>> W dniu poniedziałek, 7 marca 2016 10:08:23 UTC+1 użytkownik paweł 
>>>> kamiński napisał:
>>>>
>>>>> thanks for response.
>>>>>
>>>>> well ask pattern is a way to go but I thought I could avoid it and use 
>>>>> only flow's connection.
>>>>>
>>>>> Akka gives you a rich choice of communication primitives and it's OK 
>>>> to mix and match them in a way that solves the problem at hand in most 
>>>> convenient way :)
>>>>  
>>>> Happy hacking!
>>>> Rafał
>>>>
>>>

-- 
>>>>>>>>>>      Read the docs: http://akka.io/docs/
>>>>>>>>>>      Check the FAQ: 
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>>      Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.

Reply via email to