I'm using the new Akka Stream library and I'm trying to create a custom merge junction, similar to the Zip junction but one that can take any number of inputs. However, I can't figure out how to do this. I've followed the documentation online and I think I'm close but keep getting this error:
[error] Exception in thread "main" java.lang.IllegalArgumentException: requirement failed: The input port [FanIn.] is not part of the underlying graph. Here is a copy of my code for the custom junction: class MergePorts(_init: Init[Frame] = Name("Merge")) extends FanInShape[ Frame](_init) { val inputs = ListBuffer[Inlet[Frame]]() def input = { val port = newInlet[Frame]("") inputs :+ port port } protected override def construct(i: Init[Frame]) = new MergePorts(i) } class MergeFrames extends FlexiMerge[Frame, MergePorts](new MergePorts, OperationAttributes.name("MergeFrames")) { import FlexiMerge._ override def createMergeLogic(port: PortT) = new MergeLogic[Frame] { override def initialState: State[_] = State(ReadAll(port.inputs:_*)) { (ctx, _, inputs) => val frames = port.inputs.map( in => inputs.get(in) ).flatten // Do some merging... // Emit result of the merge SameState } } } I think the problem is to do with the `input` function returning a new `Inlet` every call. -- >>>>>>>>>> Read the docs: http://akka.io/docs/ >>>>>>>>>> Check the FAQ: >>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups "Akka User List" group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.