On Mon, May 11, 2015 at 12:22 AM, <eeco...@gmail.com> wrote:
> There seems to be no way to express this (simple) shape using a
> `FlexiRoute`. A naive version:
>
>   class SplitShape[A](init: FanOutShape.Init[A] = FanOutShape.Name[A]("Split"))
>     extends FanOutShape[A](init) {
>
>     val whenTrue = newOutlet[A]("whenTrue")
>     val whenFalse = newOutlet[A]("whenFalse")
>
>     protected def construct(init: FanOutShape.Init[A]) = new SplitShape(init)
>   }
>
>   class Split[A](predicate: A => Boolean)
>     extends FlexiRoute[A, SplitShape[A]](new SplitShape[A],
>       OperationAttributes.name("PartialOutput")) {
>
>     import FlexiRoute._
>
>     def createRouteLogic(p: PortT) = new RouteLogic[A] {
>       def initialState = State(DemandFromAny(p)) { (ctx, port, element) =>
>         import p._
>
>         val result = predicate(element)
>
>         (result, port) match {
>           case (true, `whenTrue`) => ctx.emit(whenTrue)(element)
>           case (false, `whenFalse`) => ctx.emit(whenFalse)(element)
>           case (true, `whenFalse`) =>
>             println("lost element " + element)
>           case (false, `whenTrue`) =>
>             println("lost element " + element)
>           case _ => sys error "Unknown port"
>         }
>
>         SameState
>       }
>     }
>   }
>
> It seems (and I might be mistaken) that the demand and availability of
> elements are tied together. If I would replace the 'lost element' cases
> with placing the elements in a buffer, they would only grow as the
> 'onInput' method is only called when input is available, not when there is
> demand, regardless of input.
>
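To make the demand issue concrete, here is a toy model in plain Scala. It is only a sketch and deliberately does not use the Akka Streams API: `DemandModel`, `route`, `PullOnAnyDemand` and `PullOnAllDemand` are hypothetical names, and demand is a fixed counter per output instead of being replenished by downstream over time. It compares reading an element as soon as any output has demand with reading one only when every output has demand.

```scala
// Toy model of fan-out demand bookkeeping. NOT the Akka Streams API:
// every name in this object is hypothetical and for illustration only.
object DemandModel {
  sealed trait Policy
  case object PullOnAnyDemand extends Policy // read input when any output has demand
  case object PullOnAllDemand extends Policy // read input only when every output has demand

  final case class Result(
    toTrue: Vector[Int],  // elements emitted on the "whenTrue" output
    toFalse: Vector[Int], // elements emitted on the "whenFalse" output
    lost: Vector[Int],    // elements read while their target output had no demand
    unread: Vector[Int])  // elements left upstream (backpressured, not lost)

  // Assumption: demand is a fixed budget per output; real downstreams
  // would signal more demand as they consume elements.
  def route(input: Vector[Int], demandTrue: Int, demandFalse: Int, policy: Policy)(
      predicate: Int => Boolean): Result = {
    var dT = demandTrue
    var dF = demandFalse
    var remaining = input
    var outT = Vector.empty[Int]
    var outF = Vector.empty[Int]
    var lost = Vector.empty[Int]

    def mayPull: Boolean = policy match {
      case PullOnAnyDemand => dT > 0 || dF > 0
      case PullOnAllDemand => dT > 0 && dF > 0
    }

    while (remaining.nonEmpty && mayPull) {
      val element = remaining.head
      remaining = remaining.tail
      if (predicate(element)) {
        // Emitting consumes demand on the chosen output only.
        if (dT > 0) { outT :+= element; dT -= 1 } else lost :+= element
      } else {
        if (dF > 0) { outF :+= element; dF -= 1 } else lost :+= element
      }
    }
    Result(outT, outF, lost, remaining)
  }

  def main(args: Array[String]): Unit = {
    val input = Vector(1, 2, 3, 4, 5, 6)
    val isEven = (n: Int) => n % 2 == 0
    println(route(input, 1, 3, PullOnAnyDemand)(isEven))
    println(route(input, 1, 3, PullOnAllDemand)(isEven))
  }
}
```

With one unit of demand on the "true" side and three on the "false" side, the any-demand run reads element 4 when only the "false" side can accept it, so it ends up in `lost`. The all-demand run stops reading after element 2 and leaves the rest upstream in `unread`, with nothing lost.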
The workaround currently is to use DemandFromAll, but only emit on the relevant output. This will not consume the demand from the other output. You have to change the completion handling logic, though: if you require demand from all outputs and one of them cancels, DemandFromAll becomes unfulfillable and your fan-out stage will stop.

These APIs are limited for now, but this will change in the future.

Btw, don't be confused by the 1.0-RCx versions. Akka Streams has a separate versioning scheme from the other Akka modules because it was not possible at that point to tie its release process to the main versions. Even when streams reaches 1.0 final, it will just mean that it is considered an experimental module; it will enter the normal Akka versioning scheme soon (and merge into 2.4).

-Endre

--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups "Akka User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.