Hi Maxim,

As far as I can see, this is a variation on the built-in ZipWithN. The
difference is that the arity of the inputs is known only at run time, but
the types of the inputs are all the same (unlike most zips). AFAIK this is
easily doable as a FlexiMerge.

Here is an example that creates a conventional Zip using FlexiMerge:
https://github.com/akka/akka/blob/release-2.3-dev/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphFlexiMergeSpec.scala#L85

The difference in your case is that you should use a UniformFanInShape
(which can have arbitrarily many input ports, all of the same type), and the
number of states will be N:
 - wait for 1st input
 - wait for 2nd input
 ...
 - wait for Nth input

Of course, this only needs a single, properly parametrized state that encodes
which port it is waiting on (i.e. one State subclass with an Int parameter).
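
To make the state cycle concrete, here is a minimal sketch in plain Scala.
It deliberately does not use the FlexiMerge API; it only models the idea of
one state parametrized by the port index. The names RoundRobinCollector,
onElement and RoundRobinCollectorDemo are made up for illustration, and in a
real FlexiMerge the "element read from port i" would come from a read on the
i-th inlet of the UniformFanInShape.

```scala
// Plain-Scala model of the N-state cycle (no Akka dependency).
// All names here are hypothetical and exist only for illustration.
final case class RoundRobinCollector[T](
    n: Int,                                  // number of input ports
    waitingOn: Int = 0,                      // the Int parameter of the state
    collected: Vector[T] = Vector.empty[T]   // elements gathered in this round
) {
  require(n > 0, "need at least one input port")

  // Consume the element read from port `waitingOn`.
  // Returns the next state and, after the Nth input, the completed list.
  def onElement(element: T): (RoundRobinCollector[T], Option[List[T]]) = {
    val acc = collected :+ element
    if (waitingOn == n - 1)
      (copy(waitingOn = 0, collected = Vector.empty[T]), Some(acc.toList))
    else
      (copy(waitingOn = waitingOn + 1, collected = acc), None)
  }
}

object RoundRobinCollectorDemo {
  // Feed elements in the order the state machine would read them:
  // port 0, port 1, ..., port n-1, then port 0 again, and so on.
  def run[T](n: Int, inputsInReadOrder: Seq[T]): List[List[T]] = {
    val start = (RoundRobinCollector[T](n), Vector.empty[List[T]])
    val (_, emitted) = inputsInReadOrder.foldLeft(start) {
      case ((state, out), element) =>
        val (next, maybeList) = state.onElement(element)
        (next, out ++ maybeList)
    }
    emitted.toList
  }
}
```

For the two calculations in your example (+1 and +2) and the first two
source elements, the reads would arrive as 2, 3, 3, 4, so
RoundRobinCollectorDemo.run(2, Seq(2, 3, 3, 4)) gives
List(List(2, 3), List(3, 4)).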

Please be aware, though, that FlexiMerge and FlexiRoute are being phased out
in favor of a simpler yet more powerful construct that will be available in
the upcoming 1.1 release.

-Endre


On Fri, Oct 2, 2015 at 2:16 AM, Maxim Matvienko <mmatv...@gmail.com> wrote:

> Having trouble implementing custom stream logic that would perform the
> following:
>
> val calculationFlows: List[Flow[Int, Int, Unit]] = ... constructed n partials here
>
> val bcast = builder.add(Broadcast[Int](calculationFlows.length + 1))
> val customMerge = builder.add(new CustomMerge[Int, List[Int]])
>
> val zip = ZipWith { (original: Int, calculations: List[Int])
>
> Source ~> bcast ~> zip.in0
>
> calculationFlows.foreach { calculationFlow =>
>   bcast ~> calculationFlow ~> customMerge.input.inlet
> }
>
> customMerge.out ~> zip.in1
>
> zip.out ~> sink
>
> I am having a problem writing the custom merge using FlexiMerge.
> The goal is to have all calculations completed and collected into a
> List[Int] and then emitted to zip.in1.
>
> Should look like this:
>
> val calculation1 = Flow[Int].map(_+1)
> val calculation2 = Flow[Int].map(_+2)
>
> val calculationFlows = calculation1 :: calculation2 :: List.empty
>
> When run with the code above and Source(1 to 5), it should produce:
>
> (1 , List(2, 3))
> (2 , List(3, 4))
> (3 , List(4, 5))
> (4 , List(5, 6))
> (5 , List(6, 7))
>
>
> Seems like it shouldn't be difficult... I tried different FlexiMerge
> approaches, but no luck.
>
> Can someone help, please?
>
> --
> >>>>>>>>>> Read the docs: http://akka.io/docs/
> >>>>>>>>>> Check the FAQ:
> http://doc.akka.io/docs/akka/current/additional/faq.html
> >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
> ---
> You received this message because you are subscribed to the Google Groups
> "Akka User List" group.
> To unsubscribe from this group and stop receiving emails from it, send an
> email to akka-user+unsubscr...@googlegroups.com.
> To post to this group, send email to akka-user@googlegroups.com.
> Visit this group at http://groups.google.com/group/akka-user.
> For more options, visit https://groups.google.com/d/optout.
>



-- 
Akka Team
Typesafe - Reactive apps on the JVM
Blog: letitcrash.com
Twitter: @akkateam
