Re: [akka-user] Recording latency of operations that return a Future inside an akka stream
Endre, Thank you for responding. I was trying to do something like this. However, I'm not sure if what you have mentioned above will work because the Redis set API returns a Future[Boolean]. So in the map after my mapAsyncUnordered I've no reference to the operation that finished before that unless the Redis returns a reference in the response to each call. Please let me know if this is not correct. However, if I use mapAsync the order should be maintained correct ? Do you have any idea of how much performance difference is between mapAsync and mapAsyncUnordered ? I can maintain two lists (before and after). I'm yet to try the other approach that Konard pointed. What I'm learning is - doing things asynchronously is fast and fun BUT requires a different mindset :-) Thanks. -Soumya On Thursday, January 1, 2015 3:48:54 AM UTC-5, Akka Team wrote: Hi, One pitfall in your simple map approach is that you might measure the wrong values. Remember that mapAsyncUnordered is, well, unordered and batching, so you cannot expect that the elements come out in the same order as they entered. On approach would be to record the start time of elements in a concurrent Map in a map stage in front of the redis write, then record the end time of elements in a map stage after, reading from the Map of start times. The drawback of the above approach is that on top of what you want to measure, you get overhead from: - using a concurrent map - adding two map stages The measurement pseudocode can look like this: .map{ id = putInMap(id, currentTime); id } .mapAsync(...) .map { id = currentTime - getFromMap(id) } .fold(0.0)(_ + _) .map(_ / NumberOfElements) // Now you have a Future of the time average. Of course you can also just collect the measurements in a Seq in the fold instead of averaging, and then you can do whatever analysis you want after. -Endre On Sun, Dec 28, 2014 at 2:50 AM, Soumya Simanta soumya@gmail.com javascript: wrote: Konrad, Thank you for pointing me the correct direction. I'll give it a shot. -Soumya On Saturday, December 27, 2014 7:48:30 AM UTC-5, Konrad Malawski wrote: Hi Soumya, I don’t think what you’ll end up measuring this way will be very useful. I mean, between the completion of the future and the triggering of the map there are multiple asynchronous boundaries… So you won’t be measuring how fast the set operation was, but how much time was between these asynchronous boundaries - which could have been backpressured by the way. I suggest directly wrapping the set call with your measurement logic instead - since that is what you want to measure it seems. By the way, we do have a “timed” element, in our extras section: https://github.com/akka/akka/blob/release-2.3-dev/akka- stream/src/main/scala/akka/stream/extra/Timed.scala You can `import Timed._` and then use it as shown here: https://github.com/akka/akka/ blob/release-2.3-dev/akka-stream-tests/src/test/scala/akka/stream/extra/ FlowTimedSpec.scala It’s a rather old element and I’m not sure if we’ll be keeping it, but you can use it as a source of inspiration in case you end up needing that kind of measurement. On 26 December 2014 at 05:46:55, Soumya Simanta (soumya@gmail.com) wrote: This is related to this https://groups.google.com/forum/#!topic/akka-user/NrSkEwMrS3s thread but sufficiently different that I decided to create new thread. Hope that's okay. I would like to create a histogram of latency of a large number of set operations ( set returns a Future[Boolean]) using LatencyUtils https://github.com/LatencyUtils/LatencyUtils Basically I need to start recording the time before the set operation (inside mapAsyncUnordered(k = redis.set(k + rnd, message))) and then somehow record the end time in a map operation( .map( //record the end time here) after this. I'm having a hard time trying to figure this out. My understanding is that the even though the mapAsyncUnordered doesn't maintain the order of operations the map following the mapAsynUnordered will maintain the order from the previous stage because of TCP maintaining the order. Is this correct? val redis = RedisClient(localhost) val random1 = UUID.randomUUID().toString def insertValues(rnd: String): Flow[Int, Boolean] = { Flow[Int].mapAsyncUnordered(k = redis.set(k + rnd, message)).map( //record the end time here) } val blackhole = BlackholeSink val maxSeq = 500 val seqSource = Source( () = (1 to maxSeq).iterator ) *val streamWithSeqSource = seqSource.via(insertValues(random1)).runWith(blackhole)* -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/ current/additional/faq.html Search the archives: https://groups.google.com/ group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To
Re: [akka-user] Recording latency of operations that return a Future inside an akka stream
Hi, One pitfall in your simple map approach is that you might measure the wrong values. Remember that mapAsyncUnordered is, well, unordered and batching, so you cannot expect that the elements come out in the same order as they entered. On approach would be to record the start time of elements in a concurrent Map in a map stage in front of the redis write, then record the end time of elements in a map stage after, reading from the Map of start times. The drawback of the above approach is that on top of what you want to measure, you get overhead from: - using a concurrent map - adding two map stages The measurement pseudocode can look like this: .map{ id = putInMap(id, currentTime); id } .mapAsync(...) .map { id = currentTime - getFromMap(id) } .fold(0.0)(_ + _) .map(_ / NumberOfElements) // Now you have a Future of the time average. Of course you can also just collect the measurements in a Seq in the fold instead of averaging, and then you can do whatever analysis you want after. -Endre On Sun, Dec 28, 2014 at 2:50 AM, Soumya Simanta soumya.sima...@gmail.com wrote: Konrad, Thank you for pointing me the correct direction. I'll give it a shot. -Soumya On Saturday, December 27, 2014 7:48:30 AM UTC-5, Konrad Malawski wrote: Hi Soumya, I don’t think what you’ll end up measuring this way will be very useful. I mean, between the completion of the future and the triggering of the map there are multiple asynchronous boundaries… So you won’t be measuring how fast the set operation was, but how much time was between these asynchronous boundaries - which could have been backpressured by the way. I suggest directly wrapping the set call with your measurement logic instead - since that is what you want to measure it seems. By the way, we do have a “timed” element, in our extras section: https://github.com/akka/akka/blob/release-2.3-dev/akka- stream/src/main/scala/akka/stream/extra/Timed.scala You can `import Timed._` and then use it as shown here: https://github.com/akka/akka/ blob/release-2.3-dev/akka-stream-tests/src/test/scala/akka/stream/extra/ FlowTimedSpec.scala It’s a rather old element and I’m not sure if we’ll be keeping it, but you can use it as a source of inspiration in case you end up needing that kind of measurement. On 26 December 2014 at 05:46:55, Soumya Simanta (soumya@gmail.com) wrote: This is related to this https://groups.google.com/forum/#!topic/akka-user/NrSkEwMrS3s thread but sufficiently different that I decided to create new thread. Hope that's okay. I would like to create a histogram of latency of a large number of set operations ( set returns a Future[Boolean]) using LatencyUtils https://github.com/LatencyUtils/LatencyUtils Basically I need to start recording the time before the set operation (inside mapAsyncUnordered(k = redis.set(k + rnd, message))) and then somehow record the end time in a map operation( .map( //record the end time here) after this. I'm having a hard time trying to figure this out. My understanding is that the even though the mapAsyncUnordered doesn't maintain the order of operations the map following the mapAsynUnordered will maintain the order from the previous stage because of TCP maintaining the order. Is this correct? val redis = RedisClient(localhost) val random1 = UUID.randomUUID().toString def insertValues(rnd: String): Flow[Int, Boolean] = { Flow[Int].mapAsyncUnordered(k = redis.set(k + rnd, message)).map( //record the end time here) } val blackhole = BlackholeSink val maxSeq = 500 val seqSource = Source( () = (1 to maxSeq).iterator ) *val streamWithSeqSource = seqSource.via(insertValues(random1)).runWith(blackhole)* -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/ current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+...@googlegroups.com. To post to this group, send email to akka...@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout. -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout. -- Akka Team Typesafe - The software stack for applications that
Re: [akka-user] Recording latency of operations that return a Future inside an akka stream
Hi, On Thu, Jan 1, 2015 at 6:21 PM, Soumya Simanta soumya.sima...@gmail.com wrote: Endre, Thank you for responding. I was trying to do something like this. However, I'm not sure if what you have mentioned above will work because the Redis set API returns a Future[Boolean]. So in the map after my mapAsyncUnordered I've no reference to the operation that finished before that unless the Redis returns a reference in the response to each call. You can always transform that future to return a Future[Id] easily. Please let me know if this is not correct. However, if I use mapAsync the order should be maintained correct ? Yes it does. Do you have any idea of how much performance difference is between mapAsync and mapAsyncUnordered ? It depends on the latency distribution of the actual async call. The ordered one suffers more from stragglers (unexpectedly long calls) since every later result must wait, while the unordered is largely unaffected of this. If the timing distribution of the call has a low deviation then I expect not much difference between the two. I can maintain two lists (before and after). I'm yet to try the other approach that Konard pointed. What I'm learning is - doing things asynchronously is fast and fun BUT requires a different mindset :-) This recognition is the most important step to become a powerful Akka programmer :) -Endre Thanks. -Soumya On Thursday, January 1, 2015 3:48:54 AM UTC-5, Akka Team wrote: Hi, One pitfall in your simple map approach is that you might measure the wrong values. Remember that mapAsyncUnordered is, well, unordered and batching, so you cannot expect that the elements come out in the same order as they entered. On approach would be to record the start time of elements in a concurrent Map in a map stage in front of the redis write, then record the end time of elements in a map stage after, reading from the Map of start times. The drawback of the above approach is that on top of what you want to measure, you get overhead from: - using a concurrent map - adding two map stages The measurement pseudocode can look like this: .map{ id = putInMap(id, currentTime); id } .mapAsync(...) .map { id = currentTime - getFromMap(id) } .fold(0.0)(_ + _) .map(_ / NumberOfElements) // Now you have a Future of the time average. Of course you can also just collect the measurements in a Seq in the fold instead of averaging, and then you can do whatever analysis you want after. -Endre On Sun, Dec 28, 2014 at 2:50 AM, Soumya Simanta soumya@gmail.com wrote: Konrad, Thank you for pointing me the correct direction. I'll give it a shot. -Soumya On Saturday, December 27, 2014 7:48:30 AM UTC-5, Konrad Malawski wrote: Hi Soumya, I don’t think what you’ll end up measuring this way will be very useful. I mean, between the completion of the future and the triggering of the map there are multiple asynchronous boundaries… So you won’t be measuring how fast the set operation was, but how much time was between these asynchronous boundaries - which could have been backpressured by the way. I suggest directly wrapping the set call with your measurement logic instead - since that is what you want to measure it seems. By the way, we do have a “timed” element, in our extras section: https://github.com/akka/akka/blob/release-2.3-dev/akka-strea m/src/main/scala/akka/stream/extra/Timed.scala You can `import Timed._` and then use it as shown here: https://github.com/akka/akka/b lob/release-2.3-dev/akka-stream-tests/src/test/scala/akka/stream/extra/ FlowTimedSpec.scala It’s a rather old element and I’m not sure if we’ll be keeping it, but you can use it as a source of inspiration in case you end up needing that kind of measurement. On 26 December 2014 at 05:46:55, Soumya Simanta (soumya@gmail.com) wrote: This is related to this https://groups.google.com/forum/#!topic/akka-user/NrSkEwMrS3s thread but sufficiently different that I decided to create new thread. Hope that's okay. I would like to create a histogram of latency of a large number of set operations ( set returns a Future[Boolean]) using LatencyUtils https://github.com/LatencyUtils/LatencyUtils Basically I need to start recording the time before the set operation (inside mapAsyncUnordered(k = redis.set(k + rnd, message))) and then somehow record the end time in a map operation( .map( //record the end time here) after this. I'm having a hard time trying to figure this out. My understanding is that the even though the mapAsyncUnordered doesn't maintain the order of operations the map following the mapAsynUnordered will maintain the order from the previous stage because of TCP maintaining the order. Is this correct? val redis = RedisClient(localhost) val random1 = UUID.randomUUID().toString def insertValues(rnd: String): Flow[Int, Boolean] = { Flow[Int].mapAsyncUnordered(k = redis.set(k +
Re: [akka-user] Recording latency of operations that return a Future inside an akka stream
Hi Soumya, I don’t think what you’ll end up measuring this way will be very useful. I mean, between the completion of the future and the triggering of the map there are multiple asynchronous boundaries… So you won’t be measuring how fast the set operation was, but how much time was between these asynchronous boundaries - which could have been backpressured by the way. I suggest directly wrapping the set call with your measurement logic instead - since that is what you want to measure it seems. By the way, we do have a “timed” element, in our extras section: https://github.com/akka/akka/blob/release-2.3-dev/akka-stream/src/main/scala/akka/stream/extra/Timed.scala You can `import Timed._` and then use it as shown here: https://github.com/akka/akka/blob/release-2.3-dev/akka-stream-tests/src/test/scala/akka/stream/extra/FlowTimedSpec.scala It’s a rather old element and I’m not sure if we’ll be keeping it, but you can use it as a source of inspiration in case you end up needing that kind of measurement. On 26 December 2014 at 05:46:55, Soumya Simanta (soumya.sima...@gmail.com) wrote: This is related to this thread but sufficiently different that I decided to create new thread. Hope that's okay. I would like to create a histogram of latency of a large number of set operations ( set returns a Future[Boolean]) using LatencyUtils Basically I need to start recording the time before the set operation (inside mapAsyncUnordered(k = redis.set(k + rnd, message))) and then somehow record the end time in a map operation( .map( //record the end time here) after this. I'm having a hard time trying to figure this out. My understanding is that the even though the mapAsyncUnordered doesn't maintain the order of operations the map following the mapAsynUnordered will maintain the order from the previous stage because of TCP maintaining the order. Is this correct? val redis = RedisClient(localhost) val random1 = UUID.randomUUID().toString def insertValues(rnd: String): Flow[Int, Boolean] = { Flow[Int].mapAsyncUnordered(k = redis.set(k + rnd, message)).map( //record the end time here) } val blackhole = BlackholeSink val maxSeq = 500 val seqSource = Source( () = (1 to maxSeq).iterator ) val streamWithSeqSource = seqSource.via(insertValues(random1)).runWith(blackhole) -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout. -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
[akka-user] Recording latency of operations that return a Future inside an akka stream
This is related to this https://groups.google.com/forum/#!topic/akka-user/NrSkEwMrS3s thread but sufficiently different that I decided to create new thread. Hope that's okay. I would like to create a histogram of latency of a large number of set operations ( set returns a Future[Boolean]) using LatencyUtils https://github.com/LatencyUtils/LatencyUtils Basically I need to start recording the time before the set operation (inside mapAsyncUnordered(k = redis.set(k + rnd, message))) and then somehow record the end time in a map operation( .map( //record the end time here) after this. I'm having a hard time trying to figure this out. My understanding is that the even though the mapAsyncUnordered doesn't maintain the order of operations the map following the mapAsynUnordered will maintain the order from the previous stage because of TCP maintaining the order. Is this correct? val redis = RedisClient(localhost) val random1 = UUID.randomUUID().toString def insertValues(rnd: String): Flow[Int, Boolean] = { Flow[Int].mapAsyncUnordered(k = redis.set(k + rnd, message)).map( //record the end time here) } val blackhole = BlackholeSink val maxSeq = 500 val seqSource = Source( () = (1 to maxSeq).iterator ) *val streamWithSeqSource = seqSource.via(insertValues(random1)).runWith(blackhole)* -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.