I cannot find a solution for accessing the elements of a stream more than ones inside a flow without buffering the complete stream. To illustrate the problem I created the following example:
def randomIntegersSource(size: Int): Source[Int, _] = { val iter = Iterator.continually(ThreadLocalRandom.current().nextInt(100)) Source(() => iter.take(size)) } test("normalize a stream of integers") { implicit val sys = ActorSystem("sys") try { implicit val materializer = ActorMaterializer() val src: Source[Int, _] = randomIntegersSource(size = 20) // Converts a stream of positive integers to doubles ranging from 0 to 1. // The gratest input value converts to 1 val normalizeFlow: Flow[Int, Double, _] = ??? val f = src.via(normalizeFlow).runForeach {norm => println("%.3f" format norm)} Await.result(f, 2.second) } finally { sys.shutdown() } } Inside the 'normalizeFlow' I have to process the stream twice. First to find the maximum value and second to calculate the normalized values. Does anyone know an implementation for 'normalizeFlow' that runs with arbritrary sized streams and that does not need to buffer the whole stream in order to work? Here normalize.scala <https://github.com/wwagner4/akka-streams-tryout/blob/master/src/test/scala/normalize.scala> you can find the complete example with a buffering solution. -- >>>>>>>>>> Read the docs: http://akka.io/docs/ >>>>>>>>>> Check the FAQ: >>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups "Akka User List" group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.