Andrew-

How about using a simple branching flow with a broadcast and zip?

The first branch carries the message to the end of the pipeline where the 
acknowledger receives both the message and the result of processing as a 
Tuple. It can then decide whether/how to acknowledge the message.
The second branch contains the desired business logic and produces a Try 
(or other monadic data type) as its result.
This way, the processing logic has no need to pass along the message. In 
fact, you could extract and deal with just the payload itself in that 
second branch if you'd like.
Since the zip waits for elements from both branches before emitting, you'll 
have matched (Message, Try) tuples.
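
To make the pairing concrete without any Akka machinery, here is a minimal sketch using plain Scala collections. It is only an analogy: `ZipSketch` and `handle` are hypothetical names, `handle` stands in for the business logic, and `List.zip` stands in for the stream's zip stage.

```scala
import scala.util.{Success, Try}

object ZipSketch {
  case class Message[T](id: Long, body: T)

  // Hypothetical payload-only handler: it never sees the Message wrapper.
  def handle(body: String): Try[Int] = Try(body.toInt)

  val messages = List(Message(1L, "10"), Message(2L, "oops"), Message(3L, "30"))

  // "Branch 1" passes the messages through untouched.
  // "Branch 2" extracts the payload and runs the handler on it.
  val results: List[Try[Int]] = messages.map(m => handle(m.body))

  // Zipping pairs the i-th message with the i-th result.
  val paired: List[(Message[String], Try[Int])] = messages.zip(results)

  // Only messages whose result is a Success would be acknowledged.
  val toAck: List[Long] = paired.collect { case (msg, Success(_)) => msg.id }
}
```

Note that the pairing is purely positional, so it only holds if the handler emits exactly one result per message, in order. A handler that drops or reorders elements would pair results with the wrong messages.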

Here's a gist: 
https://gist.github.com/lancearlaus/e6e52fc8c7ca534cb026#file-akka-user-stream-ack-scala

And the relevant code (the handleAndAckSink method is the key):

case class Message[T](id: Long, body: T)

trait Queue {
  def acknowledge(id: Long): Unit
}


type Handler[T] = Flow[Message[T], Try[_], _]
type AckSink = Sink[(Message[_], Try[_]), Future[_]]

// A sink that acknowledges messages upon successful processing
def ackSink(queue: Queue) =
  Sink.foreach[(Message[_], Try[_])] {
    case (msg, result) => result match {
      case Success(_) => queue.acknowledge(msg.id)
      case Failure(t) => {
        // Do something on failure
        println(t)
      }
    }
  }

// The flow that wraps the handler and acknowledger sink
def handleAndAckSink[T](handler: Handler[T], ackSink: AckSink) =
  Sink(handler, ackSink, Broadcast[Message[T]](2), Zip[Message[T], Try[_]])(
      (_, mat, _, _) => mat) {
    implicit b => (handler, ackSink, bcast, zip) =>

    bcast            ~> zip.in0
    bcast ~> handler ~> zip.in1
                        zip.out ~> ackSink

    (bcast.in)
  }


class AckSpec extends FlatSpec with AkkaStreamsImplicits with Matchers
    with ScalaFutures {

  def testSource(n: Int) =
    Source((0 to n)).map(n => Message(n, s"message $n"))

  val testQueue = new Queue {
    def acknowledge(id: Long) = println(s"acknowledging message $id")
  }
 
  val testHandler = Flow[Message[String]].map { msg => 
    // Randomly fail
    if (Random.nextBoolean)
      Failure(new Exception(s"failure processing message $msg"))
    else Success(s"success processing message $msg")
  }

  "Acknowledge" should "ack messages" in {

    val future =
      testSource(10).runWith(handleAndAckSink(testHandler, ackSink(testQueue)))

    whenReady(future) { result =>
    }

  }

}
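
The spec above only prints acknowledgements. If you want to assert on them, one option is a queue that records ids. The following is a rough sketch in plain Scala, with no stream involved: `AckSketch`, `RecordingQueue`, and `ackOnSuccess` are hypothetical names, and `ackOnSuccess` just mirrors the decision made inside ackSink.

```scala
import scala.collection.mutable.ListBuffer
import scala.util.{Failure, Success, Try}

object AckSketch {
  case class Message[T](id: Long, body: T)

  trait Queue {
    def acknowledge(id: Long): Unit
  }

  // Hypothetical test double: records acknowledged ids instead of printing.
  class RecordingQueue extends Queue {
    val acked = ListBuffer.empty[Long]
    def acknowledge(id: Long): Unit = { acked += id; () }
  }

  // Same decision as ackSink, written as a plain function on one pair.
  def ackOnSuccess(queue: Queue)(pair: (Message[_], Try[_])): Unit =
    pair match {
      case (msg, Success(_)) => queue.acknowledge(msg.id)
      case (_, Failure(_))   => () // failed messages are left unacknowledged
    }

  val queue = new RecordingQueue

  val pairs: List[(Message[_], Try[_])] = List(
    (Message(0L, "a"), Success("ok")),
    (Message(1L, "b"), Failure(new Exception("boom"))),
    (Message(2L, "c"), Success("ok"))
  )

  // Feed each (message, result) pair through the acknowledgement logic.
  pairs.foreach(p => ackOnSuccess(queue)(p))
}
```

With something like this in place of testQueue, the spec could compare the recorded ids against the messages whose handler result was a Success.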



Regards,
Lance


On Thursday, August 20, 2015 at 12:20:28 AM UTC-4, Andrew Rollins wrote:
>
> Is there an idiomatic way to handle queues with Akka Streams that need to 
> acknowledge messages *after work is done* for a given message?
>
> This started from a thread on Twitter with Viktor (
> https://twitter.com/viktorklang/status/634117117978107904), but it's more 
> appropriate to continue here. His last comment was "Sounds like you want 
> Flow[T, Ack[T]] such that you can close the loop at the end."
>
> I'm going to show my interpretation of that suggestion and voice my 
> concern with it. I'd love feedback.
>
> Assume we have an external queue service that provides messages through a 
> "getMessage" API. Messages can be acknowledged by calling "ack(messageId)". 
> After acknowledgement, a message is taken off the queue and won't be 
> delivered again.
>
> I'm not exactly sure how a Flow[T, Ack[T]] helps, because how is the Ack 
> being created from an arbitrary T? We need the original message identifier 
> to be passed through the stream such that we can acknowledge the message, 
> so we would need a flow along the lines of Flow[Msg, Ack]. In code, it 
> could look something like this:
>
>   // incoming queue message (field types assumed)
>   trait Msg { def msgId: Long; def body: String }
>   trait Ack // ack result type
>
>   class FakeQueue {
>     def receive : Msg = ???
>     def ack(m: Msg) : Ack = ???
>   }
>
>   val queue = new FakeQueue
>
>   val msgSource: Source[Msg, Unit] =
>     Source(() => Iterator.continually(queue.receive))
>
>   val flow: Flow[Msg, Ack, Unit] =
>     Flow[Msg].
>       map { msg => (deserialize(msg.body), msg) }.
>       map { case (x, msg) => (doWork(x), msg) }.
>       map { case (y, msg) => insertIntoDatabase(y); msg }.
>       map { msg => queue.ack(msg) }
>
> This seems ok and certainly works, but I have a hang-up with this. All my 
> intermediate steps need to pass the message through to the end, but they 
> individually don't care about the message. Those stages are coupled with 
> some data they don't ultimately handle. I'd like to avoid that.
>
> In other words, in some hand-wavy sense, there is a desire to take a Flow 
> such as Flow[In, Out] which is queue agnostic and then wrap that flow with 
> something that will dequeue messages from a queue, push the "In" object to 
> the inner flow, and somehow pass along the outer message such that the 
> message tied to an "In" pops out the other end with the associated "Out".
>
> I'm at a loss for how to do this. Perhaps I'm looking at it wrong to begin 
> with. I'm hoping someone else can provide guidance.
>
