This is actually a fairly interesting problem that I've been putting a lot 
of thought into lately as well. I've been working on a (very rough) 
ground-up implementation of AMQP using the new StreamTcp API, and I've run 
into an interesting problem with RabbitMQ.

Basically, AMQP uses the concept of an Ack for a message into two different 
(and unrelated) concepts:

1) It's used to signal that a message has been handled and consumed, and 
thus may be removed from the queue that it originated in.
2) It's also used to signal back pressure to the broker from a client. When 
creating an AMQP channel, you specify a QoS level, which is the total 
number of in-flight un-acked messages that the broker will deliver to you.

On top of this, Rabbit also uses TCP to signal back pressure the other way 
around - when a client sends too many requests (publishes, etc), it signals 
TCP back pressure.

The incoming/subscription method of using Acks and QoS seems to make sense 
to me, since you don't want a slow consumer to stall all of the other 
subscribing channels on a given connection. I've been playing with the idea 
of modeling a subscription as a BidiFlow with messages being emitted on the 
top side, and having an incoming stream of Ack/Reject messages on the 
bottom side. The top side would adjust the underlying channel's QoS based 
on incoming demand, and leaving the responsibility to Ack messages entirely 
up to the user-defined downstream message-processing Flow. This is 
obviously not ideal, since it would easily allow a poorly-behaved 
downstream that never sends an Ack to cause the QoS on the channel to 
eventually blow up and be unbounded.

The best I can think of to get around this is to just remove the concept of 
Acks from the downstream consumer, and keep it entirely bounded inside of 
the subscription's Source. Namely, keep an internal buffer of messages, and 
send an Ack back to the broker when downstream signals demand and an 
element is emitted. This would work quite well, but it does eliminate some 
of the AMQP model from the downstream processor - namely, there's no way to 
explicitly reject a message as inappropriate. In the grand scheme of 
things, I don't think this is a big deal, however, because for most of the 
use cases I've seen, when a message on an AMQP queue is malformed or 
unprocurable, the last thing you really want to do is put it back on the 
same queue to be processed later.

Forgive the digression and wordiness, this is just something I've been 
thinking about quite a lot recently - my end goal is the ability to have a 
Rabbit client that supports a subset of the protocol (most likely publish 
and subscribe) written and based from the ground up on Akka StreamTcp. Am I 
completely barking the wrong tree here?


On Thursday, May 14, 2015 at 1:12:01 AM UTC-5, rkuhn wrote:
>
>
> 14 maj 2015 kl. 00:43 skrev Tim Harper <timch...@gmail.com <javascript:>>:
>
>
> On May 13, 2015, at 01:15, Roland Kuhn <goo...@rkuhn.info <javascript:>> 
> wrote:
>
> Hi Tim,
>
> this looks like a very nice package, thanks for sharing! Since I am not 
> that deeply into RabbitMQ, I guess the “opinionated” part is about favoring 
> at-least-once guarantees and explicit acknowledgement over magic? This 
> would fit in well with the overall Akka philosophy, and that would 
> presumably not be coincidental ;-)
>
>
> It's the consumer pattern, deserialization pattern and error-reporting 
> pattern, bundled up into one integrated package.
>
> How has your experience been so far with building upon Akka Streams, did 
> you miss anything or find anything especially great?
>
>
> I love Akka Streams. My only qualm currently (was going to comment on the 
> thread in which you solicited feedback on the DSL) is the 
> .toMat(sink)(Keep.right); I frequently use the Future from Sink.foreach 
> and wish it were the default.
>
>
> In that situation you can use .runWith(Sink.head) where the default is to 
> return the Future—we use that in almost all our tests ;-) The non-running 
> DSL methods all consistently default to the “left” side since that is 
> obviously correct for almost all of them (like map/filter/drop/take), while 
> the convenience runners default to their argument to provide this choice in 
> a nicer fashion for some prominent use-cases.
>
>
> Looking at your code samples there is possibly one difficult part, which 
> is the Promise that gets fed into the Sink in order to signal 
> completion—perhaps that could be modeled more cleanly by exposing an 
> outgoing stream of acknowledgements (i.e. making the producer a Flow and 
> not a Sink) so that a feedback loop would inform upstream producers of 
> successful ingestion.
>
>
> That's an interesting idea; I am using promises right now to achieve that 
> result, and the pattern works relatively well. It allows me to pass the 
> original upstream acknowledgement promise into the publisher sink, such 
> that the message isn't considered "done" until any resultant messages are 
> persisted in another message queue.
>
> I've thought about your approach and I'm not quite sure how I would do it. 
> I could pass a tuple of the original deliveryTag (from the consumer Source) 
> and the message to publish into the Producer flow, and have the publisher 
> flow yield the deliveryTag; then, stream the deliveryTag back to the 
> consumer Source so it can acknowledge it. One benefit of this is I could 
> cross process boundaries safely. However, the consumer Source actually 
> monitors the promises that it yields, and yells at the user if they are 
> garbage collection (to let them know they dropped a promise without 
> resolving it). I suppose I could do a configurable timeout, instead. Is 
> this what you had in mind?
>
>
> Yes, that is what I meant.
>
>
> I tend to prefer the promises, since they are simple and universal, but I 
> admit there are issues with cross-process boundaries.
>
> Worth of mention, I must ack the message on the same AMQP channel from 
> which it was originally delivered, so, crossing machine boundaries is 
> already going to cause pain, and a less reliable auto-acking approach 
> should be used, instead.
>
>
> I wasn’t thinking so much about crossing machine boundaries, I’m currently 
> more interested in learning how powerful the DSL is that we have built. By 
> using Promises you introduce an “uncontrolled” back-channel for 
> communicating between stream processing stages, and if that were the only 
> way to solve this then our DSL would not be exhaustive. So, for me it is 
> more of an aesthetic concern that the solution would be simpler in a way by 
> just using one abstraction instead of mixing two. A working solution still 
> is a working solution for you, of course ;-)
>
> Regards,
>
> Roland
>
>
> Regards,
>
> Tim Harper
>
>
> -- 
> >>>>>>>>>> Read the docs: http://akka.io/docs/
> >>>>>>>>>> Check the FAQ: 
> http://doc.akka.io/docs/akka/current/additional/faq.html
> >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
> --- 
> You received this message because you are subscribed to the Google Groups 
> "Akka User List" group.
> To unsubscribe from this group and stop receiving emails from it, send an 
> email to akka-user+...@googlegroups.com <javascript:>.
> To post to this group, send email to akka...@googlegroups.com 
> <javascript:>.
> Visit this group at http://groups.google.com/group/akka-user.
> For more options, visit https://groups.google.com/d/optout.
>
>
>
>
> *Dr. Roland Kuhn*
> *Akka Tech Lead*
> Typesafe <http://typesafe.com/> – Reactive apps on the JVM.
> twitter: @rolandkuhn
> <http://twitter.com/#!/rolandkuhn>
>  
>

-- 
>>>>>>>>>>      Read the docs: http://akka.io/docs/
>>>>>>>>>>      Check the FAQ: 
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>>      Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.

Reply via email to