I am modeling a data processing pipeline with several decision points at which the system may refer a decision to a human. Because earlier decisions change the state of the system and affect later ones, I want the pipeline to process a narrow sliding view of the data and not run much faster than the humans can keep up. A further goal: these decisions can take time, and the system may have to sort through several items it can handle on its own before it reaches one that needs a human, so I want it to buffer requests for human decisions so that work is ready when someone asks for it and then, naturally, to back-pressure when that buffer is full.
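To make the buffering and back-pressure goal concrete without relying on any Akka API, here is a minimal sketch using only the Scala standard library. The names `Doc`, `Decision`, `decide` and `run` are hypothetical stand-ins, not part of my real code or of akka-stream, and a blocking queue is swapped in for a real reactive stream, so treat it as an illustration of the intent rather than an implementation.

```scala
import java.util.concurrent.ArrayBlockingQueue
import scala.concurrent.{Await, ExecutionContext, Future, blocking}
import scala.concurrent.duration._

object UnorderedMergeSketch {
  // Hypothetical stand-ins for the types in the real pipeline.
  sealed trait Decision
  case object Automatic extends Decision
  case object HumanNeeded extends Decision
  final case class Doc(path: String)

  // Assumption: a trivial synchronous decider; the real one would be asynchronous.
  def decide(doc: Doc): Decision =
    if (doc.path.endsWith(".unclear")) HumanNeeded else Automatic

  /** Routes each doc to a per-decision branch, then merges branch outputs in arrival order. */
  def run(docs: Seq[Doc], bufferSize: Int)(implicit ec: ExecutionContext): Vector[String] = {
    // Bounded buffer: put() blocks while it is full, which acts as the back-pressure.
    val merged = new ArrayBlockingQueue[String](bufferSize)

    // One concurrent producer per decision branch.
    val branches = docs.groupBy(decide).toVector.map { case (decision, group) =>
      Future {
        group.foreach { doc =>
          blocking(merged.put(s"$decision:${doc.path}"))
        }
      }
    }

    // The consumer pulls one result per submitted doc, in whatever order they arrive.
    val results = Vector.fill(docs.size)(merged.take())
    Await.result(Future.sequence(branches), 10.seconds)
    results
  }
}
```

Calling `UnorderedMergeSketch.run(docs, bufferSize = 2)(ExecutionContext.global)` returns every doc exactly once. Order is preserved within a branch but not across branches, which is the "unordered merge" behaviour I am after.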
In summary, my goal is to be able to merge streams in an unordered fashion. *Lengthy pseudo-code* follows for clarification:

    val source = Source(docsToHandle).
      mapAsync(withCurrentHeadVersion).
      groupByAsync(attachDecisionDecider.decide).
      map[Source[Either[Terminal, DocForProcessing]]] {
        case (AttachDecisionDecider.AlreadyAttached, flow) =>
          flow map { doc => Right(doc) }
        case (AttachDecisionDecider.ObviousDuplicate, flow) =>
          flow.mapAsync { doc =>
            (entityState ? EntityState.AttachDoc(doc.headEntityVersion, doc.path)).mapTo[Option[Int]] map {
              case None =>
                // conflict! Note: we may want to consider a max-retry signal that
                // dumps to another table when abandoning; this can be handled by
                // our requeue logic down the stream
                Left(ProcessConflict(doc.path))
              case Some(version) =>
                // Note - it may actually be best to requery this later, to tighten
                // the gap between decision and acting on the decision.
                Right(doc.copy(headEntityVersion = Some(version)))
            }
          }
        case (AttachDecisionDecider.HumanDecisionNeeded, flow) =>
          flow.
            map(referToHuman("attach", _)).
            buffer(20, OverflowStrategy.backpressure). // enqueue up to 20 unacked human requests
            mapAsyncUnordered(identity). // at least one human has begun the work (on abandon, future will be abandoned)
            mapAsync(identity) // human has completed the work
      }.
      mergeUnordered.
      mapConcat(handleResult).
      groupByAsync(pickDecisionDecider.decide).
      map[Source[Either[Terminal, DocForProcessing]]] {
        case (PickDecisionDecider.AlreadyPicked, flow) =>
          flow map { doc => Right(doc) }
        case (PickDecisionDecider.AutoMerge, flow) =>
          flow mapAsync { doc =>
            (entityState ? EntityState.PickDoc(doc.headEntityVersion, doc.path)).mapTo[Option[Int]] map {
              case None => Left(ProcessConflict(doc.path))
              case Some(version) => Right(doc.copy(headEntityVersion = Some(version)))
            }
          }
        case (PickDecisionDecider.HumanDecisionNeeded, flow) =>
          flow.
            map(referToHuman("pick", _)).
            buffer(20, OverflowStrategy.backpressure). // enqueue up to 20 unacked human requests
            mapAsyncUnordered(identity). // at least one human has begun the work (on abandon, future will be abandoned)
            mapAsync(identity) // human has completed the work
      }.
      mergeUnordered.
      mapConcat(handleResult).
      foreach { doc =>
        // everything that makes it this far is done; the foreach consumer will
        // produce a constant pull on the stream.
        ack(doc.path)
      }

It looks like an unordered Merge has been implemented for FlowGraphs <http://doc.akka.io/api/akka-stream-and-http-experimental/1.0-M2/?_ga=1.205107984.1839818538.1375706610#akka.stream.scaladsl.Merge>, but I don't see a GroupBy junction defined, just Broadcast, so I'm not sure whether I can get at this feature.

It looks like I might be able to define an ActorPublisher and an ActorSubscriber to achieve my goal; IteratorPublisher.scala <https://github.com/akka/akka/blob/releasing-akka-stream-and-http-experimental-1.0-M2/akka-stream/src/main/scala/akka/stream/impl/IteratorPublisher.scala> is good enough documentation for that. So this is my working plan, but I wanted to check first whether I am missing some undocumented feature or overlooking something that has already been done in this area.