On Mon, May 11, 2015 at 12:22 AM, <eeco...@gmail.com> wrote:

> There seems to be no way to express this (simple) shape using a
> `FlexiRoute`. A naive version:
>
>   class SplitShape[A](init: FanOutShape.Init[A] =
> FanOutShape.Name[A]("Split"))
>     extends FanOutShape[A](init) {
>
>     val whenTrue = newOutlet[A]("whenTrue")
>     val whenFalse = newOutlet[A]("whenFalse")
>
>     protected def construct(init: FanOutShape.Init[A]) = new
> SplitShape(init)
>   }
>
>   class Split[A](predicate: A => Boolean)
>     extends FlexiRoute[A, SplitShape[A]](new SplitShape[A],
> OperationAttributes.name("PartialOutput")) {
>
>     import FlexiRoute._
>
>     def createRouteLogic(p: PortT) = new RouteLogic[A] {
>       def initialState = State(DemandFromAny(p)) { (ctx, port, element) =>
>         import p._
>
>         val result = predicate(element)
>
>         (result, port) match {
>           case (true,  `whenTrue`) => ctx.emit(whenTrue)(element)
>           case (false, `whenFalse`) => ctx.emit(whenFalse)(element)
>           case (true,  `whenFalse`) =>
>             println("lost element " + element)
>           case (false, `whenTrue`) =>
>             println("lost element " + element)
>           case _ => sys error "Unknown port"
>         }
>
>         SameState
>       }
>     }
>   }
>
> It seems (and I might be mistaken) that the demand and availability of
> elements are tied together. If I would replace the 'lost element' cases
> with placing the elements in a buffer, they would only grow as the
> 'onInput' method is only called when input is available, not when there is
> demand, regardless of input.
>

The workaround currently is to use DemandFromAll, but only emit on the
relevant output. This will not consume the demand from the other output.
You have to change the completion handling logic though, since if you
requrire demand from all output, but one cancels, then DemandFromAll
becomes unfulfillable and your fanout stage will stop.

These APIs are limited for now, but this will change in the future. Btw,
don't be confused by the 1.0-RCx versions, Akka Streams have a separate
versioning scheme from other Akka modules because it was not possible at
that point to tie its release process to the main versions. Even when
streams reach 1.0 final, it will just mean that it is considered an
experimental module, and enter the normal Akka versioning scheme soon (and
merge into 2.4).

-Endre


>  --
> >>>>>>>>>> Read the docs: http://akka.io/docs/
> >>>>>>>>>> Check the FAQ:
> http://doc.akka.io/docs/akka/current/additional/faq.html
> >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
> ---
> You received this message because you are subscribed to the Google Groups
> "Akka User List" group.
> To unsubscribe from this group and stop receiving emails from it, send an
> email to akka-user+unsubscr...@googlegroups.com.
> To post to this group, send email to akka-user@googlegroups.com.
> Visit this group at http://groups.google.com/group/akka-user.
> For more options, visit https://groups.google.com/d/optout.
>

-- 
>>>>>>>>>>      Read the docs: http://akka.io/docs/
>>>>>>>>>>      Check the FAQ: 
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>>      Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.

Reply via email to