I am trying to write a flow that accumulates a collection of data (Int in this example) and emits a List[Int] when either 1) the collection reaches a certain size or 2) a timer tick is received.
I've implemented the code below. I have a println in the process() method to know what's going on. When I run this I get output once (false false true) then never again...like the stream is stuck. My expectation would be that the dataReady flag would soon be set as data is pushed from the input source, but that's not happening. What am I missing? Why is my stream not streaming? Thanks for any help! Greg case class Accumulator(coll:scala.collection.mutable.ListBuffer[Int]) extends GraphStage[FanInShape2[Boolean,Int,List[Int]]] { val out:Outlet[List[Int]] = Outlet("Stuff") val tick:Inlet[Boolean] = Inlet("tick") val data:Inlet[Int] = Inlet("data") val CHUNK_SIZE = 5 private var dataReady = false private var tickReady = false override val shape:FanInShape2[Boolean,Int,List[Int]] = new FanInShape2( tick,data,out) override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) { def process() { println(s"$tickReady $dataReady ${isAvailable(out)}") // prints false false true the one time it outputs anything if( coll.size < CHUNK_SIZE && dataReady ) { coll += grab(data) dataReady = false } if(isAvailable(out) && coll.size > 0 && (tickReady || coll.size == CHUNK_SIZE)) { tickReady = false push(out,coll.to[List]) coll.clear if(dataReady) process() // in case data is ready but collection was full } } setHandler(out, new OutHandler{ override def onPull():Unit = { process() } }) setHandler(data, new InHandler{ override def onPush():Unit = { dataReady = true process() } }) setHandler(tick, new InHandler{ override def onPush():Unit = { tickReady = true grab(tick) // clear tick's payload...don't care about it otherwise process() } }) } } case class Stuff() { val rateLimit = Flow[Int].map(i => { Thread.sleep(100); i }) val collection = scala.collection.mutable.ListBuffer.empty[Int] val g = RunnableGraph.fromGraph(FlowGraph.create() { implicit builder: FlowGraph.Builder[Unit] => import FlowGraph.Implicits._ val numSource = Source(1 to 100) val tickSource = Source.tick[Boolean](1 second, 1 second, true) val toss = Sink.ignore val mix = builder.add(Accumulator(collection)) val show = Flow[List[Int]].map(i => {println(i);i}) tickSource ~> mix.in0 numSource ~> rateLimit ~> mix.in1 mix.out ~> show ~> toss ClosedShape }) } -- >>>>>>>>>> Read the docs: http://akka.io/docs/ >>>>>>>>>> Check the FAQ: >>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups "Akka User List" group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.