Re: [akka-user] Recording latency of operations that return a Future inside an akka stream

2015-01-01 Thread Soumya Simanta
Endre, 

Thank you for responding. 
I was trying to do something like this. However, I'm not sure that what you
describe will work, because the Redis set API returns a Future[Boolean]. So
in the map after my mapAsyncUnordered I have no reference to the operation
that just completed, unless Redis returns a reference in the response to
each call. Please let me know if this is not correct.

However, if I use mapAsync, the order should be maintained, correct? Do you
have any idea how large the performance difference between mapAsync and
mapAsyncUnordered is? I can maintain two lists (before and after). I have
yet to try the other approach that Konrad pointed out.

What I'm learning is - doing things asynchronously is fast and fun BUT 
requires a different mindset :-) 

Thanks. 
-Soumya






Re: [akka-user] Recording latency of operations that return a Future inside an akka stream

2015-01-01 Thread Akka Team
Hi,

One pitfall in your simple map approach is that you might measure the
wrong values. Remember that mapAsyncUnordered is, well, unordered and
batching, so you cannot expect the elements to come out in the same order
as they entered. One approach would be to record the start time of each
element in a concurrent Map in a map stage in front of the Redis write,
then record the end time in a map stage after it, reading from the Map of
start times.

The drawback of the above approach is that on top of what you want to
measure, you get overhead from:

 - using a concurrent map
 - adding two map stages

The measurement pseudocode can look like this:

.map { id => putInMap(id, currentTime); id }
.mapAsync(...)
.map { id => currentTime - getFromMap(id) }
.fold(0.0)(_ + _)
.map(_ / NumberOfElements)
// Now you have a Future of the time average.

Of course you can also just collect the measurements in a Seq in the fold
instead of averaging, and then you can do whatever analysis you want after.
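
As a rough illustration of the approach described above, here is a minimal sketch that uses only the Scala standard library. Future.traverse stands in for the stream's map / mapAsync / map stages, and StartTimeTable, markStart, elapsedNanos and measureAll are hypothetical names, not part of Akka or of any Redis client. It also assumes the async call hands the element id back when it completes.

```scala
import scala.collection.concurrent.TrieMap
import scala.concurrent.{ExecutionContext, Future}

object StartTimeTable {
  // Concurrent map shared by the stage before the async call and the stage after it.
  private val startTimes = TrieMap.empty[Int, Long]

  // "map stage in front of the write": remember when element `id` entered.
  def markStart(id: Int): Int = {
    startTimes.put(id, System.nanoTime())
    id
  }

  // "map stage after the write": look up (and remove) the start time, return elapsed nanos.
  def elapsedNanos(id: Int): Long = {
    val start = startTimes.remove(id).getOrElse(
      throw new NoSuchElementException(s"no start time recorded for id $id"))
    System.nanoTime() - start
  }

  // Stand-in for the stream: runs `write` for every id and collects one sample per element.
  // `write` is assumed to complete with the id it was given.
  def measureAll(ids: Seq[Int])(write: Int => Future[Int])(
      implicit ec: ExecutionContext): Future[Seq[Long]] =
    Future.traverse(ids)(id => write(markStart(id)).map(elapsedNanos))
}
```

Collecting the samples in a Seq, as here, leaves the analysis for later; the average is then samples.sum.toDouble / samples.size. Removing each entry on lookup keeps the map from growing with the number of elements.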

-Endre


  --
  Read the docs: http://akka.io/docs/
  Check the FAQ:
 http://doc.akka.io/docs/akka/current/additional/faq.html
  Search the archives: https://groups.google.com/group/akka-user
 ---
 You received this message because you are subscribed to the Google Groups
 Akka User List group.
 To unsubscribe from this group and stop receiving emails from it, send an
 email to akka-user+unsubscr...@googlegroups.com.
 To post to this group, send email to akka-user@googlegroups.com.
 Visit this group at http://groups.google.com/group/akka-user.
 For more options, visit https://groups.google.com/d/optout.




-- 
Akka Team
Typesafe - The software stack for applications that 

Re: [akka-user] Recording latency of operations that return a Future inside an akka stream

2015-01-01 Thread Akka Team
Hi,



On Thu, Jan 1, 2015 at 6:21 PM, Soumya Simanta soumya.sima...@gmail.com
wrote:

 Endre,

 Thank you for responding.
 I was trying to do something like this. However, I'm not sure if what you
 have mentioned above will work because the Redis set API returns a
 Future[Boolean]. So in the map after my mapAsyncUnordered I've no
 reference to the operation that finished before that unless the Redis
 returns a reference in the response to each call.


You can always transform that future to return a Future[Id] easily.
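
A minimal sketch of such a transformation, assuming nothing about the Redis client beyond a call that returns a Future. TagWithId and withId are hypothetical names used only for illustration.

```scala
import scala.concurrent.{ExecutionContext, Future}

object TagWithId {
  // Wraps a call whose result does not identify the element, so that the
  // completed Future carries the id alongside the original result.
  def withId[Id, A](id: Id)(call: => Future[A])(
      implicit ec: ExecutionContext): Future[(Id, A)] =
    call.map(result => (id, result))
}
```

Inside the stream this would be used along the lines of mapAsyncUnordered(k => TagWithId.withId(k)(redis.set(k + rnd, message))), so the following map stage knows which element just finished. If the Boolean is not needed, call.map(_ => id) gives a plain Future[Id].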


 Please let me know if this is not correct.

 However, if I use mapAsync, the order should be maintained, correct?


Yes, mapAsync preserves the order.


 Do you have any idea how large the performance difference between
 mapAsync and mapAsyncUnordered is?


It depends on the latency distribution of the actual async call. The
ordered one suffers more from stragglers (unexpectedly long calls), since
every later result must wait for them, while the unordered one is largely
unaffected by this. If the timing distribution of the call has low
variance, I expect not much difference between the two.
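
To make the straggler effect concrete, here is a deliberately simplified model, not a description of how Akka schedules anything. It assumes all calls start at time 0 with unlimited parallelism and no backpressure, and takes the completion time of each call in input order. StragglerModel and its methods are hypothetical names.

```scala
object StragglerModel {
  // Ordered emission: element i can only be emitted once every earlier
  // element has completed, so its emit time is the running maximum.
  def orderedEmitTimes(completion: Seq[Long]): Seq[Long] =
    completion.scanLeft(Long.MinValue)((a, b) => math.max(a, b)).tail

  // Unordered emission: each result is emitted as soon as it completes,
  // so the emit times are simply the completion times in sorted order.
  def unorderedEmitTimes(completion: Seq[Long]): Seq[Long] =
    completion.sorted

  def mean(xs: Seq[Long]): Double = xs.sum.toDouble / xs.size
}
```

For completion times of 10, 200, 12, 11 and 13 ms, the ordered emit times are 10, 200, 200, 200, 200 (mean 162 ms), while the unordered ones are 10, 11, 12, 13, 200 (mean 49.2 ms). With completion times that are all close together, the two sequences nearly coincide.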


 I can maintain two lists (before and after). I have yet to try the other
 approach that Konrad pointed out.

 What I'm learning is - doing things asynchronously is fast and fun BUT
 requires a different mindset :-)


Recognizing this is the most important step toward becoming a powerful Akka
programmer :)

-Endre


 Thanks.
 -Soumya






Re: [akka-user] Recording latency of operations that return a Future inside an akka stream

2014-12-27 Thread Konrad 'ktoso' Malawski
Hi Soumya,
I don’t think what you’ll end up measuring this way will be very useful. I
mean, between the completion of the future and the triggering of the map there
are multiple asynchronous boundaries… So you won’t be measuring how fast the
set operation was, but how much time elapsed between these asynchronous
boundaries - which, by the way, could have been backpressured.

I suggest directly wrapping the set call with your measurement logic instead - 
since that is what you want to measure it seems.
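
A minimal sketch of what wrapping the call could look like, using only the Scala standard library. TimedCall, timed and the record callback are hypothetical names; record is wherever the elapsed nanoseconds should go, for example a histogram recorder.

```scala
import scala.concurrent.{ExecutionContext, Future}

object TimedCall {
  // Captures the start time right before the call is issued and reports the
  // elapsed nanoseconds once the Future completes, whether it succeeded or failed.
  // The returned Future completes only after `record` has run, so downstream
  // stages and backpressure are not part of the measurement.
  def timed[A](record: Long => Unit)(call: => Future[A])(
      implicit ec: ExecutionContext): Future[A] = {
    val start = System.nanoTime()
    call.andThen { case _ => record(System.nanoTime() - start) }
  }
}
```

In the stream from the original question this would sit inside the async stage, along the lines of mapAsyncUnordered(k => TimedCall.timed(record)(redis.set(k + rnd, message))). Because the measurement travels with the call itself, it does not matter in which order the results come out.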

By the way, we do have a “timed” element, in our extras section: 
https://github.com/akka/akka/blob/release-2.3-dev/akka-stream/src/main/scala/akka/stream/extra/Timed.scala
 You can `import Timed._` and then use it as shown here: 
https://github.com/akka/akka/blob/release-2.3-dev/akka-stream-tests/src/test/scala/akka/stream/extra/FlowTimedSpec.scala
It’s a rather old element and I’m not sure if we’ll be keeping it, but you can 
use it as a source of inspiration in case you end up needing that kind of 
measurement.





[akka-user] Recording latency of operations that return a Future inside an akka stream

2014-12-25 Thread Soumya Simanta

This is related to the thread at
https://groups.google.com/forum/#!topic/akka-user/NrSkEwMrS3s but is
sufficiently different that I decided to create a new thread. Hope that's
okay.

I would like to create a histogram of the latency of a large number of set
operations (set returns a Future[Boolean]) using LatencyUtils
https://github.com/LatencyUtils/LatencyUtils

Basically I need to start recording the time before the set operation
(inside mapAsyncUnordered(k => redis.set(k + rnd, message))) and then
somehow record the end time in a map operation (.map( /* record the end
time here */ )) after it. I'm having a hard time figuring this out.

My understanding is that even though mapAsyncUnordered doesn't maintain
the order of operations, the map following the mapAsyncUnordered will
maintain the order from the previous stage because TCP maintains the
order. Is this correct?


import java.util.UUID

val redis = RedisClient("localhost")

val random1 = UUID.randomUUID().toString

// `message` is assumed to be defined elsewhere.
def insertValues(rnd: String): Flow[Int, Boolean] = {
  Flow[Int]
    .mapAsyncUnordered(k => redis.set(k + rnd, message))
    .map { result =>
      // record the end time here
      result
    }
}

val blackhole = BlackholeSink

val maxSeq = 500
val seqSource = Source(() => (1 to maxSeq).iterator)
val streamWithSeqSource =
  seqSource.via(insertValues(random1)).runWith(blackhole)

