Lance,

I really appreciate your response, it's well thought out, and the code is a 
great illustration. I have one question, I'm guessing you probably already 
considered it so I'd like to hear your thoughts.

Using your approach, what's the best way to parallelize work that the 
handler does? Is the intention to launch several separate handleAndAckSinks 
to achieve parallelism, or something else?

I ask because the handler flow has to output one element per message in 
order to keep the message and the Try in lock step. That means the handler 
flow can't do something like break down the work into a bunch of smaller 
messages, flatten that into the stream, and utilize mapAsync in that 
flattened stream to do a bunch of parallel work (however, the handler could 
have many steps which provide some parallelism through pipelining). I'm 
just not familiar enough with Akka Streams to intuite the best approach for 
parallelize the handler.

Thanks,
Andrew

On Thursday, August 20, 2015 at 12:31:50 PM UTC-4, Lance Arlaus wrote:
>
> Andrew-
>
> How about using a simple branching flow with a broadcast and zip?
>
> The first branch carries the message to the end of the pipeline where the 
> acknowledger receives both the message and the result of processing as a 
> Tuple. It can then decide whether/how to acknowledge the message.
> The second branch contains the desired business logic and produces a Try 
> (or other monadic data type) as its result.
> This way, the processing logic has no need to pass along the message. In 
> fact, you could extract and deal with just the payload itself in that 
> second branch if you'd like.
> Since the zip waits for elements from both branches before emitting, 
> you'll have matched (Message, Try) tuples.
>
> Here's a gist: 
> https://gist.github.com/lancearlaus/e6e52fc8c7ca534cb026#file-akka-user-stream-ack-scala
>
> And the relevant code (the handleAndAckSink method is the key):
>
> case class Message[T](id: Long, body: T)
>
> trait Queue {
>   def acknowledge(id: Long): Unit
> }
>
>
> type Handler[T] = Flow[Message[T], Try[_], _]
> type AckSink = Sink[(Message[_], Try[_]), Future[_]]
>
> // A sink that acknowledges messages upon successful processing
> def ackSink(queue: Queue) =
>   Sink.foreach[(Message[_], Try[_])] {
>     case (msg, result) => result match {
>       case Success(_) => queue.acknowledge(msg.id)
>       case Failure(t) => {
>         // Do something on failure
>         println(t)
>       }
>     }
>   }
>
> // The flow that wraps the handler and acknowledger sink
> def handleAndAckSink[T](handler: Handler[T], ackSink: AckSink) = 
>   Sink(handler, ackSink, Broadcast[Message[T]](2), Zip[Message[T], 
> Try[_]])((_, mat, _, _) => mat) {
>     implicit b => (handler, ackSink, bcast, zip) =>
>
>     bcast            ~> zip.in0
>     bcast ~> handler ~> zip.in1
>                         zip.out ~> ackSink
>
>     (bcast.in)
>   }
>
>
> class AckSpec extends FlatSpec with AkkaStreamsImplicits with Matchers 
> with ScalaFutures {
>
>   def testSource(n: Int) = Source((0 to n)).map(n => Message(n, s"message 
> $n"))
>
>   val testQueue = new Queue {
>     def acknowledge(id: Long) = println(s"acknowledging message $id")
>   }
>  
>   val testHandler = Flow[Message[String]].map { msg => 
>     // Randomly fail
>     if (Random.nextBoolean) Failure(new Exception(s"failure processing 
> message $msg"))
>     else Success(s"success processing message $msg")
>   }
>
>   "Acknowledge" should "ack messages" in {
>
>     val future = testSource(10).runWith(handleAndAckSink(testHandler, 
> ackSink(testQueue)))
>
>     whenReady(future) { result =>
>     }
>
>   }
>
> }
>
>
>
> Regards,
> Lance
>
>
> On Thursday, August 20, 2015 at 12:20:28 AM UTC-4, Andrew Rollins wrote:
>>
>> Is there an idiomatic way handle queues with Akka Streas that need to 
>> acknowledge messages *after work is done* for a given message?
>>
>> This started from a thread on Twitter with Victor (
>> https://twitter.com/viktorklang/status/634117117978107904), but it's 
>> more appropriate to continue here. His last comment was "Sounds like you 
>> want Flow[T, Ack[T]] such that you can close the loop at the end."
>>
>> I'm going to show my interpretation of that suggestion and voice my 
>> concern with it. I'd love feedback.
>>
>> Assume we have an external queue service that provides messages through a 
>> "getMessage" API. Messages can be acknowledged by calling "ack(messageId)". 
>> After acknowledgement, a message is taken off the queue and won't be 
>> delivered again.
>>
>> I'm not exactly sure how a Flow[T, Ack[T]] helps, because how is the Ack 
>> being created from an arbitrary T? We need the original message identifier 
>> to be passed through the stream such that we can acknowledge the message, 
>> so we would need a flow along the lines of Flow[Msg, Ack]. In code, it 
>> could look something like this:
>>
>>   trait Msg { def msgId } // incoming queue message
>>   trait Ack               // ack result type
>>
>>   class FakeQueue {
>>     def receive : Msg = ???
>>     def ack(m: Msg) : Ack = ???
>>   }
>>
>>   val queue = new FakeQueue
>>
>>   val msgSource: Source[Msg] =
>>     Source.apply(() => Iterator.iterate[Msg](None)(_ => queue.receive))
>>
>>   val flow: Flow[Msg, Ack, Unit] = 
>>     Flow[(Msg, String)].
>>       map {msg => (deserialize(msg.body), msg) }.
>>       map {case (x, msg) => (doWork(x), msg)}.
>>       map {case (y, msg) => insertIntoDatabase(y); msg}.
>>       map {msg => queue.ack(Msg)}
>>
>> This seems ok and certainly works, but I have a hangup with this. All my 
>> intermediate steps need to passthrough the message to the end, but they 
>> individually don't care about the message. Those stages are coupled with 
>> some data they don't ultimately handle. I'd like to avoid that.
>>
>> In other words, in some hand-wavy sense, there is a desire to take a Flow 
>> such as Flow[In, Out] which is queue agnostic and then wrap that flow with 
>> something that will dequeue messages from a queue, push the "In" object to 
>> the interflow, and somehow pass along the outer message such that the 
>> message tied to an "In" pops out the other end with the associated "Out".
>>
>> I'm at a loss for how to do this. Perhaps I'm looking at it wrong to 
>> begin with. I'm hoping someone else can provide guidance.
>>
>

-- 
>>>>>>>>>>      Read the docs: http://akka.io/docs/
>>>>>>>>>>      Check the FAQ: 
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>>      Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.

Reply via email to