Re: [akka-user] Re: Reactive applications with CQRS and Akka Persistence?
Hi Ashley,

On Friday, 27 June 2014 06:57:17 UTC+2, Ashley Aitken wrote:
> Hi Jeroen,
>
> On Friday, 27 June 2014 05:13:54 UTC+8, Jeroen Gordijn wrote:
>> Why do you need a view that watches multiple Processors? This can already be achieved. You create a child View for every Processor you would like to watch. The child simply forwards all messages to its parent.
>
> There are two (or three) reasons I would suggest, please correct me if I am wrong:
>
> 1. What if there are a very large number of Processors (e.g. one for each Customer aggregate root)? Do we keep the Views for each Processor (as there could be more than one) active all the time? If they are not active then (AFAIK) they will not be updated, and when a query comes in for that View they will need to be activated and recover their state.

How many aggregates do you expect? Actors are lightweight and you can have ~2.5 million actors per GB of heap (source: akka.io). I would expect more problems from keeping other state in memory. I would keep the actors active until I am certain this will run into problems.

> A View that could be notified of changes to any Processor of a specific type could create/activate and update that Processor's View as needed so that it is ready for a subsequent query. This could include putting the view into an external store.
>
> 2. What if the Processors have a one-to-many association with another type of Processor (e.g. each Customer can have many Songs)? One of the roles of the read side in CQRS is to enable (possibly multiple) denormalised read models. How do we maintain a DenormalisedView of a Processor (e.g. Customer with Songs) without having all the Views of the associated Processors also active?

My first attempt would be to do exactly the same as I mentioned before: just create actors that forward the events to the aggregator. I guess this could lead to many actors. Otherwise I would create an actor per Processor and let the aggregators subscribe to that.

This also gives you the possibility to enrich the message with information that is not in the event. I am persisting an OrderSubmitted event. This event does not contain the orderId. In the view I do need the orderId to correlate this event with the correct representation. The View knows the orderId and enriches the event by wrapping it in an Envelope with the orderId. Like:

    case OrderSubmitted => context.parent ! Envelope(orderId, OrderSubmitted)

Otherwise every event needs to contain the aggregate id.

> A DenormalisedView (or several) that could be notified of changes to one of its dependent Processors could create/activate that Processor's View and update the denormalised view(s) based on that.
>
> 3. When both (1.) and (2.) above occur (which I think is quite common).
>
> If you can think of a way to efficiently do these at scale I would be very interested to hear. From my understanding of CQRS it is common for the read side to be able to subscribe to notifications of changes to the event store so as to efficiently handle the cases above.

I agree that receiving events would be a nice optimization, but you still need to know which aggregate the event belongs to. This will show in your code. To be able to scale with the current solution you could use cluster sharding. However, I don't know the implementation details and don't know how the polling is performed. Do 1000 Views lead to 1 poll every interval?

> Akka Persistence's current approach (as far as I can tell and confirmed somewhat by Patrik) can't really handle the cases above efficiently or at scale because of this lack of more general notifications (i.e. it just updates active Views at regular intervals based solely on one specified Processor).

The events could lead to faster eventual consistency. As events are published per aggregate (processorId), I do not see how this can be automatically merged into one stream without changing your software to include the aggregate id in every event. You could argue that including the id in every event makes things clearer anyway, so in that case I follow your reasoning.

> Patrik in another thread suggested implementing the above scenarios using notifications with an external distributed pub-sub, and Pawel has done just that in his fantastic DDD-Leaven-Akka examples using Apache Camel and ActiveMQ (but there are still limitations, e.g. rebuilding denormalised views).
>
> It's a shame that Akka Persistence doesn't seem (on its own) fully functional for CQRS as yet (IMHO). It is still experimental, of course, so there is great hope that something will be done to address this.

I was not aware of DDD-Leaven-Akka, thanks for mentioning it! I will look into it. Apart from optimization I do not feel hindered in creating a CQRS application. However, I have not built a large-scale application with it yet.

Cheers,
Jeroen
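The enrichment idea described in this message (a child View that knows its orderId and wraps each event before forwarding it to the parent) can be sketched without Akka as follows. This is only an illustration under assumptions: the shapes of `OrderSubmitted` and `Envelope` are guessed from the one-line snippet in the message, and `ForwardingView` and `OrderOverview` are hypothetical stand-ins for the child and parent Views, with plain method calls in place of actor messages.

```scala
// Assumed event and envelope shapes; only the names OrderSubmitted and
// Envelope come from the message, everything else is hypothetical.
sealed trait OrderEvent
case object OrderSubmitted extends OrderEvent
final case class Envelope(orderId: String, event: OrderEvent)

// Stand-in for a child View: it knows which order it watches and tags
// every event with that id before handing it to its parent.
final class ForwardingView(orderId: String, parent: Envelope => Unit) {
  def receive(event: OrderEvent): Unit = parent(Envelope(orderId, event))
}

// Stand-in for the aggregating parent View: remembers the latest event
// seen for each order, keyed by the id carried in the envelope.
final class OrderOverview {
  private var latest = Map.empty[String, OrderEvent]
  def receive(env: Envelope): Unit = latest += (env.orderId -> env.event)
  def statusOf(orderId: String): Option[OrderEvent] = latest.get(orderId)
}
```

The event itself stays free of the id; the correlation key is added only on the read side, which is the trade-off the message weighs against putting the aggregate id into every event.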
Re: [akka-user] [akka-stream] Looping or recursion in Duct / Flow
Hi Benoit,

I wasn't proposing that the ActorProducer loop back; instead you should encapsulate the scrollId state and the corresponding behavior in that ActorProducer, i.e. you implement your original snippet not in terms of a flow, but inside the ActorProducer. Then you can compose that stream with others.

-Endre

On Thu, Jun 26, 2014 at 9:58 PM, benq benoit.quart...@gmail.com wrote:
> Ok, thanks, I see. How should I feed my ActorProducer? It is a step of a Flow. Something like
>
>     val myProducer = system.actorOf(...)
>     Flow(ActorProducer[T](myProducer).map...
>     Flow(...).foreach{ myProducer ! _ }
>
> ? Will the back pressure work between the two flows?
>
> Benoît
>
> On Wednesday, 25 June 2014 21:27:42 UTC+2, Patrik Nordwall wrote:
>> On Wed, Jun 25, 2014 at 7:11 PM, benq benoit@gmail.com wrote:
>>> Is there an example somewhere using an ActorProducer?
>>
>> Not an example, but a test: https://github.com/akka/akka/blob/release-2.3-dev/akka-stream/src/test/scala/akka/stream/actor/ActorProducerSpec.scala
>>
>> /Patrik
>>
>> --
>> Patrik Nordwall
>> Typesafe http://typesafe.com/ - Reactive apps on the JVM
>> Twitter: @patriknw
>
> On Wednesday, 25 June 2014 10:07:53 UTC+2, drewhk wrote:
>> Hi Benoit
>>
>> On Wed, Jun 25, 2014 at 9:49 AM, benq benoit@gmail.com wrote:
>>> Hi All,
>>>
>>> Is it possible to feed a Duct (/Flow) with the elements it produces, like a loop?
>>
>> Looping this way is dangerous. In general it leads to element count explosion, but since streams are always bounded, you will very likely deadlock instead. While there is a very thin line where loops are safe, it is just too easy to break it in unexpected ways.
>>
>>> The use case is a Flow of requests to a NoSQL db. Some requests return huge result sets that need to be retrieved in more than one request, using a scroll, which is similar to a database cursor. The NoSQL server returns a pack of n results and a scrollId that needs to be used to retrieve the next n results, until the number of results returned is 0. So, I would like to do something like (pseudo code, no compiler at hand):
>>>
>>>     case class Response(scrollId: String, hits: List[SearchHit])
>>>     val (in, out) = Duct[Response].
>>>       filter(_.hits.size > 0).
>>
>> Due to this filter, the probability of deadlocking is huge when elements are actually dropped.
>>
>>>       mapFuture(client scrollSearch scrollId).
>>>       map(r => Response(r.getScrollId(), r.getHits())).
>>>       tee(in). // This fails at compilation: circular reference. Goal: request next scroll...
>>
>> Even if this compiled, tee waits until there are two subscribers; unfortunately looping creates a circular dependency of subscription timings that will probably deadlock at construction time. Also, there is no clean termination condition, as nothing will ever complete this loop, so even if everything subscribed successfully, the stream would never stop, even when no more elements are flowing.
>>
>>>       mapConcat(_.getHits map ) // ... while working on already available results
>>>       build(materializer).
>>>
>>> Is there a way to implement this?
>>
>> No, unfortunately not. Even if certain loops just happen to work, they would very likely break if we change things like subscription timings (who subscribes when), even though we would not be violating the SPI contract.
>>
>> The proper way to implement what you need is to use the ActorProducer helper class and create a producer of your own that keeps track of the scrollId.
>>
>> -Endre
>>
>>> Thanks in advance.
>>> Benoit

--
Read the docs: http://akka.io/docs/
Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html
Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups Akka User List group.
To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+...@googlegroups.com.
To post to this group, send email to akka...@googlegroups.com.
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.
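The advice in this message, keeping the scrollId state inside the producer instead of looping elements back into the stream, can be illustrated with a plain Scala `Iterator`. This is a sketch under assumptions, not the ActorProducer API: `Page`, `FakeClient` and `ScrollIterator` are hypothetical names, and the fake client treats the scrollId as a simple offset so the example runs without a database.

```scala
// Hypothetical stand-in for one response of the NoSQL client.
final case class Page(scrollId: String, hits: List[String])

// Fake client: serves a fixed data set in pages of `pageSize`, using the
// scrollId as an offset. A real client would perform a network call.
final class FakeClient(data: Vector[String], pageSize: Int) {
  def scroll(scrollId: String): Page = {
    val offset = scrollId.toInt
    val hits = data.slice(offset, offset + pageSize).toList
    Page((offset + hits.size).toString, hits)
  }
}

// The scroll state lives inside the producer; nothing is fed back into
// the stream, and an empty page is the explicit termination condition.
final class ScrollIterator(client: FakeClient, firstScrollId: String)
    extends Iterator[String] {
  private var scrollId = firstScrollId
  private var buffer: List[String] = Nil
  private var exhausted = false

  // Fetch the next page only when the current one has been consumed.
  private def fill(): Unit =
    if (buffer.isEmpty && !exhausted) {
      val page = client.scroll(scrollId)
      if (page.hits.isEmpty) exhausted = true
      else { scrollId = page.scrollId; buffer = page.hits }
    }

  def hasNext: Boolean = { fill(); buffer.nonEmpty }

  def next(): String = {
    fill()
    buffer match {
      case head :: tail => buffer = tail; head
      case Nil          => throw new NoSuchElementException("scroll exhausted")
    }
  }
}
```

Because the next page is requested only when a consumer asks for more elements, demand drives the requests, which is roughly the role the thread assigns to an ActorProducer that tracks the scrollId itself.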
[akka-user] Sending a message after Future(s) completion
Hello,

Beginner here. I'm experiencing some issues with a particular portion of my code:

    val paths = ListBuffer.empty[List[File]]
    futureFiles.foreach {
      _.onComplete {
        case Success(list: List[File]) => paths += list
        case Failure(ex) => log.info("Retrieval of files failed : {}", ex.getLocalizedMessage)
      }
    }
    sender() ! Detected(paths.flatten.toIndexedSeq)

futureFiles is a List[Future[List[File]]], so I attach a callback to each future and build a new list from the results. My problem is that because this is asynchronous code, the tell sometimes happens before the paths list is filled, so nothing is sent.

I figured I'd try something like this:

    Future.sequence(futureFiles).onComplete {
      case Success(lists) => sender() ! Detected(lists.flatten.toIndexedSeq)
      case Failure(ex) => log.info("Retrieval of files failed : {}", ex.getLocalizedMessage)
    }

But doing so I lose some failure messages because of the sequencing, and it doesn't work anyway because the message goes straight to dead letters.

So how can I achieve this?

Thanks
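On the point about losing failure messages: `Future.sequence` fails as soon as one of its inputs fails, so only one exception is reported. One possible way around this, shown here as a sketch using only the Scala standard library, is to turn each future into one that always succeeds with a `Try`, and to split the results afterwards. The names `CollectAll`, `lift` and `partitioned` are made up for this illustration and do not come from the thread.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._
import scala.util.{Failure, Success, Try}

object CollectAll {
  implicit val ec: ExecutionContext = ExecutionContext.global

  // Turn a Future[A] into a Future[Try[A]] that never fails, so that
  // Future.sequence waits for every input instead of failing fast.
  def lift[A](f: Future[A]): Future[Try[A]] =
    f.map[Try[A]](Success(_)).recover { case ex => Failure(ex) }

  // Wait for all futures, then separate the failures from the successes.
  def partitioned[A](fs: List[Future[A]]): Future[(List[Throwable], List[A])] =
    Future.sequence(fs.map(lift)).map { tries =>
      (tries.collect { case Failure(ex) => ex },
       tries.collect { case Success(a) => a })
    }
}
```

With this shape every failure is still available when the combined future completes, so each one can be logged before the successful results are sent on.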
Re: [akka-user] [akka-stream] Looping or recursion in Duct / Flow
On Friday, 27 June 2014 10:01:57 UTC+2, drewhk wrote:
> Hi Benoit,
>
> I wasn't proposing that the ActorProducer loop back; instead you should encapsulate the scrollId state and the corresponding behavior in that ActorProducer, i.e. you implement your original snippet not in terms of a flow, but inside the ActorProducer.

I understand that, it was clear. It seems my additional question was not :(

My initial snippet is a Duct. It takes an input (the query parameters) and produces results. It was supposed to be plugged between two other Ducts.

> Then you can compose that stream with others.

My question is how. How do I give an input to the ActorProducer (the query parameters)? How do I compose it with other existing Ducts (put it between two existing Ducts)? From an ActorProducer, I can get a Flow, which starts the computation, but not a Duct.

Am I missing something obvious? Is it clearer?

Benoît
Re: [akka-user] Sending a message after Future(s) completion
WARNING: Did not try to compile this

    Future.sequence(
      futureFiles map { _ recoverWith { case ex => log.info("Retrieval of files failed : {}", ex.getLocalizedMessage); Nil } }
    ).map(lists => Detected(lists.flatten.toIndexedSeq)) pipeTo sender

--
Cheers,
√
Re: [akka-user] [akka-stream] Looping or recursion in Duct / Flow
A Producer is not a Consumer.

On Fri, Jun 27, 2014 at 11:59 AM, benq benoit.quart...@gmail.com wrote:
> My question is how. How do I give an input to the ActorProducer (the query parameters)? How do I compose it with other existing Ducts (put it between two existing Ducts)? From an ActorProducer, I can get a Flow, which starts the computation, but not a Duct.
Re: [akka-user] Sending a message after Future(s) completion
The Nil needed to be wrapped in a Future, but it works perfectly, thanks!
Re: [akka-user] Sending a message after Future(s) completion
Oh, it's even better, use recover instead of recoverWith.

--
Cheers,
√
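The `recover` variant suggested here can be sketched with the Scala standard library alone. The assumptions: file paths are plain `String`s rather than `File`s, logging is a simple callback instead of the actor's `log`, and `RecoverSketch` and `gather` are hypothetical names. Unlike `recoverWith`, `recover` takes a plain fallback value, so `Nil` does not need to be wrapped in a `Future`.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object RecoverSketch {
  implicit val ec: ExecutionContext = ExecutionContext.global

  // Replace each failed future with an empty list (logging the cause), so
  // that Future.sequence completes only after every lookup has finished
  // and never fails because of a single failed lookup.
  def gather(futureFiles: List[Future[List[String]]],
             log: String => Unit): Future[IndexedSeq[String]] =
    Future.sequence(
      futureFiles.map(_.recover { case ex =>
        log(s"Retrieval of files failed : ${ex.getLocalizedMessage}")
        Nil
      })
    ).map(_.flatten.toIndexedSeq)
}
```

Inside the actor from the thread, the resulting future would then be mapped to `Detected(...)` and sent with `pipeTo sender`, as in the earlier reply. With `pipeTo`, the sender reference is read when `pipeTo` is called rather than later inside a future callback, which is the likely reason the original `sender()` call inside `onComplete` ended up in dead letters.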
Re: [akka-user] Sending a message after Future(s) completion
Oh I see, it looks better indeed. Thanks.
Re: [akka-user] Sending a message after Future(s) completion
Happy hAkking!

On Fri, Jun 27, 2014 at 12:22 PM, Jasper lme...@excilys.com wrote:
> Oh I see, it looks better indeed. Thanks.

--
Cheers,
√
Re: [akka-user] [akka-stream] Looping or recursion in Duct / Flow
On Fri, Jun 27, 2014 at 12:23 PM, Endre Varga endre.va...@typesafe.com wrote:
> On Fri, Jun 27, 2014 at 11:59 AM, benq benoit.quart...@gmail.com wrote:
>> On Friday, 27 June 2014 10:01:57 UTC+2, drewhk wrote:
>>> Hi Benoit,
>>>
>>> I wasn't proposing that the ActorProducer loop back. Instead, you should encapsulate the scrollId state and the corresponding behavior in that ActorProducer, i.e. you implement your original snippet not in terms of a flow, but inside the ActorProducer.
>>
>> I understand that, it was clear. It seems my additional question was not :(
>> My initial snippet is a Duct. It takes an input (the query parameters) and produces results. It was supposed to be plugged between two other Ducts.
>
> Ah, I thought you only use Duct to be able to loop back. What about using a Transformer then?

Well, I have to correct myself: Transformer cannot work because you have an asynchronous DB API. But the solution is actually simple: create an ActorProducer that encapsulates a result stream for a _given_ query parameter and handles the scrollId. This stream then represents _one_ response stream to one query. You can expose this, for example, as (dummy code)

    def query(q: Query): Producer[Entry]

Now if you have a Flow of queries you can do

    queries.map(query(_)).flatten(Concat)

to get a contiguous stream. If no contiguity is needed, though, I would keep it as a stream-of-streams and avoid the concat.

-Endre

> -Endre
>
>>> Then you can compose that stream with others.
>>
>> My question is how. How do I give an input to the ActorProducer (the query parameters)? How do I compose it with other existing Ducts (put it between two existing Ducts)? From an ActorProducer, I can get a Flow, which starts the computation, but not a Duct. Am I missing something obvious?
>> Is it clearer?
>>
>> Benoît
>>
>>> -Endre
>>>
>>> On Thu, Jun 26, 2014 at 9:58 PM, benq benoit@gmail.com wrote:
>>>> Ok, thanks, I see.
>>>> How should I feed my ActorProducer? It is a step of a Flow. Something like
>>>>
>>>>     val myProducer = system.actorOf(...)
>>>>     Flow(ActorProducer[T](myProducer).map...
>>>>     Flow(...).foreach{ myProducer ! _ }
>>>>
>>>> ? Will the back pressure work between the two flows?
>>>>
>>>> Benoît
>>>>
>>>> On Wednesday, 25 June 2014 21:27:42 UTC+2, Patrik Nordwall wrote:
>>>>> On Wed, Jun 25, 2014 at 7:11 PM, benq benoit@gmail.com wrote:
>>>>>> Is there an example somewhere using an ActorProducer?
>>>>>
>>>>> Not an example, but a test:
>>>>> https://github.com/akka/akka/blob/release-2.3-dev/akka-stream/src/test/scala/akka/stream/actor/ActorProducerSpec.scala
>>>>>
>>>>> /Patrik
>>>>>
>>>>>> On Wednesday, 25 June 2014 10:07:53 UTC+2, drewhk wrote:
>>>>>>> Hi Benoit
>>>>>>>
>>>>>>> On Wed, Jun 25, 2014 at 9:49 AM, benq benoit@gmail.com wrote:
>>>>>>>> Hi All,
>>>>>>>>
>>>>>>>> Is it possible to feed a Duct (/Flow) with the elements it produces, like a loop?
>>>>>>>
>>>>>>> Looping this way is dangerous. In general it leads to element count explosion, but since streams are always bounded, you will very likely deadlock instead. While there is a very thin line where loops are safe, it is just too easy to break it in unexpected ways.
>>>>>>>
>>>>>>>> The use case is a Flow of requests to a NoSQL db. Some requests return huge result sets that need to be retrieved in more than one request, using a scroll, which is similar to a database cursor. The NoSQL server returns packs of n results and a scrollId that needs to be used to retrieve the next n results, until the number of results returned is 0.
>>>>>>>>
>>>>>>>> So, I would like to do something like (pseudo code, no compiler at hand):
>>>>>>>>
>>>>>>>> ---
>>>>>>>> case class Response(scrollId: String, hits: List[SearchHit])
>>>>>>>> val (in, out) = Duct[Response].
>>>>>>>>   filter(_.hits.size > 0).
>>>>>>>
>>>>>>> Due to this filter, the probability of deadlocking is huge when elements are actually dropped.
>>>>>>>
>>>>>>>>   mapFuture(client scrollSearch scrollId).
>>>>>>>>   map(r => Response(r.getScrollId(), r.getHits())).
>>>>>>>>   tee(in). // This fails at compilation: circular reference. Goal: request next scroll...
>>>>>>>
>>>>>>> Even if this compiled, tee waits until there are two subscribers. Unfortunately, looping creates a circular dependency of subscription timings that will probably deadlock at construction time. Also, there is no clean termination condition: nothing will ever complete this loop, so even if everything subscribed successfully, the stream would never stop, even when no more elements are flowing.
>>>>>>>
>>>>>>>>   mapConcat(_.getHits map ) // ... while working on already available results
>>>>>>>>   build(materializer).
>>>>>>>> ---
>>>>>>>>
>>>>>>>> Is there a way to implement this?
>>>>>>>
>>>>>>> No, unfortunately not. Even if certain loops just happen to work, they would very likely break if we changed things like subscription timings (who subscribes when), even though we would not violate the SPI contract.
>>>>>>>
>>>>>>> The proper way to implement what you need is to use the ActorProducer helper class and create a producer of your own that keeps track of the scrollId.
>>>>>>>
>>>>>>> -Endre
>>>>>>>
>>>>>>>> Thanks in advance.
>>>>>>>> Benoit
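The scrollId bookkeeping recommended in this thread (keep the cursor as private state and request the next page until an empty one comes back) can be sketched with plain Scala futures. This is not the akka-stream preview API and it has no back pressure: it collects everything in memory and only illustrates the state handling that would live inside the producer. Page, fetchAll and the scrollSearch function parameter are hypothetical names, not part of any client library.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object ScrollSketch {
  // Hypothetical page type: the scroll id for the next request plus this page's hits.
  final case class Page(scrollId: String, hits: List[String])

  // Request pages one after another until an empty page is returned.
  def fetchAll(firstScrollId: String, scrollSearch: String => Future[Page])(
      implicit ec: ExecutionContext): Future[Vector[String]] = {
    def loop(scrollId: String, acc: Vector[String]): Future[Vector[String]] =
      scrollSearch(scrollId).flatMap { page =>
        if (page.hits.isEmpty) Future.successful(acc) // clean termination condition
        else loop(page.scrollId, acc ++ page.hits)    // carry the new scrollId forward
      }
    loop(firstScrollId, Vector.empty)
  }

  // Demonstration against an in-memory fake of the database.
  def demo(): Vector[String] = {
    import ExecutionContext.Implicits.global
    val pages = Map(
      "s0" -> Page("s1", List("a", "b")),
      "s1" -> Page("s2", List("c")),
      "s2" -> Page("s3", Nil)
    )
    Await.result(fetchAll("s0", id => Future.successful(pages(id))), 5.seconds)
  }
}
```

The empty page gives the loop the explicit end-of-stream signal that the tee-based loop in the question lacks.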
[akka-user] Sending message to Actor from Play Controller.
I have been playing with the distributed worker pattern (http://goo.gl/vMGTgf) and I'm running into an issue pushing work from a web request. The example project has a frontend:

    val mediator = DistributedPubSubExtension(context.system).mediator

    def receive = {
      case work =>
        log.info("Frontend received: " + work.toString())
        implicit val timeout = Timeout(5.seconds)
        (mediator ? Send("/user/master/active", work, localAffinity = false)) map {
          case Master.Ack(_) => Ok
        } recover { case _ => NotOk } pipeTo sender
    }

And a WorkProducer:

    override def preStart(): Unit =
      scheduler.scheduleOnce(5.seconds, self, Tick)

    def receive = {
      case Tick =>
        n += 1
        log.info("Produced work: {}", n)
        val work = Work(nextWorkId(), n)
        frontend ! work
        context.become(waitAccepted(work), discardOld = false)
    }

This all works just fine. However, when I send directly to the frontend from my Play Framework controller:

    def multiply(num: Long) = Action { implicit request =>
      implicit val timeout = Timeout(5.seconds)
      val frontend = core.Main.frontend
      frontend ! num
      Ok
    }

the message seems to get lost. The frontend receives the message, but it seems the actors downstream do not. I have modified the Play config to use a ClusterActorRefProvider:

    play {
      akka {
        extensions = ["akka.contrib.pattern.ClusterReceptionistExtension"]
        actor.provider = "akka.cluster.ClusterActorRefProvider"
        remote.netty.tcp.port=0
      }
    }

But to no avail. Does anybody have an idea what I'm doing wrong here?
[akka-user] Re: ANNOUNCE: First akka-http-core preview and updated akka-stream preview
Congratulations guys! I had the chance to look through the code a few days ago and was really surprised by how simple and concise it is. akka-http is (as Spray is also) a very interesting piece of engineering. Keep up the awesome work!
[akka-user] Re: ANNOUNCE: First akka-http-core preview and updated akka-stream preview
Good news, well done all :-)

Is there any further news about the impending replacement for spray-routing and the future of its use of shapeless HList?

-- T

On Friday, 27 June 2014 02:44:03 UTC-7, Björn Antonsson wrote:
> Dear Akka and Spray community,
>
> we are very happy and excited to release the first preview of Akka HTTP's core module, based on an updated preview of Akka Streams. It is the fruit of the collaboration between the Spray and Akka teams and aims to bring you fully reactive HTTP streams.
>
> - Both HTTP requests and responses can incorporate data that are streamed on demand, be that from disk or computed on the fly.
> - The HTTP streams, as well as all the other streams, are fully back pressure aware, enabling the server to throttle clients when necessary, or vice versa.
>
> The Streams module now includes an expanded DSL for both Scala and Java, and the HTTP module has all the core functionality to build HTTP servers and clients, as well as a brand new Java API for the HTTP model. The request handling DSL equivalent to spray-routing will be part of a future release.
>
> This preview is intended to allow you to give feedback on the Java and Scala APIs. It is focused solely on functionality; in particular, this means that performance is not yet on par with Spray. We will focus on performance enhancements once the functionality is complete.
>
> For more information on Reactive Streams, you are welcome to read the official Typesafe announcement (https://typesafe.com/blog/typesafe-announces-akka-streams).
>
> The aim is for both of these projects to be back-released against Akka 2.3.x, and therefore the code lives in the dedicated release-2.3-dev branch (https://github.com/akka/akka/tree/release-2.3-dev) in the akka repository. It will of course also be part of Akka 2.4, but that is going to be released only towards the end of the year; see the updated roadmap (https://docs.google.com/document/d/18W9-fKs55wiFNjXL9q50PYOnR7-nnsImzJqHOPPbM4E/pub) for more information.
>
> So please give them a spin. We welcome any and all feedback, preferably in pull request form ;).
>
> *Artifacts*
>
> sbt:
>     "com.typesafe.akka" %% "akka-http-core-experimental" % "0.4"
>     "com.typesafe.akka" %% "akka-stream-experimental" % "0.4"
>
> maven scala 2.10:
>     com.typesafe.akka  akka-http-core-experimental_2.10  0.4
>     com.typesafe.akka  akka-stream-experimental_2.10  0.4
>
> maven scala 2.11:
>     com.typesafe.akka  akka-http-core-experimental_2.11  0.4
>     com.typesafe.akka  akka-stream-experimental_2.11  0.4
>
> *Documentation*
>
> http://doc.akka.io/docs/akka-stream-and-http-experimental/0.4/
> http://doc.akka.io/api/akka-stream-and-http-experimental/0.4/
> http://doc.akka.io/japi/akka-stream-and-http-experimental/0.4/
>
> Regards,
> Björn
>
> --
> *Björn Antonsson*
> Typesafe http://typesafe.com/ - Reactive Apps on the JVM
> twitter: @bantonsson http://twitter.com/#!/bantonsson
[akka-user] Re: Sending message to Actor from Play Controller.
Not to worry, I have been a bit silly: the controller wasn't sending the correct message. It needs to wrap the number in a Work message:

    def multiply(num: Long) = Action { implicit request =>
      implicit val timeout = Timeout(5.seconds)
      val frontend = core.Main.frontend
      val work = Work(nextWorkId(), num)
      frontend ! work
      Ok
    }

On Friday, 27 June 2014 13:18:16 UTC+1, Nicholas Wentworth-Shaw wrote:
> I have been playing with the distributed worker pattern http://goo.gl/vMGTgf and I'm running into an issue pushing work from a web request.
> [...]
> Does anybody have an idea what I'm doing wrong here?
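A small sketch of why the original message appeared to get lost, written with plain partial functions instead of actors. The frontend's `case work =>` is a bare variable pattern, so it accepts any message, including a raw Long. That the master only handles typed Work messages is an assumption about the template, not something shown in this thread. The Work case class here is a hypothetical stand-in for the template's own class.

```scala
object MessageMatching {
  // Hypothetical stand-in for the template's Work message.
  final case class Work(workId: String, job: Any)

  // Like the frontend: a bare variable pattern matches every message.
  val frontend: PartialFunction[Any, String] = {
    case work => s"Frontend received: $work"
  }

  // Assumed downstream behaviour: only messages of type Work are handled.
  val master: PartialFunction[Any, String] = {
    case Work(id, _) => s"Accepted work $id"
  }
}
```

Under that assumption the frontend logs the Long and forwards it, while the master silently leaves it unhandled, which matches the symptom described.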
[akka-user] Akka Cluster seed node problem
Hi,

I am working with the Akka distributed worker template available from Typesafe. I am using it to write a backend job that takes data from Siebel using SOAP calls and inserts it into Mongo. The job is supposed to run once a week for a few hours.

Based on the cluster-usage and other documentation on the Akka website, I imported akka-cluster.jar and configured the application configuration file with seed nodes (akka.cluster.seed-nodes). But when I start the first node (the master node) with this configuration, I get errors on the server console saying it failed to join the seed node, which is to be expected (it is the first node and there is nothing to join yet).

Next I start the second node with akka.cluster.seed-nodes set to the IP address and port of the process where the master node is running. Once again I get errors on the server console.

What I do next is take the first join address of the master actor from the master node and set it dynamically in code on the slave node (construct an Address object and pass it to the actors on the slave node). THIS WORKS! But if I take the same join address and put it in akka.cluster.seed-nodes in the application configuration, I get an error and the slave doesn't join the cluster.

So I have the following questions:

1. How do I configure akka.cluster.seed-nodes in the application configuration? I could never make it work from the configuration.
2. Is there any way to pre-configure the seed nodes in the configuration? From what I tried, it looks like the configuration has to be dynamic, i.e. I have to take the join address of the actor on the master node from the logs and configure the slave's seed-nodes with that address.
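A sketch of what a static seed-node configuration usually looks like for Akka 2.3, generated from a small Scala helper so the pieces that must agree are explicit. As far as I know, each seed-nodes entry has to match the seed node's actor system name, remoting hostname and port exactly, and every node, including the seed itself, lists the same entries. The system name ClusterSystem, the host, the ports and the helper names are assumptions for illustration, not values taken from the template.

```scala
object SeedNodes {
  // Address format used by Akka 2.3 remoting: akka.tcp://<system>@<host>:<port>
  def address(system: String, host: String, port: Int): String =
    s"akka.tcp://$system@$host:$port"

  // Render a HOCON fragment for one node of the cluster.
  def render(host: String, port: Int, seeds: List[String]): String = {
    val quoted = seeds.map(s => "\"" + s + "\"").mkString(", ")
    s"""akka.actor.provider = "akka.cluster.ClusterActorRefProvider"
       |akka.remote.netty.tcp.hostname = "$host"
       |akka.remote.netty.tcp.port = $port
       |akka.cluster.seed-nodes = [$quoted]""".stripMargin
  }

  // Hypothetical two-node setup: the master is the seed, the worker joins it.
  val seed: String = address("ClusterSystem", "127.0.0.1", 2551)
  val masterConfig: String = render("127.0.0.1", 2551, List(seed))
  val workerConfig: String = render("127.0.0.1", 2552, List(seed))
}
```

A mismatch between the hostname in seed-nodes and the one the seed node binds to (for example localhost versus an IP address), or a different actor system name, is a common reason for a configured seed node being rejected while the programmatically built Address works.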