Further study convinces me that this is not yet implemented. I'm going 
to pursue the ActorPublisher / ActorSubscriber route. Thank you for this 
documentation page, Akka-Stream folks: 
<http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0-M2/scala/stream-integrations.html>

I'll post my solution when it's done, in case someone else needs this as 
well.
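
For reference, the behavior asked about in the quoted message (merge results 
in whatever order they complete, with a bound on how much work is in flight) 
can be sketched with plain Scala futures. This is only an illustration under 
stated assumptions, not the ActorPublisher / ActorSubscriber solution and not 
an Akka Streams API: `UnorderedMergeSketch`, `mergeUnordered`, and 
`maxInFlight` are hypothetical names, and the calling thread is assumed to be 
outside the execution context's pool because it blocks.

```scala
import java.util.concurrent.{LinkedBlockingQueue, Semaphore}
import scala.concurrent.{ExecutionContext, Future}
import scala.util.Try
import scala.util.control.NonFatal

// Hypothetical helper, not part of Akka Streams.
object UnorderedMergeSketch {

  /** Runs `work` for every input with at most `maxInFlight` unfinished
    * futures at a time, and returns the results in completion order
    * rather than input order.
    */
  def mergeUnordered[A, B](inputs: Seq[A], maxInFlight: Int)(work: A => Future[B])(
      implicit ec: ExecutionContext): Vector[Try[B]] = {
    val permits   = new Semaphore(maxInFlight)
    val completed = new LinkedBlockingQueue[Try[B]]()

    inputs.foreach { in =>
      // Back-pressure: the caller blocks here while maxInFlight items are pending.
      permits.acquire()
      // Turn a synchronous failure into a failed future so the permit is still released.
      val f = try work(in) catch { case NonFatal(e) => Future.failed[B](e) }
      f.onComplete { result =>
        completed.put(result) // results are queued as they finish
        permits.release()     // allow the next input to start
      }
    }

    // Exactly one result is queued per input, so this drains everything.
    Vector.fill(inputs.size)(completed.take())
  }
}
```

Calling `UnorderedMergeSketch.mergeUnordered(docs, maxInFlight = 20)(decide)` 
with an implicit `ExecutionContext` in scope (where `docs` and `decide` stand 
in for the real inputs and decision function) would keep at most 20 decisions 
outstanding, much like the `buffer(20, OverflowStrategy.backpressure)` step 
in the pseudo-code below.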

On Tuesday, January 6, 2015 11:39:22 AM UTC-7, Tim Harper wrote:
>
> I am modeling a data processing pipeline with several decision points 
> at which the system may refer to a human being to make a decision. 
> Since earlier decisions change the state of the system and affect later 
> decisions, one goal is to process a narrow sliding view of the data and 
> not run much faster than the humans. A second goal: since these decisions 
> can take time, and the system may need to sort through several items that 
> the machine decides it can handle itself, I want the system to buffer 
> requests for human decisions so that work is ready for people when they 
> ask for it, and then, naturally, back-pressure when the buffer is full.
>
> In summary, my goal is to be able to merge streams in an unordered 
> fashion. *Lengthy pseudo-code* follows for clarification:
>
>     val source = Source(docsToHandle).
>       mapAsync(withCurrentHeadVersion).
>       groupByAsync(attachDecisionDecider.decide).
>       map[Source[Either[Terminal, DocForProcessing]]] {
>         case (AttachDecisionDecider.AlreadyAttached, flow) =>
>           flow map { doc => Right(doc) }
>         case (AttachDecisionDecider.ObviousDuplicate, flow) =>
>           flow.mapAsync { doc =>
>             (entityState ? EntityState.AttachDoc(doc.headEntityVersion, doc.path)).mapTo[Option[Int]] map {
>               case None =>
>                 // Conflict! Note: we may want to consider a max-retry
>                 // signal that dumps to another table when abandoning;
>                 // this can be handled by our requeue logic down the stream.
>                 Left(ProcessConflict(doc.path))
>               case Some(version) =>
>                 // Note: it may actually be best to requery this later, to
>                 // tighten the gap between decision and acting on the decision.
>                 Right(doc.copy(headEntityVersion = Some(version)))
>             }
>           }
>         case (AttachDecisionDecider.HumanDecisionNeeded, flow) =>
>           flow.
>             map(referToHuman("attach", _)).
>             // enqueue up to 20 unacked human requests
>             buffer(20, OverflowStrategy.backpressure).
>             // at least one human has begun the work
>             // (on abandon, future will be abandoned)
>             mapAsyncUnordered(identity).
>             mapAsync(identity) // human has completed the work
>       }.
>       mergeUnordered.
>       mapConcat(handleResult).
>       groupByAsync(pickDecisionDecider.decide).
>       map[Source[Either[Terminal, DocForProcessing]]] {
>         case (PickDecisionDecider.AlreadyPicked, flow) =>
>           flow map { doc => Right(doc) }
>         case (PickDecisionDecider.AutoMerge, flow) =>
>           flow mapAsync { doc =>
>             (entityState ? EntityState.PickDoc(doc.headEntityVersion, doc.path)).mapTo[Option[Int]] map {
>               case None =>
>                 Left(ProcessConflict(doc.path))
>               case Some(version) =>
>                 Right(doc.copy(headEntityVersion = Some(version)))
>             }
>           }
>         case (PickDecisionDecider.HumanDecisionNeeded, flow) =>
>           flow.
>             map(referToHuman("pick", _)).
>             // enqueue up to 20 unacked human requests
>             buffer(20, OverflowStrategy.backpressure).
>             // at least one human has begun the work
>             // (on abandon, future will be abandoned)
>             mapAsyncUnordered(identity).
>             mapAsync(identity) // human has completed the work
>       }.
>       mergeUnordered.
>       mapConcat(handleResult).
>       // Everything that makes it this far is done; the foreach consumer
>       // will produce a constant pull on the stream.
>       foreach { doc =>
>         ack(doc.path)
>       }
>
>
>
> It looks like Merge unordered has been implemented for FlowGraphs 
> <http://doc.akka.io/api/akka-stream-and-http-experimental/1.0-M2/?_ga=1.205107984.1839818538.1375706610#akka.stream.scaladsl.Merge>,
>  
> but I don't see a GroupBy junction defined, just Broadcast. So, I'm not 
> sure if I can access this feature.
>
> It looks like I might be able to define an ActorPublisher and 
> ActorSubscriber to achieve my goal; IteratorPublisher.scala 
> <https://github.com/akka/akka/blob/releasing-akka-stream-and-http-experimental-1.0-M2/akka-stream/src/main/scala/akka/stream/impl/IteratorPublisher.scala>
>  
> is good enough documentation. So this is my working plan, but I wanted to 
> check first whether I was missing some undocumented feature or 
> overlooking something that has already been done in this area.
>
