Re: [akka-user] How would you "connect" many independent graphs maintaining backpressure between them?

2016-07-22 Thread Akka Team
Hi Jack,

Queue.offer returns a Future; you can use that in a mapAsync in the stream
you want to transitively throttle. As you noted, though, this can fail if
the queue is full. What I would do is probably just use Future composition
(recoverWith in this case) to implement retry logic (probably with some
added delay) with a bound (like 2-3 retries). This has several benefits:

 - due to how mapAsync works, it will backpressure individual
clients according to the global rate (via the offer Future)
 - it does retry, but with an upper bound on the number of attempts,
after which it fails/cancels the client connection
 - there are no unbounded queues
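
A rough sketch of that composition (assuming an Akka SourceQueue; the
`offerWithRetry` name, the 100 ms delay, and the retry count are
illustrative, not from this thread):

```scala
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration._
import akka.actor.ActorSystem
import akka.pattern.after
import akka.stream.QueueOfferResult
import akka.stream.scaladsl.SourceQueue

// Bounded re-offer: when the shared queue is full, offer's Future fails
// (or the result is not Enqueued), so retry a few times with a small
// delay instead of queueing unboundedly.
def offerWithRetry[T](queue: SourceQueue[T], elem: T, retries: Int = 3)(
    implicit system: ActorSystem, ec: ExecutionContext): Future[QueueOfferResult] = {
  // akka.pattern.after provides the delayed recursion
  def retryLater(): Future[QueueOfferResult] =
    after(100.millis, system.scheduler)(offerWithRetry(queue, elem, retries - 1))

  queue.offer(elem).flatMap {
    case QueueOfferResult.Enqueued => Future.successful(QueueOfferResult.Enqueued)
    case _ if retries > 0          => retryLater()
    case other                     => Future.failed(new RuntimeException(s"gave up: $other"))
  }.recoverWith {
    case _ if retries > 0 => retryLater()
  }
}
```

Each client stream then calls this inside its mapAsync(1), so a slow or
failing offer backpressures that client at the shared queue's global rate;
once the retries are exhausted the failure propagates and the client
stream fails/cancels.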

-Endre

On Wed, Jul 13, 2016 at 2:16 PM, Viktor Klang wrote:

> Ah, sorry, I should've looked more carefully. AFAICT you'll need to figure
> out what throttling means for a fan-in operation.
>
> (Because now you're adding an unbounded queue (the CLQ))
>
> On Sun, Jul 10, 2016 at 4:31 PM, Jack Daniels  wrote:
>
>> Hi! I use it in the example provided above. The problem is how to
>> throttle other graphs that are not connected to the throttled graph.
>>
>> On Friday, July 8, 2016 at 2:24:04 PM UTC+3, √ wrote:
>>>
>>> Considered Flow.throttle?
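
For reference, Flow.throttle shapes a single stream to a fixed rate — a
minimal sketch (the String element type and the 1 element/second rate
mirror the code below):

```scala
import scala.concurrent.duration._
import akka.NotUsed
import akka.stream.ThrottleMode
import akka.stream.scaladsl.Flow

// Emit at most 1 element per second (burst of 1); ThrottleMode.Shaping
// delays elements rather than failing the stream when the rate is exceeded.
val throttled: Flow[String, String, NotUsed] =
  Flow[String].throttle(1, 1.second, 1, ThrottleMode.Shaping)
```

As the thread goes on to note, this throttles only the graph it is
embedded in; by itself it does not coordinate multiple independent graphs.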
>>>
>>> On Fri, Jul 8, 2016 at 11:23 AM, Jack Daniels 
>>> wrote:
>>>

 Hey guys! I'm continuing to learn akka-streams and have a new question.

 *Variables:*

- Single http client flow with throttling
- Multiple other flows that want to use first flow simultaneously

 *Goal:*

 The single http flow makes requests to a particular API that limits the
 number of calls to it; exceeding the limit gets me banned. So it's very
 important to maintain the request rate regardless of how many clients in my
 code use the flow.

 There are a number of other flows that want to make requests to that API,
 and I'd like them to get backpressure from the http flow. Normally you
 connect the whole thing into one graph and it works, but in my case I have
 multiple graphs.

 How would you solve it?

 *My attempt to solve it:*

 I use Source.queue for the http flow so that I can queue http requests and
 apply throttling. The problem is that the Future returned by
 SourceQueue.offer fails if I exceed the queue capacity. So I somehow need to
 "re-offer" when a previously offered event completes. That composed Future
 from SourceQueue would then backpressure the other graphs (inside their
 mapAsync) that make http requests.

 Here is how I implemented it:

import java.util.concurrent.ConcurrentLinkedDeque

import scala.concurrent.duration._
import scala.concurrent.{Await, Future, Promise}
import scala.util.{Failure, Random, Success}

import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, OverflowStrategy, QueueOfferResult, ThrottleMode}
import akka.stream.scaladsl.{Keep, Sink, Source}

object Main {

  implicit val system = ActorSystem("root")
  implicit val executor = system.dispatcher
  implicit val materializer = ActorMaterializer()

  private val queueHttp = Source.queue[(String, Promise[String])](2, OverflowStrategy.backpressure)
    .throttle(1, FiniteDuration(1000, MILLISECONDS), 1, ThrottleMode.Shaping)
    .mapAsync(4) {
      case (text, promise) =>
        // Simulate delay of http request (note: Thread.sleep blocks the
        // dispatcher thread; acceptable for a demo only)
        val delay = (Random.nextDouble() * 1000 / 2).toLong
        Thread.sleep(delay)
        Future.successful(text -> promise)
    }
    .toMat(Sink.foreach({
      case (text, p) =>
        p.success(text)
    }))(Keep.left)
    .run

  val futureDeque = new ConcurrentLinkedDeque[Future[String]]()

  def sendRequest(value: String): Future[String] = {

    val p = Promise[String]()
    val offerFuture = queueHttp.offer(value -> p)

    def addToQueue(future: Future[String]): Future[String] = {
      futureDeque.addLast(future)
      future.onComplete {
        case _ => futureDeque.remove(future)
      }
      future
    }

    offerFuture.flatMap {
      case QueueOfferResult.Enqueued =>
        addToQueue(p.future)
    }.recoverWith {
      case ex =>
        val first = futureDeque.pollFirst()
        if (first != null)
          addToQueue(first.flatMap(_ => sendRequest(value)))
        else
          sendRequest(value)
    }
  }

  def main(args: Array[String]) {

    val allFutures = for (v <- 0 until 15)
      yield {
        val res = sendRequest(s"Text $v")
        res.onSuccess {
          case text =>
            println("> " + text)
        }
        res
      }

    Future.sequence(allFutures).onComplete {
      case Success(text) =>
        println(s">>> TOTAL: ${text.length} [in queue: ${futureDeque.size()}]")
        system.terminate()
      case Failure(ex) =>
        ex.printStackTrace()
        system.terminate()
    }

    Await.result(system.whenTerminated, Duration.Inf)
  }
}

 A disadvantage of this solution is the locking on ConcurrentLinkedDeque,
 which is probably not that bad at a rate of 1 request per second, but still.

 How would you solve this task?


-- 
>> Read the docs: http://akka.io/docs/
>> Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html
>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups "Akka User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.