clebertsuconic commented on code in PR #1390: URL: https://github.com/apache/activemq/pull/1390#discussion_r2395920621
########## docs/proposal/jakarta_messaging_3_1_async_producer/jakarta_messaging_3_1_async_producer.md: ########## @@ -0,0 +1,227 @@ +# Jakarta Messaging 3.1: asynchronous send design doc + +Date: Jan 5th 2025 +\+1 on proposal: pending +\-1 on proposal: pending + +The purpose of this document is to discuss the design of implementing asynchronous send (with CompletionListener) that is JMS 2.0 / Jakarta Messaging 3.1 compliant. + +## **Background** + +Here is the [link](https://jakarta.ee/specifications/messaging/3.1/jakarta-messaging-spec-3.1#asynchronous-send) to the official Jakarta Messaging 3.1 spec (Section 7.3). + +ActiveMQ as it is, already sends messages asynchronously by default (except persistent messages outside of a transaction boundary) and supports non-JMS compliance interface `AsyncCallback` for client applications to specify the callback to trigger once the server sends the acknowledgement. + +## **Implementation Requirements** + +I highlight the requirements defined in [official Jakarta Messaging 3.1 spec](https://jakarta.ee/specifications/messaging/3.1/jakarta-messaging-spec-3.1#asynchronous-send) Please see the [appendix](#summary-of-jakarta-messaging-3.1-requirements) for more details. + +## **Overall approach** + +There was a long discussion on the original [PR](https://github.com/apache/activemq/pull/1364/commits). The consensus was to follow the spec 100%. But to capture the rationale behind, I will outline the two approaches that were initially considered. + +**Approach 1: Make CompletionListener behave the same as AsyncCallback** + +The idea is to wrap CompletionListener with a AsyncCallback. I.E every CompletionListener passed by client application is transformed into a AsyncCallback under the hood. Also users are already familiar with the async message behaviour (a message is default sent as async unless it’s a persistent message sent outside of a transacted boundary). + +Pros: + +- Simple to implement. + +Cons: + +- It doesn’t follow the spec and as a matter of fact, is very different. (All the other design decisions mentioned below are not met) + +This approach is not recommended because new users of ActiveMQ 6 will probably expect it to stick to the spec 100% and it will be hard to explain to them why we are sticking with the old implementation. + +**\[Recommended\] Approach 2: Stick to the spec 100%, and deprecate AsyncCallback in ActiveMQ 7** + +We will reference the code flow of AsyncCallback but will support all the behaviour outlined in the specs. It is up to the users which behaviour they choose. I.E: + +1. User configures always use async or doesn’t send persistent message outside of a transaction boundary: no change +2. User uses AsyncCallback as the callback for the async send: no change, it is not Jakarta Messaging Compliant +3. User uses the new API (supports jakarta.jms.CompletionListener): it will stick to Jakarta Messaging 3.1 spec outlined 100% + +So in ActiveMQ 6, there are two ways to pass callback to async send. That will cause a lot of effort to maintain both going forward and it will be confusing. Hence, at the release that launches this feature, it will also mark the AsyncCallback as deprecated. And in ActiveMQ 7 it will be removed. + +## **CompletionListener unable to close, commit, rollback session, context and producer** + +This corresponds to the behaviour outlined in [7.3.4](https://jakarta.ee/specifications/messaging/3.1/jakarta-messaging-spec-3.1#close-commit-or-rollback). + +**\[Recommended\] Approach 1: Use ThreadLocal variable and wrapper CompletionListener** + +In this approach, it will use a ThreadLocal\<Boolean\> flag on the session, context and producer to mark if the current thread is inside a CompletionListener callback. The reason why it is ThreadLocal is because a session (it also applies to JMSContext because it wraps a session under the hood) can be shared by multiple consumers and producers. If we have the thread that executes the CompletionListener (has to be a separate thread required by [7.3.8](https://jakarta.ee/specifications/messaging/3.1/jakarta-messaging-spec-3.1#use-of-the-completionlistener-by-the-jakarta-messaging-provider)) sets a simple shared boolean flag on the session, then the application thread will also be affected (unable to close, commit and rollback session while the CompletionListener is running), which violates the spec. + +At the same time, we are using a wrapper CompletionListener to do pre & post processing before invoking the user-provided CompletionListener and specified the wrapper as the callback for the async send. Therefore, we can set and unset the ThreadLocal flags before invoking user-provided CompletionListener. + +Here is sample code flow that illustrates the point: + +It wraps the user-provided CompletionListener (named `completionListener` in the screenshot below) into a `wrapperCompletionListener` which is later passed to the connection and the transport layer’s `asyncRequest` method. + + + +As you can see, before invoking the user-provided CompletionListener, it sets the `inCompletionListenerCallback` (an instance attribute ThreadLocal\<Boolean\> on the `Session` object) to true. + +Then in the `close` method of the session, check if the flag is set to true before closing. + + +## **** + +Hence if the user-provided CompletionListener tries to call close() on the session inside a CompletionListener, the `inCompletionListenerCallback` will be true in that transport thread and it will throw an exception. In contrast, in the application thread, since `inCompletionListenerCallback` is false, the application thread can still close the session even while the transport thread is processing the user-provided CompletionListener. + +When the user-provided CompletionListener completes, `inCompletionListenerCallback` will be set to false by the wrapper. + +The same idea applies to transacted session, producer and connection. + +## **Making message object inaccessible** + +This corresponds to the behaviour outlined in [7.3.9](https://jakarta.ee/specifications/messaging/3.1/jakarta-messaging-spec-3.1#restrictions-on-the-use-of-the-message-object) and [7.3.6](https://jakarta.ee/specifications/messaging/3.1/jakarta-messaging-spec-3.1#message-headers). Basically, before the async send of the message is completed, it will not be accessible (not readable and not writable) by the client application. + +We extend the idea of the wrapper CompletionListener stated above and add a flag in the ActiveMQMessage object, `messageAccessible` if the flag is true. The client application will not be able to read or modify message attributes. + + + +I.E at every setter and getter of the Message object + + +When a message is sent, currently by default, the session is making a copy (`isCopyMessageOnSend` is default true). Hence we need to make sure we set the flag on the original message reference, and we set its `messageAccessible` to false right before it is sent asynchronously in a connection method. After the send is complete, the wrapper CompletionListener will set the `messageAccessible` attribute of the message reference back to true. + +There is a small decision to make here. Because users can configure `isCopyMessageOnSend`, so in theory they can set it to false. I.E no copy of the message and marshal that message object into the wire. There are two options here: + +**1\. \[Recommended\] Approach 1: Enforce message copy if CompletionListener is specified:** + +Even though the OpenWire marshaller will ignore the field we just added, i.e on the broker server end, the `messageAccessible` attribute will always be true regardless. It is more safe if we enforce message copy, such that in the future when we change the marshaller logic, it will not “lock” the message on the broker server end by accident. + +**2\. Approach 2: Leave it as it is and honour `isCopyMessageOnSend`:** + +If `CopyMessageOnSend` is false, then we don’t make a copy and set the message to be inaccessible and let the marshaller encode it (the field will be ignored anyway since we are not changing the marshaller logic). + +## **Messages ordering** {#messages-ordering} + +This corresponds to [7.3.3](https://jakarta.ee/specifications/messaging/3.1/jakarta-messaging-spec-3.1#message-order-2) and [7.3.8](https://jakarta.ee/specifications/messaging/3.1/jakarta-messaging-spec-3.1#use-of-the-completionlistener-by-the-jakarta-messaging-provider). Basically it needs to meet two cases of messaging ordering requirements: + +1. The order of asynchronous sends to a particular destination from a producer has to be the same as the order of execution of its corresponding CompletionListener. +2. The order of asynchronous sends across a set of destinations from the same producer has to be the same as the order or execution of its corresponding CompletionListener. + +Requirement \#1 is already satisfied because the broker acknowledges messages in the same order as the messages were sent within the same destination. The client doesn’t need any additional logic to handle that. + +To satisfy requirement \#2, there are a few approaches. + +**\[Recommended\] Approach 1: Application thread send async message one at a time** + +This is the most straightforward option, sending async messages one at a time in the application thread. It basically makes async send behave like sync send. CompletionListener still needs to be executed on another thread (meeting the threading restriction 7.3.7), in this case the transport thread. +Pros: + +- It is very simple to implement (also makes implementing other features simpler, see the next design decision below) + +Cons: + +- It sort of defies the purpose of async send because the application thread needs to wait before the send is completed (can lead to performance issues). Even though it doesn’t violate the spec. + +This approach is recommended due to its simplicity. Will follow up with a plan to optimize such behaviour. + +**Approach 2: Application thread submits the async send to a work queue, a single background thread will dequeue a message from the queue, send it asynchronously and wait for it to finish, before dequeuing the next.** + +This is similar to approach 1, the async message is sent one at a time. The difference being instead of doing it in the application thread, the `Session` object will have a `SingleThreadExecutor` to handle sending of the messages on a background thread. The advantage of doing that is the application thread is not blocked by sending an async message, it simply enqueues the message to the work queue (ExecutorService) then proceeds. Allowing it to batch send a lot of async messages at a time without blocking. Review Comment: each send on the executor would still need a round trip to the server on this case. you are only unblocking the sender's thread., right? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected] For further information, visit: https://activemq.apache.org/contact
