I am modeling a data processing pipeline with several decision points at 
which the system may refer to a human being to make a decision. Earlier 
decisions change the state of the system and affect later decisions, so one 
goal is to process a narrow sliding view of the data and not let the machine 
run much faster than the humans. A second goal: since these decisions 
potentially take time, and the system may need to sort through several items 
that the machine decides it can handle itself, I want the system to buffer 
requests for human decisions so that work is ready for users when they ask 
for it, and then, naturally, to back-pressure when the buffer is full.
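
To make the buffering requirement concrete, here is a minimal sketch using 
only the JDK's ArrayBlockingQueue rather than any streams API. The names 
WorkRequest and DecisionBuffer are hypothetical and exist only for this 
illustration; the point is just that a bounded queue holds requests until a 
human asks for one, and refuses (or blocks) new ones once it is full.

```scala
import java.util.concurrent.ArrayBlockingQueue

object BufferSketch {
  // Hypothetical stand-in for a work request awaiting a human decision.
  final case class WorkRequest(docPath: String)

  // Bounded buffer of pending human decisions.
  final class DecisionBuffer(capacity: Int) {
    private val queue = new ArrayBlockingQueue[WorkRequest](capacity)

    // Blocks the caller while the buffer is full: this is the back-pressure.
    def offerWork(req: WorkRequest): Unit = queue.put(req)

    // Non-blocking variant: returns false instead of waiting when full.
    def tryOfferWork(req: WorkRequest): Boolean = queue.offer(req)

    // Called when a human asks for work; blocks while the buffer is empty.
    def nextForHuman(): WorkRequest = queue.take()

    def pending: Int = queue.size
  }

  def main(args: Array[String]): Unit = {
    val buffer = new DecisionBuffer(20)
    // Fill the buffer up to its capacity of 20 requests.
    (1 to 20).foreach(i => buffer.offerWork(WorkRequest(s"doc-$i")))
    // The 21st request is refused until a human takes one off the queue.
    val acceptedWhenFull = buffer.tryOfferWork(WorkRequest("doc-21"))
    val first = buffer.nextForHuman()
    val acceptedAfterTake = buffer.tryOfferWork(WorkRequest("doc-21"))
    println(s"$acceptedWhenFull ${first.docPath} $acceptedAfterTake ${buffer.pending}")
  }
}
```

In the stream version, the blocking put would be replaced by demand 
signalling, but the capacity of 20 plays the same role.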

In summary, my goal is to be able to merge streams in an unordered fashion. 
*Lengthy pseudo-code* follows for clarification:

    val source = Source(docsToHandle).
      mapAsync(withCurrentHeadVersion).
      groupByAsync(attachDecisionDecider.decide).
      map[Source[Either[Terminal, DocForProcessing]]] {
        case (AttachDecisionDecider.AlreadyAttached, flow) =>
          flow map { doc => Right(doc) }
        case (AttachDecisionDecider.ObviousDuplicate, flow) =>
          flow.mapAsync { doc =>
            (entityState ? EntityState.AttachDoc(doc.headEntityVersion,
              doc.path)).mapTo[Option[Int]] map {
              case None =>
                // conflict! Note: we may want to consider a max-retry
                // signal that dumps to another table when abandoning; this
                // can be handled by our requeue logic down the stream
                Left(ProcessConflict(doc.path))
              case Some(version) =>
                // Note: it may actually be best to requery this later, to
                // tighten the gap between decision and acting on the
                // decision.
                Right(doc.copy(headEntityVersion = Some(version)))
            }
          }
        case (AttachDecisionDecider.HumanDecisionNeeded, flow) =>
          flow.
            map(referToHuman("attach", _)).
            // enqueue up to 20 unacked human requests
            buffer(20, OverflowStrategy.backpressure).
            // at least one human has begun the work (on abandon, future
            // will be abandoned)
            mapAsyncUnordered(identity).
            mapAsync(identity) // human has completed the work
      }.
      mergeUnordered.
      mapConcat(handleResult).
      groupByAsync(pickDecisionDecider.decide).
      map[Source[Either[Terminal, DocForProcessing]]] {
        case (PickDecisionDecider.AlreadyPicked, flow) =>
          flow map { doc => Right(doc) }
        case (PickDecisionDecider.AutoMerge, flow) =>
          flow mapAsync { doc =>
            (entityState ? EntityState.PickDoc(doc.headEntityVersion,
              doc.path)).mapTo[Option[Int]] map {
              case None =>
                Left(ProcessConflict(doc.path))
              case Some(version) =>
                Right(doc.copy(headEntityVersion = Some(version)))
            }
          }
        case (PickDecisionDecider.HumanDecisionNeeded, flow) =>
          flow.
            map(referToHuman("pick", _)).
            // enqueue up to 20 unacked human requests
            buffer(20, OverflowStrategy.backpressure).
            // at least one human has begun the work (on abandon, future
            // will be abandoned)
            mapAsyncUnordered(identity).
            mapAsync(identity) // human has completed the work
      }.
      mergeUnordered.
      mapConcat(handleResult).
      // everything that makes it this far is done; the foreach consumer
      // will produce a constant pull on the stream.
      foreach { doc =>
        ack(doc.path)
      }



It looks like an unordered Merge has been implemented for FlowGraphs 
<http://doc.akka.io/api/akka-stream-and-http-experimental/1.0-M2/?_ga=1.205107984.1839818538.1375706610#akka.stream.scaladsl.Merge>,
but I don't see a GroupBy junction defined, only Broadcast. So I'm not sure 
whether I can get at this feature.
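
For comparison, here is a rough sketch of the shape I am after, written 
against the scaladsl of later Akka Streams releases rather than 1.0-M2. It 
assumes Akka 2.6 on the classpath, where `groupBy(maxSubstreams, f)` and 
`mergeSubstreams` exist and an implicit ActorSystem supplies the 
materializer; none of that is available in the milestone I am using. The 
Decision types, `decide`, and `referToHuman` here are hypothetical 
stand-ins, not my real deciders.

```scala
import akka.actor.ActorSystem
import akka.stream.OverflowStrategy
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object UnorderedMergeSketch {
  // Hypothetical decision outcomes, standing in for the real deciders.
  sealed trait Decision
  case object Automatic extends Decision
  case object HumanNeeded extends Decision

  // Hypothetical decider: every third document is referred to a human.
  def decide(doc: Int): Decision =
    if (doc % 3 == 0) HumanNeeded else Automatic

  def run(docs: Range)(implicit system: ActorSystem): Future[Seq[Int]] = {
    import system.dispatcher

    // Hypothetical human round trip, simulated with a short delay.
    def referToHuman(doc: Int): Future[Int] =
      Future { Thread.sleep(20); doc }

    Source(docs)
      // One substream per decision outcome (at most two here).
      .groupBy(2, doc => decide(doc))
      // Hold up to 20 pending elements per substream, then back-pressure.
      .buffer(20, OverflowStrategy.backpressure)
      // Emit results as they complete rather than in input order.
      .mapAsyncUnordered(4) { doc =>
        if (decide(doc) == HumanNeeded) referToHuman(doc)
        else Future.successful(doc)
      }
      // Merge the substreams back together in arrival order.
      .mergeSubstreams
      .runWith(Sink.seq)
  }

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("sketch")
    try {
      val merged = Await.result(run(1 to 12), 10.seconds)
      // Arrival order is not guaranteed; each document comes out once.
      println(s"processed ${merged.size} docs")
    } finally system.terminate()
  }
}
```

The automatic documents are not held up behind the slow human ones here, 
which is the behaviour I want from an unordered merge.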

It looks like I might be able to define an ActorPublisher and an 
ActorSubscriber to achieve my goal; IteratorPublisher.scala 
<https://github.com/akka/akka/blob/releasing-akka-stream-and-http-experimental-1.0-M2/akka-stream/src/main/scala/akka/stream/impl/IteratorPublisher.scala>
serves well enough as documentation. So this is my working plan, but I 
wanted to check first whether I am missing some undocumented feature or 
overlooking something that has already been done in this area.
