Thanks both of you! I'll revisit this when FlexiMerge or a alternative 
solution comes along.

On Tuesday, 24 March 2015 09:57:49 UTC, Patrik Nordwall wrote:
>
> Hi,
>
> We had to remove that test <https://github.com/akka/akka/pull/16754> because 
> emitting from CompletionHandler is currently not supported.
> There is a ticket for making FlexiMerge and FlexiRoute more flexible: 
> https://github.com/akka/akka/issues/16753
>
> Regards,
> Patrik
>
> On Mon, Mar 23, 2015 at 4:59 PM, William Le Ferrand <warn...@gmail.com 
> <javascript:>> wrote:
>
> If that can help, here is how I dealt with merging n streams: 
> https://gist.github.com/williamleferrand/6d27d8826788174c3b4c 
>
> I haven't tested with 1.0-M4 but it works fine with 1.0-M1. It might hurt 
> your eyes but I had all kind of issues when trying to make it simpler - 
> comments are very welcome!
>
> On Mon, Mar 23, 2015 at 7:20 AM, Ajay Padala <ajay....@gmail.com 
> <javascript:>> wrote:
>
> I can find this example for an older akka-streams but not one that works 
> with Akka-streams 1.0-M4
>
> In particular, i am having trouble figuring out how to emit the buffered 
> element (reference) after both the streams have closed.
>
> I have tried to go to this state after both streams have closed, but the 
> onInput method is never called. Not sure how to frame the read condition so 
> that it gets called when downstream wants an element
>
> def produceRefAndClose = State(ReadAll(s.input1, s.input2)) {
>   (ctx, input, elem) ⇒
>     ctx.emit(ref)
>     ctx.finish()
>     SameState
> }
>
>
>
>
> Or maybe I am going about it the wrong way completely. Full Code below:
>
> object StreamMerge extends App {
>
> implicit val actorSystem = ActorSystem("LogFiles")
> implicit val mat = ActorFlowMaterializer()
>
>   class OrderedMergeShape[A](_init: Init[A] = Name("OrderedMerge")) extends 
> FanInShape[A](_init) {
>     val input1 = newInlet[A]("input1")
>     val input2 = newInlet[A]("input2")
>
>     override protected def construct(init: Init[A]): FanInShape[A] = new 
> OrderedMergeShape(init)
>   }
>
>   class IntOrderMerge
>     extends FlexiMerge[Int, OrderedMergeShape[Int]](
>       new OrderedMergeShape(), OperationAttributes.name("OrderedMerge")) {
>     import FlexiMerge._
>
>
>     override def createMergeLogic(s: OrderedMergeShape[Int]): MergeLogic[Int] 
> = new MergeLogic[Int] {
>       var ref: Int = _
>       var refValid = false
>
>
>       def other(input: InPort): Inlet[Int] = if (input eq s.input1) s.input2 
> else s.input1
>
>       def getFirstElement = State[Int](ReadAny(s.input1, s.input2)) { (ctx, 
> input, elem) ⇒
>         ref = elem
>         refValid = true
>         ctx.changeCompletionHandling(emitOtherOnClose)
>         readUntilLarger(other(input))
>       }
>
>       override def initialState: State[Int] = getFirstElement
>
>       def readUntilLarger(curInput: Inlet[Int]): State[Int] = 
> State[Int](Read(curInput)) {
>         (ctx, input, elem) ⇒
>           if (elem <= ref) {
>             ctx.emit(elem)
>             SameState
>           }
>           else {
>             ctx.emit(ref)
>             ref = elem
>             readUntilLarger(other(input))
>           }
>       }
>
>       def readRemaining(curInput: Inlet[Int]) = State[Int](Read(curInput)) {
>         (ctx, input, elem) ⇒
>           if (elem <= ref)
>             ctx.emit(elem)
>           else {
>             ctx.emit(ref)
>             ref = elem
>           }
>           SameState
>       }
>
>       def produceRefAndClose = State(ReadAll(s.input1, s.input2)) {
>         (ctx, input, elem) ⇒
>           ctx.emit(ref)
>           ctx.finish()
>           SameState
>       }
>
>       val emitLast = CompletionHandling(
>         onUpstreamFinish = { (ctx, input) ⇒
>           ctx.changeCompletionHandling(defaultCompletionHandling)
>           produceRefAndClose
>         },
>         onUpstreamFailure = { (ctx, _, cause) ⇒
>           ctx.fail(cause)
>           SameState
>         }
>       )
>
>       val emitOtherOnClose = CompletionHandling(
>         onUpstreamFinish = { (ctx, input) ⇒
>           ctx.changeCompletionHandling(emitLast)
>           readRemaining(other(input))
>         },
>         onUpstreamFailure = { (ctx, _, cause) ⇒
>           ctx.fail(cause)
>           SameState
>         }
>       )
>
>
>
>       override def initialCompletionHandling: CompletionHandling = 
> emitOtherOnClose
>     }
>
>   }
>   
>   def printIntStream(): Unit = {
>     val srcA = Source(List(1,2,2,3,7,8))
>     val srcB = Source(List(2,3,3,6,7))
>     val sink = Sink.foreach(println)
>     
>     val fGraph = FlowGraph.closed() { implicit b ⇒
>       import FlowGraph.Implicits._
>       val intOrderMerge = b.add(new IntOrderMerge)
>       srcA ~> intOrderMerge.input1
>       srcB ~> intOrderMerge.input2
>       intOrderMerge.out ~> sink
>     }
>     
>     println("Running flow")
>     fGraph.run()
>   }
>
>   printIntStream()
> }
>
>
>
>
>
> On Wednesday, 10 December 2014 09:51:11 UTC, drewhk wrote:
>
>
>
> On Tue, Dec 9, 2014 at 8:07 PM, William Le Ferrand <warn...@gmail.com> 
> wrote:
>
> Dear List,
>
> I'm trying to revisit this problem now that streams is in a stable state; 
> but I don't quite see how to use MergeLogic to perform this sort merge - is 
> it actually possible and does anyone would have hints toward a working 
> solution? 
>
>
> Yes, in the test for FlexiMerge there is a sorting merge. Docs will come 
> soon and will contain a cookbook section with recipes like this.
>
> -Endre
>  
>
>
> thanks in advance,
>
> best
>
> william
>
> On Wed, Sep 24, 2014 at 7:28 AM, √iktor Ҡlang 
>
> ...

-- 
>>>>>>>>>>      Read the docs: http://akka.io/docs/
>>>>>>>>>>      Check the FAQ: 
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>>      Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.

Reply via email to