I am trying to write a flow that accumulates a collection of data (Int in 
this example) and emits a List[Int] when either 1) the collection reaches a 
certain size or 2) a timer tick is received.

I've implemented the code below.  I have a println in the process() method 
to know what's going on.
When I run this I get output once (false false true) then never 
again...like the stream is stuck.  My expectation would be that the 
dataReady flag would soon be set as data is pushed from the input source, 
but that's not happening.

What am I missing?   Why is my stream not streaming?

Thanks for any help!
Greg

case class Accumulator(coll:scala.collection.mutable.ListBuffer[Int]) 
extends GraphStage[FanInShape2[Boolean,Int,List[Int]]] {

  val out:Outlet[List[Int]] = Outlet("Stuff")

  val tick:Inlet[Boolean] = Inlet("tick")

  val data:Inlet[Int] = Inlet("data")

  val CHUNK_SIZE = 5


  private var dataReady = false

  private var tickReady = false


  override val shape:FanInShape2[Boolean,Int,List[Int]] = new FanInShape2(
tick,data,out)


  override def createLogic(inheritedAttributes: Attributes): 
GraphStageLogic = new GraphStageLogic(shape) {

    

    def process() {

      println(s"$tickReady $dataReady ${isAvailable(out)}") // prints false 
false true the one time it outputs anything

      if( coll.size < CHUNK_SIZE && dataReady ) {

        coll += grab(data)

        dataReady = false

      }

      if(isAvailable(out) && coll.size > 0 && (tickReady || coll.size == 
CHUNK_SIZE)) {

        tickReady = false

        push(out,coll.to[List])

        coll.clear

        if(dataReady) 

          process()  // in case data is ready but collection was full

      }

    }

    setHandler(out, new OutHandler{

      override def onPull():Unit = {

        process()

      }

    })

    setHandler(data, new InHandler{

      override def onPush():Unit = {

        dataReady = true

        process()

      }

    })

    setHandler(tick, new InHandler{

      override def onPush():Unit = {        

        tickReady = true

        grab(tick)  // clear tick's payload...don't care about it otherwise

        process()

      }

    })

  }

}


case class Stuff() {

  val rateLimit = Flow[Int].map(i => { Thread.sleep(100); i })  

  val collection = scala.collection.mutable.ListBuffer.empty[Int]

  val g = RunnableGraph.fromGraph(FlowGraph.create() { implicit builder: 
FlowGraph.Builder[Unit] =>

    import FlowGraph.Implicits._

      

    val numSource = Source(1 to 100)

    val tickSource = Source.tick[Boolean](1 second, 1 second, true)

    val toss = Sink.ignore

    val mix = builder.add(Accumulator(collection))

    val show = Flow[List[Int]].map(i => {println(i);i})


      tickSource              ~> mix.in0

      numSource  ~> rateLimit ~> mix.in1

                                 mix.out ~> show ~> toss


    ClosedShape

  })

}

-- 
>>>>>>>>>>      Read the docs: http://akka.io/docs/
>>>>>>>>>>      Check the FAQ: 
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>>      Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.

Reply via email to