[akka-user] Catch dropped elements in a stream

2016-08-31 Thread Victor
Hi,

I've designed an AMQP graph where I need to ack or nack a message depending 
on the processing result of the message:

*AmqpSource* (consume) -> *Process* -> *Recover* -> *AmqpSink* (publish)

All stages are under my control except the *Process* stage, which is a *Flow* 
implemented by a different user and is a black box from my point of view.

The *Recover* stage emits an *ErrorMessage* or a *Message* depending on 
the result of the *Process* stage; I have to handle *Process* stage 
failures because I don't know what happens in this stage.

The *AmqpSink* publishes the received message to an exchange and then 
acks or nacks it depending on whether it's an *ErrorMessage* or a *Message*.

The problem is that in the *Process* stage the user may filter 
incoming elements, so these elements never arrive in the *AmqpSink* 
and are never acked.

To put it simply: if N elements come out of the source, exactly N elements 
MUST arrive in the sink, each either an *ErrorMessage* or a *Message*, 
whatever happens in the *Process* stage.
Is there a way to catch dropped elements in a stream? How could I solve 
my problem?
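
To make the problem concrete, here is a minimal sketch rather than the real AMQP graph. The integer source and the `process` flow are hypothetical stand-ins for *AmqpSource* and the user-supplied *Process* stage. It only demonstrates that a filtering flow breaks the "N in, N out" expectation; it does not detect the dropped elements.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._

object DroppedElementsDemo {

  // Hypothetical stand-in for the black-box Process stage: it silently drops odd numbers.
  val process: Flow[Int, Int, NotUsed] = Flow[Int].filter(_ % 2 == 0)

  /** Sends `n` elements through `process` and returns how many reach the sink. */
  def countDelivered(n: Int): Int = {
    implicit val system: ActorSystem = ActorSystem("dropped-elements-demo")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    try {
      val delivered = Source(1 to n).via(process).runWith(Sink.seq)
      Await.result(delivered, 5.seconds).size
    } finally {
      system.terminate()
    }
  }
}
```

With ten elements sent, only the five even ones reach the sink, so the other five would never be acked or nacked.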

Thanks in advance,
Victor

-- 
>>>>>>>>>>  Read the docs: http://akka.io/docs/
>>>>>>>>>>  Check the FAQ: 
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


[akka-user] Embed a Flow in a GraphStage?

2016-09-01 Thread Victor
Hi,

Is it possible to embed a Flow in a GraphStage?

I want to design a GraphStage which monitors a flow given by a user (pseudo 
code):

class MyFlow(embeddedFlow: Flow[MyContent, MyContent, NotUsed]) extends 
GraphStage[FlowShape[MyMessage, MyMessage]] {

  var id

  onPush = {
    val msg = grab(in)
    id = msg.id
    push(embeddedFlow.in, msg.content)
  }

  onPull = {
    val content = grab(embeddedFlow.out)
    push(out, MyMessage(id, content))
  }

  embeddedFlow.onFailure = {
    push(out, ...)
  }

}


The embedded flow is a black box and I want to monitor it, react to its 
failure and manage state, because I don't want to expose the whole message 
to the embedded flow; I just want it to process a part of the message.

Is it possible?

Thanks in advance,
Victor



Re: [akka-user] Embed a Flow in a GraphStage?

2016-09-01 Thread Victor
Ah I see :) Something like this:

... -> BidiFlow.port1 -> EmbeddedFlow -> BidiFlow.port2 -> ...

Thanks,
Victor

On Thursday, September 1, 2016 at 10:14:04 UTC+2, drewhk wrote:
>
> Hi Victor,
>
> Embedding is not the way to think about this. What you really want is a 
> BidiStage which then can be composed by any Flow to get a new Flow.
>
> -Endre


Re: [akka-user] Embed a Flow in a GraphStage?

2016-09-01 Thread Victor
Ok, thanks :)

On Thursday, September 1, 2016 at 10:32:27 UTC+2, drewhk wrote:
>
> Even simpler newFlow = flow join bidiFlow or newFlow = bidiFlow join flow
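
A minimal sketch of what such a bidirectional stage might look like, under stated assumptions: `MyMessage`, `IdKeeper` and the upper-casing `embedded` flow are hypothetical names, and the wrapped flow is assumed to emit exactly one element per input, in order (a filtering or reordering flow would mismatch the ids). The stage strips the id on the way in, remembers it, and re-attaches it on the way out; `join` then turns the pair into an ordinary Flow.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream._
import akka.stream.scaladsl.{BidiFlow, Flow, Sink, Source}
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}

import scala.collection.mutable
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical message type: only `content` is exposed to the wrapped flow.
final case class MyMessage(id: Long, content: String)

// Assumes the wrapped flow is strictly one-in/one-out and order-preserving.
class IdKeeper extends GraphStage[BidiShape[MyMessage, String, String, MyMessage]] {
  val in1: Inlet[MyMessage] = Inlet("IdKeeper.in1")
  val out1: Outlet[String] = Outlet("IdKeeper.out1")
  val in2: Inlet[String] = Inlet("IdKeeper.in2")
  val out2: Outlet[MyMessage] = Outlet("IdKeeper.out2")
  override val shape: BidiShape[MyMessage, String, String, MyMessage] =
    BidiShape(in1, out1, in2, out2)

  override def createLogic(attrs: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
    // Ids of messages currently inside the wrapped flow, oldest first.
    private val pendingIds = mutable.Queue.empty[Long]

    setHandler(in1, new InHandler {
      override def onPush(): Unit = {
        val msg = grab(in1)
        pendingIds.enqueue(msg.id)
        push(out1, msg.content)
      }
      // Only close the inner input, so in-flight elements can still come back.
      override def onUpstreamFinish(): Unit = complete(out1)
    })
    setHandler(out1, new OutHandler { override def onPull(): Unit = pull(in1) })
    setHandler(in2, new InHandler {
      override def onPush(): Unit = push(out2, MyMessage(pendingIds.dequeue(), grab(in2)))
    })
    setHandler(out2, new OutHandler { override def onPull(): Unit = pull(in2) })
  }
}

object IdKeeperDemo {
  def run(): Seq[MyMessage] = {
    implicit val system: ActorSystem = ActorSystem("id-keeper-demo")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    try {
      // Stand-in for the user-supplied black-box flow.
      val embedded: Flow[String, String, NotUsed] = Flow[String].map(_.toUpperCase)
      val wrapped: Flow[MyMessage, MyMessage, NotUsed] =
        BidiFlow.fromGraph(new IdKeeper).join(embedded)
      val result = Source(List(MyMessage(1, "a"), MyMessage(2, "b"))).via(wrapped).runWith(Sink.seq)
      Await.result(result, 5.seconds)
    } finally system.terminate()
  }
}
```

The queue is the "state" from the original question: it lives in the stage logic, so the wrapped flow never sees the ids.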


[akka-user] Terminate ActorSystem on stream failure

2016-09-09 Thread Victor
Hi,

How can I terminate the ActorSystem running my stream when the stream fails?

If I have the following stream:

A -> B -> C

and B fails with a stoppingStrategy, what happens exactly? Are the A and C actors 
still running? How can I detect that the stream has stopped so that I can terminate the 
ActorSystem?

I think I have to use a materialized value, but it's not clear: if that's 
the solution, I would have to return a Future as a materialized value from 
each of my stages and then listen for failures. It seems heavy :)

Thanks in advance,
Victor



[akka-user] Re: Terminate ActorSystem on stream failure

2016-09-12 Thread Victor
OK, I saw a message by @drewhk on the Akka Gitter (https://gitter.im/akka/akka) 
which seems to confirm that we have to use a materialized value to be 
notified of a stream's completion, but it's up to us to return something as a 
materialized value which can notify us.

On September 12, 2016 3:44 PM, @drewhk wrote:
"to be notified once everything is "done" [...] streams itself cannot 
solve. You need support from the Sinks themselves to give you a signal 
(usually in the form of a materialized value) that they are done"

My stream is an AMQP stream which never completes (it enriches messages), 
but it can fail, so at the moment I return a Future as a materialized value from my 
sink, and when that future fails I terminate the actor system (because I want my 
service to fail and let other supervising services handle this failure).

He also wrote that "this is the topic of one of the upcoming blog posts", 
so I will see what is considered a good solution :)
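
A minimal sketch of this approach, under assumptions: the A, B and C stages below are hypothetical stand-ins (a numeric source, a `map` that throws, and `Sink.ignore`, whose materialized value is a `Future[Done]`). With the default stopping supervision, a failure in B is propagated downstream to C and cancels A upstream, so a single Future from the sink is enough to observe it.

```scala
import akka.Done
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Keep, Sink, Source}

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.util.{Failure, Success}

object TerminateOnFailure {

  /** Runs a stream whose middle stage fails; returns true once the ActorSystem has terminated. */
  def run(): Boolean = {
    implicit val system: ActorSystem = ActorSystem("terminate-on-failure")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    import system.dispatcher // execution context for the Future callback

    val done: Future[Done] =
      Source(1 to 5)                                                          // A
        .map { n => if (n == 3) throw new IllegalStateException("B failed"); n } // B
        .toMat(Sink.ignore)(Keep.right)                                       // C
        .run()

    // The sink's Future fails with B's exception; react by shutting the system down.
    done.onComplete {
      case Failure(_) => system.terminate()
      case Success(_) => ()
    }

    Await.ready(system.whenTerminated, 10.seconds).isCompleted
  }
}
```

Only the sink has to expose a Future here; the intermediate stages do not need materialized values of their own.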



[akka-user] Source from Sink

2016-09-20 Thread Victor
Hi,

Is it possible to create a Source from a Sink?

I want every element received in the Sink to be emitted from the Source.

I have to do that because I use a MergeHub and a feedback loop.

Thank you,
Victor



[akka-user] Re: Source from Sink

2016-09-20 Thread Victor
Akka stream is demand driven but is there a way to create a custom 
FlowShape stage which will redirect sink to source?



[akka-user] Re: Source from Sink

2016-09-20 Thread Victor
This is what I want to achieve:

<https://lh3.googleusercontent.com/-XSh_7PyXMdo/V-E3gQROnWI/ABw/m0zxtF82494d2ZnYj-Qio3TuXzHUHl2fwCLcB/s1600/example.png>


I receive HttpResponses coming from one single TCP connection that is always 
open, and I want to emit ByteStrings which are then processed by a JsonFraming 
stage. The problem is that the HTTP entity is a sub-source. I know I could 
simply use runFold on the dataBytes, but then I have to deal with a future 
and mapAsync, and I don't want to; I want to try another way :)


The broadcast is here to slow down the HttpResponse in-flow.


I hope this is clear; if not, don't hesitate to ask questions.

If you want, I've created a little library for draw.io with Source, Sink, 
Flow, etc. elements to quickly draw flow charts.


Thank you,

Victor




Re: [akka-user] Source from Sink

2016-09-20 Thread Victor
Thanks for your response. I don't see where alsoTo can help me here? I 
think what I want to achieve here is to join a graph with the graph 
obtained by materializing the MergeHub.

On Tuesday, September 20, 2016 at 15:02:39 UTC+2, √ wrote:
>
> Hi,
>
> Flow[T].alsoTo(sink)?
>
> -- 
> Cheers,
> √
>
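
For reference, a small sketch of what `alsoTo` does, using hypothetical integer elements: every element is sent to the attached side sink and also continues downstream. `alsoToMat` is used here instead of `alsoTo` only so that both sinks' results can be returned.

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Keep, Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._

object AlsoToDemo {

  /** Returns what the side sink and the main sink each received. */
  def run(): (Seq[Int], Seq[Int]) = {
    implicit val system: ActorSystem = ActorSystem("also-to-demo")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    try {
      val (side, main) = Source(1 to 3)
        .alsoToMat(Sink.seq[Int])(Keep.right) // side sink sees every element
        .toMat(Sink.seq[Int])(Keep.both)      // main sink sees them too
        .run()
      (Await.result(side, 5.seconds), Await.result(main, 5.seconds))
    } finally system.terminate()
  }
}
```

This duplicates elements into a sink; it does not by itself turn that sink into a Source.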


Re: [akka-user] Source from Sink

2016-09-20 Thread Victor
Sorry for the spam. To make it simple, what I really want to achieve is to 
have a continuous flow of ByteString from a single HTTP connection: 
HttpRequest in, ByteString out, not multiple HttpResponses, each with its own 
sub-source.



Re: [akka-user] Re: Source from Sink

2016-09-20 Thread Victor
Here we go!

Just click on the pencil icon in the scratchpad panel and then click on 
Import.

On Tuesday, September 20, 2016 at 17:32:47 UTC+2, Justin du coeur wrote:
>
> On Tue, Sep 20, 2016 at 9:27 AM, Victor  > wrote:
>
>> If you want, I've created a little library for draw.io with Source, Sink, 
>> Flow, etc. elements to quickly draw flow charts.
>>
> Neat!  I suspect that, if you can make that available, lots of people 
> would find it useful.  (Frankly, it would make many conversations here 
> clearer.) 
>



stream.xml
Description: XML document


Re: [akka-user] Source from Sink

2016-09-20 Thread Victor
Thank you very much! I discover new things every day.

On Tuesday, September 20, 2016 at 17:56:24 UTC+2, drewhk wrote:
>
> Isn't that just simply:
>
> requests.via(outgoingHttpConnection).flatMapConcat { resp =>
>   resp.entity.dataBytes
> }
>
> ?
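
A runnable sketch of the same idea without a network connection, under assumptions: the two strict `HttpResponse`s built below are hypothetical stand-ins for what the connection flow would emit. `flatMapConcat` consumes each response's entity bytes in turn, producing one continuous stream of `ByteString`.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.http.scaladsl.model.{ContentTypes, HttpEntity, HttpResponse}
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Source
import akka.util.ByteString

import scala.concurrent.Await
import scala.concurrent.duration._

object FlattenResponses {

  /** Concatenates the entity bytes of all responses into one string. */
  def run(): String = {
    implicit val system: ActorSystem = ActorSystem("flatten-responses")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    try {
      // Stand-ins for responses arriving from the HTTP connection.
      val responses = Source(List("""{"a":1}""", """{"b":2}""").map { json =>
        HttpResponse(entity = HttpEntity(ContentTypes.`application/json`, json))
      })

      // One sub-source per response, flattened sequentially into a single stream.
      val bytes: Source[ByteString, NotUsed] = responses.flatMapConcat(_.entity.dataBytes)

      Await.result(bytes.runFold(ByteString.empty)(_ ++ _), 5.seconds).utf8String
    } finally system.terminate()
  }
}
```

Because the sub-sources are consumed one after another, the resulting byte stream can be fed directly into a framing stage.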


[akka-user] [akka-stream] Detect failure in MergeHub

2016-10-18 Thread Victor
Hi,

It's written in the ScalaDoc of the *MergeHub.source* method that:

If one of the inputs fails the Sink, the Source is failed in turn


But in the MergeHub source code, the *onUpstreamFailure* method only throws 
an exception:

override def onUpstreamFailure(ex: Throwable): Unit = {
  throw new MergeHub.ProducerFailed(
    "Upstream producer failed with exception, removing from MergeHub now", ex)
}

So maybe I'm missing something, but why would failing an input fail the 
Source?
I need the MergeHub to fail when an input fails, but it doesn't seem to 
work.

Thanks,
Victor



[akka-user] Re: [akka-stream] Detect failure in MergeHub

2016-10-18 Thread Victor
Sure!

https://github.com/akka/akka/issues/21693

On Tuesday, October 18, 2016 at 14:36:35 UTC+2, johannes...@lightbend.com wrote:
>
> Hi Victor,
>
> good point. I think the Scaladoc is wrong there. Could you raise an issue 
> at akka/akka?
>
> Johannes


[akka-user] Akka stream HTTP client performance test

2017-01-26 Thread Victor
Hi,

How could I test the performance of the Akka Stream HTTP client?

I have a stream which makes calls to an HTTP service, and there may be many 
elements traversing the flow.
I just want some figures to calibrate my system, or some good 
practices for dealing with the Akka Stream HTTP client.

Here is the very naive test I've made (it's an approximation, but it lets 
me get some figures to compare my implementations):

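object Test extends App {

  import stream.http.syntax._

  implicit val system = ActorSystem()
  implicit val materializer = ActorMaterializer()

  // Time at which the server saw the first request (-1 until then).
  var t = -1L

  // Client side: send requests through the custom http operator and
  // print the elapsed time once the stream completes.
  Source
    .repeat((HttpRequest(uri = Uri("http://localhost:8080")), null))
    .take(1)
    .http()
    .to(Sink.onComplete { _ =>
      println(System.currentTimeMillis() - t)
      system.terminate()
    })
    .run()

  // Server side: answer every request with an empty response.
  Http().bindAndHandle(Flow[HttpRequest].map { _ =>
    if (t == -1) t = System.currentTimeMillis()
    HttpResponse()
  }, "127.0.0.1", 8080)

}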

The http operator is a custom one, here it is:

package object syntax {

  implicit class HttpOperators[O, M, S[+SO, +SM] <: FlowOps[SO, SM]](stage: S[(HttpRequest, O), M]) {

    def http()(implicit system: ActorSystem, materializer: ActorMaterializer)
        : S[(HttpRequest, O), M]#Repr[Either[(Throwable, O), ((ByteString, StatusCode), O)]] = {
      stage
        .via(Http().superPool())
        .via(Flow.fromGraph(GraphDSL.create() { implicit builder =>
          import GraphDSL.Implicits._

          // Route failed responses to port 0 and successful ones to port 1.
          val partition = builder.add(Partition[(Try[HttpResponse], O)](2, {
            case (Failure(_), _) => 0
            case (Success(_), _) => 1
          }))
          val unzip = builder.add(Unzip[Source[ByteString, Any], (StatusCode, O)])
          val zip = builder.add(ZipWith[ByteString, (StatusCode, O), Either[(Throwable, O), ((ByteString, StatusCode), O)]] {
            case (data, (status, element)) => Right(((data, status), element))
          })
          val merge = builder.add(Merge[Either[(Throwable, O), ((ByteString, StatusCode), O)]](2))

          partition.out(0).map { case (response, element) =>
            Left((response.failed.get, element))
          } ~> merge.in(0)
          partition.out(1).map { case (response, element) =>
            (response.get.entity.dataBytes, (response.get.status, element))
          } ~> unzip.in
          // Flatten each entity's byte source, then pair the bytes with the status and element.
          unzip.out0.flatMapConcat(identity) ~> zip.in0
          unzip.out1 ~> zip.in1
          zip.out ~> merge.in(1)

          FlowShape(partition.in, merge.out)
        }))
    }

  }

}

I've tried to implement my http operator with mapAsyncUnordered, but it was 
slower in my test (about 1s slower for ~1r).

Thank you in advance for your feedback.
Victor



[akka-user] Parallel file download

2017-02-13 Thread Victor
Hi,

I've created a route in my application to download a large file, but I 
can't figure out how to make this route handle requests in parallel.

I've tried reading a different file on each request, and I've tried 
*mapAsync*, *mapAsyncUnordered* and a custom dispatcher, but none of 
these worked.

Is there something I've missed?

Here is the first version I wrote:

Http().bindAndHandle(
  Flow[HttpRequest].map { _ =>
    HttpResponse(entity = HttpEntity(ContentTypes.`application/octet-stream`, FileIO.fromPath(Paths.get("file"))))
  },
  "localhost", 8080)

Thanks for your help,
Victor



[akka-user] Re: Parallel file download

2017-02-14 Thread Victor
OK, it seems it's my browser (Chrome) that can't handle the parallel downloads. 
With *wget* it works.




[akka-user] Detect Content-Type with a Source[ByteString]

2017-04-03 Thread Victor
Hi,

I've created the following service:

            Encrypted file
                  +
                  | Uploaded to
                  |
            +-----v-----+
            |           |
            | Amazon S3 |
            |           |
            +---^---+---+  Receive resp and return
                |   |      HttpResponse(entity=resp.entity.dataBytes.via(decryptionFlow))
                |   |   +----------------------------+
                |   +--->                            +-> Decrypted file with unknown mime type
                |       | Decrypt & download service |
                +-------+                            <-+
Request encrypted file  +----------------------------+  Request file


It works like a charm: with a JVM limited to 128 MB of memory I can 
download files of several GB without any problem (because I forward the 
source of bytes).
To make it perfect, I would like to set the Content-Type header of the 
response containing the decrypted file, because right now it is served as 
plain binary data.

To achieve that, I want to use Apache Tika (a library for detecting MIME 
types). Apache Tika needs some bytes to do its work.

So here is my problem: how can I peek at some of the decrypted bytes, 
detect the MIME type and, once that is done, return the HttpResponse with the 
correct header and with the entity set to the Source of decrypted bytes?

val decryptedBytesSource = resp.entity.dataBytes.via(decryptionFlow)
val contentType =  detectContentType(decryptedBytesSource) 
HttpResponse(entity = HttpEntity(contentType, decryptedBytesSource))

I know the code above doesn't work, since I can't consume the source twice; 
it's only meant to show what I want to do.

Thank you in advance :)
Victor



[akka-user] Re: Detect Content-Type with a Source[ByteString]

2017-04-05 Thread Victor
Hi,

I've managed to put together something that works, but I think there may be a 
thread-safety problem, since I've created two graph stage logics that interfere 
with each other:

class MediaDetectorSink
    extends GraphStageWithMaterializedValue[SinkShape[ByteString], Future[(String, Source[ByteString, _])]] {

  val in = Inlet[ByteString]("MediaDetectorSink.in")

  override val shape = SinkShape(in)

  override def createLogicAndMaterializedValue(inheritedAttributes: Attributes) = {
    val promise = Promise[(String, Source[ByteString, _])]()

    val logic = new GraphStageLogic(shape) with InHandler {

      var detectionDone = false

      var buffer = ByteString.empty

      val sourceStage = new GraphStage[SourceShape[ByteString]] {

        val out = Outlet[ByteString]("MediaDetectorSource.out")

        override val shape = SourceShape(out)

        override def createLogic(inheritedAttributes: Attributes) = new GraphStageLogic(shape) with OutHandler {

          override def onPull() = {
            if (buffer.nonEmpty) {
              push(out, buffer)
              buffer = ByteString.empty
            } else if (canPull())
              pull(in)
            else
              completeStage()
          }

          setHandler(out, this)

        }

      }

      override def preStart() = pull(in)

      override def onPush() = {
        if (detectionDone)
          push(sourceStage.out, grab(in))
        else {
          buffer ++= grab(in)
          detect().fold {
            pull(in)
          }{ mimeType =>
            promise.success((mimeType, Source.fromGraph(sourceStage)))
            completeStage()
          }
        }
      }

      def detect() = {
        detectionDone = true
        // ...Apache Tika stuff here...
        Some("my-mime-type")
      }

      def canPull() = !isClosed(in)

      setHandler(in, this)

    }

    (logic, promise.future)
  }

}


It's basically a sink that materializes a future containing the detected 
MIME type and a source.
We run a source of `ByteString` through the sink, which pulls enough 
`ByteString` elements to do its detection work. When the detection is done, it 
completes the future with the detected MIME type and a source that 
combines the bytes already consumed from the original source (for the 
detection) with the remaining bytes of the original source.

Could you give me some advice about this? I've read the source of 
`BroadcastHub` for inspiration, but its use case is a bit different from 
mine: it materializes a source which can then be materialized many times, 
whereas I materialize one source and then materialize it only once. So 
do I have to worry about thread safety here?

Thank you,
Victor


[akka-user] Re: Detect Content-Type with a Source[ByteString]

2017-04-06 Thread Victor
Ahh, I didn't know this operator! Thank you very much :)

I did a test and it works (it's much less code):

def detect(data: ByteString) = {
  // ...Apache Tika stuff...
  Some(MediaType.custom("text/plain", binary = true).asInstanceOf[MediaType.Binary])
}


def detect(source: Source[ByteString, _], previousPrefix: ByteString = ByteString.empty): Future[(MediaType.Binary, Source[ByteString, _])] =
  source.prefixAndTail(1).runWith(Sink.head).flatMap { case (prefix, tail) =>
    detect(prefix.head).fold {
      // not enough data yet: keep everything consumed so far and try again on the tail
      detect(tail, previousPrefix ++ prefix.head)
    }{ mediaType =>
      Future.successful((mediaType, tail.prepend(Source.single(previousPrefix ++ prefix.head))))
    }
  }


[...]


val route =
  path("test") {
    get {
      complete("123456789\n")
    }
  } ~
  (path("media") & parameter("url" ? "http://127.0.0.1:8080/test")) { url =>
    get {
      complete(Http().singleRequest(HttpRequest(uri = url)).flatMap(response =>
        detect(response.entity.dataBytes)).map { case (mediaType, source) =>
          HttpEntity(ContentType(mediaType), source)
        })
    }
  }


With `prefixAndTail` I ask for only 1 `ByteString` and try to detect the 
MIME type with it. If that is not enough, the `detect` method calls itself 
again on the tail until I can figure out the MIME type.
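
For readers of the archive, the peek-then-reassemble idea described above can be sketched in isolation. This is only a minimal illustration, assuming Akka Streams 2.4/2.5 as used in this thread (ActorMaterializer); the object name `PrefixAndTailSketch` and the method `peekAndReassemble` are hypothetical and not part of the original code.

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import akka.util.ByteString

import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical helper, for illustration only.
object PrefixAndTailSketch {

  // Peeks at the first chunk of a stream, then rebuilds the full stream from
  // the peeked chunk plus the untouched tail. Returns (peeked, reassembled).
  def peekAndReassemble(chunks: List[ByteString]): (ByteString, ByteString) = {
    implicit val system: ActorSystem = ActorSystem("prefix-and-tail-sketch")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    import system.dispatcher

    val result = Source(chunks)
      .prefixAndTail(1)
      .runWith(Sink.head)
      .flatMap { case (prefix, tail) =>
        // An empty upstream yields an empty prefix, so fall back to no bytes.
        val peeked = prefix.headOption.getOrElse(ByteString.empty)
        // prepend puts the peeked bytes back in front of the remaining stream.
        tail
          .prepend(Source.single(peeked))
          .runFold(ByteString.empty)(_ ++ _)
          .map(all => (peeked, all))
      }

    try Await.result(result, 5.seconds)
    finally system.terminate()
  }
}
```

Calling `PrefixAndTailSketch.peekAndReassemble(List(ByteString("1234"), ByteString("5678")))` peeks at the first chunk only, while the reassembled stream still carries every byte. In the real service the reassembled source would be handed to `HttpEntity` instead of being folded into memory.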

I did some benchmarks with `wrk`; here are the results:

$ wrk -t8 -c32 -d10s "http://127.0.0.1:8080/test"
Running 10s test @ http://127.0.0.1:8080/test
  8 threads and 32 connections
  Thread Stats   Avg      Stdev     Max   +/- Stdev
    Latency     4.58ms   23.80ms 437.09ms   97.11%
    Req/Sec     4.89k     1.21k   11.45k    75.73%
  386406 requests in 10.04s, 56.38MB read
Requests/sec:  38499.53
Transfer/sec:      5.62MB

$ wrk -t8 -c32 -d10s "http://127.0.0.1:8080/media"
Running 10s test @ http://127.0.0.1:8080/media
  8 threads and 32 connections
  Thread Stats   Avg      Stdev     Max   +/- Stdev
    Latency    15.24ms   26.91ms 338.60ms   92.50%
    Req/Sec   449.52     86.50   820.00     71.46%
  35837 requests in 10.04s, 5.33MB read
Requests/sec:   3571.02
Transfer/sec:    544.06KB

It's a naïve benchmark, but it is only meant to give me a rough idea of how 
the implementation performs.

Thank you again :)
Victor

On Wednesday, 5 April 2017 at 23:59:03 UTC+2, Kyrylo Stokoz wrote:
>
> I'm not sure, but maybe Source.prefixAndTail can help you achieve the same 
> thing?
>
> You can run detection on the prefix and later combine it with the tail again 
> via source.concat?
>



[akka-user] Re: Detect Content-Type with a Source[ByteString]

2017-04-06 Thread Victor
For those interested, here is the final, fixed and working code:

import akka.NotUsed
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.{HttpEntity, _}
import akka.http.scaladsl.server.Directives.{complete, _}
import akka.stream._
import akka.stream.scaladsl.{Sink, Source}
import akka.util.ByteString
import org.apache.tika.Tika

import scala.concurrent.Future

object Service extends App {

  implicit val system = ActorSystem()
  implicit val materializer = ActorMaterializer()
  implicit val executionContext = system.dispatcher

  val tika = new Tika()

  val minimumDetectionSize = 64 * 1024

  def detect(source: Source[ByteString, Any], previousPrefix: ByteString = ByteString.empty)
      : Future[(MediaType.Binary, Source[ByteString, NotUsed])] =
    source.prefixAndTail(1).runWith(Sink.head).flatMap { case (prefix, tail) =>
      prefix.headOption.fold {
        Future.successful((
          MediaType.custom(tika.detect(previousPrefix.toArray), binary = true).asInstanceOf[MediaType.Binary],
          tail.prepend(Source.single(previousPrefix))))
      }{ prefix =>
        val completePrefix = previousPrefix ++ prefix
        if (completePrefix.size < minimumDetectionSize)
          detect(tail, completePrefix)
        else
          Future.successful((
            MediaType.custom(tika.detect(completePrefix.toArray), binary = true).asInstanceOf[MediaType.Binary],
            tail.prepend(Source.single(completePrefix))))
      }
    }

  val route =
    (path("media") & parameter("url")) { url =>
      get {
        complete(Http().singleRequest(HttpRequest(uri = url)).flatMap(response =>
          detect(response.entity.dataBytes)).map { case (mediaType, source) =>
            HttpEntity(ContentType(mediaType), source)
          })
      }
    }

  Http().bindAndHandle(route, "127.0.0.1", 8080)

}


Have a good day,
Victor


Re: [akka-user] Akka-camel perfomance fluctuations

2014-08-15 Thread Victor Munteanu
Sorry, unfortunately I couldn't improve on it and at some point gave up 
(I had other important things to do). A colleague of mine did some additional 
experimentation; I will ask him to contribute if he can.

On Tuesday, August 12, 2014 8:37:10 PM UTC+2, Ahmad Alkilani wrote:
>
> hi Victor, any updates on this?
>
> I'm dealing with a somewhat similar use case. I find that everything is 
> pointing to the camel-netty or camel-jetty component as being the 
> bottleneck and seems that the thread pool would most likely be the culprit 
> (no proof yet). I had similar findings to Patrik about the Akka dispatchers 
> not being the problem. I am not an expert on tuning the thread pool for 
> camel-netty or camel-jetty.
>
> Victor, were you able to figure out where the bottlenecks were?
>
> Thanks
>
> On Tuesday, April 16, 2013 8:24:04 PM UTC-7, Victor Munteanu wrote:
>>
>>  Thank you for your quick reply. I've already made the changes you've 
>> proposed. I will see if I can tweak some camel parameters and I will post 
>> any findings here, as they may be of use to someone.
>>
>> Best,
>> Victor
>>
>



[akka-user] No message received on remote node in Akka Cluster with DistributedPubSub

2016-07-22 Thread Victor Ho
Hi,

I have an Akka cluster set up locally on one machine, with 2 nodes running in 
2 Java processes participating in one cluster. The different Java 
processes (nodes) listen on different TCP ports.

I am able to start up and see the nodes joining the cluster. However, when I 
have the "orchestrator" node send a "DistributedPubSubMediator.Send" message to 
the "mediator" actor, the target remote actor never receives the message.

Can anyone give me some pointers on how to troubleshoot this issue?

Attaching the akka application.conf. The highlights:
  actor.provider = "akka.cluster.ClusterActorRefProvider"

  remote {
 enabled-transports = ["akka.remote.netty.tcp"]
 netty.tcp {
   hostname = 127.0.0.1
   port = 9095
 }
  }

  extensions = ["akka.cluster.pubsub.DistributedPubSub"]

For the 2 nodes, one runs on port *9095*, and one runs on port *9080*

From the log:
[INFO] [07/22/2016 13:42:40.024] [main] [akka.remote.Remoting] Starting remoting
[INFO] [07/22/2016 13:42:40.305] [main] [akka.remote.Remoting] Remoting started; listening on addresses :[akka.tcp://ClusterSystem@127.0.0.1:9095]
[INFO] [07/22/2016 13:42:40.306] [main] [akka.remote.Remoting] Remoting now listens on addresses: [akka.tcp://ClusterSystem@127.0.0.1:9095]
[INFO] [07/22/2016 13:42:40.317] [main] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:9095] - Starting up...
[INFO] [07/22/2016 13:42:40.390] [main] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:9095] - Registered cluster JMX MBean [akka:type=Cluster]
[INFO] [07/22/2016 13:42:40.390] [main] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:9095] - Started up successfully
[INFO] [07/22/2016 13:42:45.442] [ClusterSystem-akka.actor.default-dispatcher-15] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:9095] - Node [akka.tcp://ClusterSystem@127.0.0.1:9095] is JOINING, roles [fitting]
[INFO] [07/22/2016 13:42:46.426] [ClusterSystem-akka.actor.default-dispatcher-16] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:9095] - Leader is moving node *[akka.tcp://ClusterSystem@127.0.0.1:9095] to [Up]*
[INFO] [07/22/2016 13:42:49.604] [ClusterSystem-akka.actor.default-dispatcher-19] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:9095] - Node [akka.tcp://ClusterSystem@127.0.0.1:9080] is JOINING, roles [fitting]
[INFO] [07/22/2016 13:42:50.420] [ClusterSystem-akka.actor.default-dispatcher-14] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:9095] - Leader is moving node *[akka.tcp://ClusterSystem@127.0.0.1:9080] to [Up]*

Node membership in the cluster looks fine, but the message sending is not 
successful.

Code sending to the remote actor with path "worker":

private ActorRef mediator = 
    DistributedPubSub.get(getContext().system()).mediator();

boolean localAffinity = true;
mediator.tell(new DistributedPubSubMediator.Send("/user/worker", 
    message, localAffinity), getSelf());

Code where the worker actor is created and registers itself with the mediator:

actorSystem.actorOf(Props.create(FittingActor.class), "worker");

ActorRef mediator = 
    DistributedPubSub.get(getContext().system()).mediator();
// register to the path
mediator.tell(new DistributedPubSubMediator.Put(getSelf()), 
    getSelf());

akka {

  actor.provider = "akka.cluster.ClusterActorRefProvider"

  loglevel = INFO

  remote {
    enabled-transports = ["akka.remote.netty.tcp"]

    netty.tcp {
      hostname = 127.0.0.1
      port = 9095
    }

    log-remote-lifecycle-events = on
    log-sent-messages = on
    log-received-messages = on

    transport-failure-detector {
      hartbeat-interval = 30s
    }
  }

  cluster {
    roles = ["fitting"]
    metrics.enabled = off
    auto-down-unreachable-after = 120s
    seed-nodes = [
      "akka.tcp://ClusterSystem@127.0.0.1:9095",
      "akka.tcp://ClusterSystem@127.0.0.1:9080"
    ]

    pub-sub {
      # Actor name of the mediator actor, /system/distributedPubSubMediator
      name = distributedPubSubMediator

      # Start the mediator on member

[akka-user] Re: No message received on remote node in Akka Cluster with DistributedPubSub

2016-07-22 Thread Victor Ho
The issue was resolved when I set "roles" in the cluster configuration to be 
empty, instead of having the same value in both Java processes (nodes).

Before:
  cluster {
    roles = ["fitting"]

After:
  cluster {
    roles = []

Can anyone give me some pointers on the meaning of roles and their proper 
usage in a cluster?

Thanks.




[akka-user] Source.fromFutureSource. Error handling and return type signature

2017-09-11 Thread Victor Caballero
Hello,

I am intrigued by the mechanism by which errors/exceptions in the Future 
wrapped by Source.fromFutureSource are handled, or whether they can be 
handled at all. Also, the type signature of the method is: 

def fromFutureSource[T, M](future: Future[Graph[SourceShape[T], M]]): Source[T, Future[M]]

I don't understand the return type, Source[T, Future[M]]. Can anyone 
explain why the Future is there? And what is the use of the second type 
parameter of Source[T, M], which is usually NotUsed?

Thank you!!



Re: [akka-user] Source.fromFutureSource. Error handling and return type signature

2017-09-12 Thread Victor Caballero
Hey,

I have skimmed the documentation you pointed to and I think this is what I 
was looking for. I will try to understand the whole picture at the weekend.

Thank you!


On Monday, 11 September 2017 at 18:35:35 (UTC+2), Konrad Malawski wrote:
>
> Hi there Victor,
> That’s the “materialized value” type parameter: read the docs to learn 
> more about it:
> - 
> http://doc.akka.io/docs/akka/current/scala/stream/stream-flows-and-basics.html#defining-and-running-streams
> - 
> http://doc.akka.io/docs/akka/current/scala/stream/stream-composition.html#materialized-values
>
> Basically it’s a well-typed mechanism for stream elements to expose 
> control or information mechanisms.
> Read also the scaladoc of the operation you just posted, it explains what 
> the value is in this case.
>
> Happy hakking
>
> —
> Konrad `kto.so` Malawski
> Akka <http://akka.io> @ Lightbend <http://lightbend.com>
>
>
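
For readers of the archive, here is a minimal sketch of the two points raised in this thread: what the materialized `Future[M]` of `Source.fromFutureSource` is, and what happens when the wrapped Future fails. It assumes Akka Streams 2.5 (the version current at the time, with ActorMaterializer); the object name `FutureSourceSketch`, its `run` method and the `-1` fallback element are hypothetical choices made for the illustration. As far as I know, newer Akka versions offer `Source.futureSource` for the same purpose.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Keep, Sink, Source}

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.util.Try

// Hypothetical helper, for illustration only.
object FutureSourceSketch {

  // Runs a stream built from a Future[Source] and returns:
  //  - the elements seen downstream (-1 is emitted if the stream failed), and
  //  - whether the materialized Future[NotUsed] completed successfully.
  def run(futureSource: Future[Source[Int, NotUsed]]): (Seq[Int], Boolean) = {
    implicit val system: ActorSystem = ActorSystem("future-source-sketch")
    implicit val materializer: ActorMaterializer = ActorMaterializer()

    // The materialized value is a Future[NotUsed]: the inner source's own
    // materialized value (NotUsed here) only exists once the outer Future completes.
    val source: Source[Int, Future[NotUsed]] = Source.fromFutureSource(futureSource)

    val (matValue, elements) = source
      .recover { case _: Throwable => -1 } // turn an upstream failure into a final element
      .toMat(Sink.seq[Int])(Keep.both)
      .run()

    try {
      val seen = Await.result(elements, 5.seconds)
      val matOk = Try(Await.result(matValue, 5.seconds)).isSuccess
      (seen, matOk)
    } finally system.terminate()
  }
}
```

With a successful Future the elements of the inner source flow through and the materialized Future completes normally. With a failed Future the stream itself fails, so ordinary stream error handling such as `recover` applies downstream, and the materialized Future fails as well.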
