Hi Richard,
The key is flatmapConat 
<http://doc.akka.io/docs/akka-stream-and-http-experimental/2.0.3/stages-overview.html>
The akka team coded this build-in stage so that it flattens the nested 
ByteBuffer without consuming unnecessary memory

Am Dienstag, 2. Februar 2016 17:34:12 UTC+1 schrieb Richard Grossman:
>
> Indeed it can be a problem 
> I can't see why your code make something different not an expert of the 
> akka-stream 
> why adding a flow like make something different that map in scala
>
> On Tuesday, February 2, 2016 at 2:19:27 PM UTC+2, john....@gmail.com 
> wrote:
>>
>> Hi Richard, unfortunately my scala is very bad. (Actually I  learning 
>> scala by looking at the akka source and this list)
>> But If I am right, the line
>> .map(r => (r._1.get.entity.dataBytes.runWith(Sink.head), r._2)
>> uses an implicit materializer. So your are aggregating the whole 
>> ByteBuffer in memory.
>>
>> Hacker  √ complained about this in the 4'th Mail in this thread.
>>
>> Pleas forgive if I am wrong!
>> Many Greetings John
>>
>>
>> Am Dienstag, 2. Februar 2016 10:41:07 UTC+1 schrieb Richard Grossman:
>>>
>>> Hi
>>>
>>> If it can help someone in scala you can do this like this
>>>
>>> val future : Future[NetworkResponse] =
>>>  Source.single(req)
>>>  .log("Start Http")
>>>  .map(req => (HttpRequest(HttpMethods.GET, Uri(req.url)), req))
>>>  .log("Map to httpRequest")
>>>  .map(httpReq => httpReq._1 -> (httpReq._2.id, httpReq._2.start))
>>>  .log("Map to request")
>>>  .via(connectionPool)
>>>  .log("after connection pool")
>>>  .map(r => (r._1.get.entity.dataBytes.runWith(Sink.head), r._2))
>>>  .log("after reading response")
>>>  .mapAsync(10)(r => r._1.map(bytes => NetworkResponse(r._2._1, r._2._2, 
>>> bytes.decodeString("UTF-8"))))
>>>  .log("after map async")
>>>  .runWith(Sink.head)
>>> future pipeTo sender
>>>
>>> It start the flow for each source as single 
>>> run the flow via the connectionPool
>>> Then map to response that read the data from the http connection
>>> Finally mapAsync send back a future on any structure that you want with 
>>> the data inside
>>>
>>>
>>> On Saturday, January 30, 2016 at 6:15:36 PM UTC+2, john....@gmail.com 
>>> wrote:
>>>>
>>>> I ended up creating a flow with flatMapConcat:
>>>> With this flow the bytes of the response get accumulated to a single 
>>>> ByteString  
>>>> for example:
>>>>
>>>> final Flow<Pair, ByteString, BoxedUnit> bytestringFlow = 
>>>> Flow.of(Pair.class).flatMapConcat((Pair pair) -> {
>>>>    Try<HttpResponse> responseTry = (Try<HttpResponse>) pair.first();
>>>>    Source<ByteString, Object> dataBytes = 
>>>> responseTry.get().entity().getDataBytes();
>>>>    return dataBytes;
>>>> });
>>>>
>>>>
>>>> Am Mittwoch, 27. Januar 2016 15:13:32 UTC+1 schrieb Richard Grossman:
>>>>>
>>>>> Hi
>>>>>
>>>>> I need to exactly the same I must get the response on http call into 
>>>>> my flow.
>>>>> I see that you think this tickets can solve your problem is it true ?
>>>>>
>>>>> Thanks
>>>>>
>>>>> On Tuesday, August 4, 2015 at 6:45:07 PM UTC+3, john....@gmail.com 
>>>>> wrote:
>>>>>>
>>>>>> i think I need to wait for https://github.com/akka/akka/issues/15089
>>>>>>
>>>>>> Am Donnerstag, 30. Juli 2015 08:18:18 UTC+2 schrieb 
>>>>>> john....@gmail.com:
>>>>>>>
>>>>>>> // this is part of a BidiFlow
>>>>>>>
>>>>>>> FlowShape<Tuple2<Try<HttpResponse>, RequestResult>,
>>>>>>>       Tuple2<ByteString, Object>>
>>>>>>>       bottom =
>>>>>>>       b.graph(Flow.<Tuple2<Try<HttpResponse>, Object>>empty().
>>>>>>>             mapAsync(4, pair ->
>>>>>>>                         getEntityBytes(pair._1().get(), pair._2(), 
>>>>>>> materializer)
>>>>>>>             ).map((pair) -> new Tuple2<>(pair._1(), pair._2())));
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> static Future<Tuple2<ByteString, RequestResult>>
>>>>>>>    getEntityBytes( final HttpResponse response,
>>>>>>>                    final Object requestResult,
>>>>>>>                    final ActorMaterializer materializer) {
>>>>>>>
>>>>>>>    return response.entity().getDataBytes().runFold(
>>>>>>>          new Tuple2(ByteString.empty(),requestResult),
>>>>>>>          (aggr, next) -> new Tuple2(aggr._1().concat(next),aggr._2()), 
>>>>>>> materializer);
>>>>>>> }
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> What looks a little funny to me is that I need to pass a materializer 
>>>>>>> to the inner flow?
>>>>>>>
>>>>>>> I am a little unsure because the docs Modularity, Composition and 
>>>>>>> Hierarchy state:
>>>>>>> "It is rarely useful to embed a closed graph shape in a larger graph"
>>>>>>>
>>>>>>>

-- 
>>>>>>>>>>      Read the docs: http://akka.io/docs/
>>>>>>>>>>      Check the FAQ: 
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>>      Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.

Reply via email to