The problem is that the Dispatcher can hold on to a message. When the rollback process is believed to have completed, the Dispatcher then rejects this final message AFTER the TxRollback, so that one message gets sent ahead of the other messages. The reject is dropped because the message has already been resent. This is specific to the Java client, and results in the Java broker returning messages out of order. It may be the reason that the RollbackOrderTest has been disabled. It is not currently clear whether this also affects the C++ broker; further investigation is required.
Because of the way the AMQSession.Dispatcher is paused while a rollback operation is in progress, it is possible for the Dispatcher thread to be 'holding' a message for dispatch. The main loop of AMQSession.Dispatcher takes each Dispatchable from _queue and calls dispatch() on it.
The problem is highlighted in the dispatchMessage() method below, which is what disp.dispatch() ends up calling for an UnprocessedMessage. If the Dispatcher is dispatching messages when a second thread calls rollback, the connection is stopped; the Dispatcher can then remove a message from _queue and block in dispatchMessage() while still holding it:
private void dispatchMessage(UnprocessedMessage message)
{
    long deliveryTag = message.getDeliveryTag();
    synchronized (_lock)
    {
        try
        {
            // The message has already been taken from _queue. If the connection
            // is stopped for rollback, the Dispatcher waits here holding it.
            while (connectionStopped())
            {
                _lock.wait();
            }
        }
        catch (InterruptedException e)
        {
            // The interrupt is swallowed and dispatch carries on.
        }
        if (!(message instanceof CloseConsumerMessage)
            && tagLE(deliveryTag, _rollbackMark.get()))
        {
            // Received before the rollback mark: reject it.
            rejectMessage(message, true);
        }
        else
        {
            synchronized (_messageDeliveryLock)
            {
                notifyConsumer(message);
            }
        }
    }
    long current = _rollbackMark.get();
    if (updateRollbackMark(current, deliveryTag))
    {
        _rollbackMark.compareAndSet(current, deliveryTag);
    }
}
When the connection is resumed, the deliveryTag of the held message will be 'less than or equal' to _rollbackMark, so the message is rejected. This is because _rollbackMark has been set to the highest deliveryTag received prior to the rollback:
_rollbackMark.set(_highestDeliveryTag.get());
There are no guards in the code to stop the IO layer adding a new message to _queue whilst a rollback is in progress. However, both 0-8 and 0-10 ensure that message flow has stopped whilst recovery is processed: 0-8 sets ChannelFlow=false and waits for the Ok, and 0-10 stops the consumers and performs a sync.
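To make the resulting ordering concrete, the interleaving can be replayed on a single thread, with the steps of the Dispatcher and rollback threads written out in sequence. This is a minimal sketch, not the Qpid client code: RollbackRaceReplay and its plain long delivery tags are hypothetical, and it assumes, as the description above implies, that rollback rejects whatever is still in _queue before sending TxRollback and that no new messages arrive meanwhile.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical single-threaded replay of the race; not the AMQSession code.
final class RollbackRaceReplay {
    static List<String> replay() {
        List<String> log = new ArrayList<>();
        Deque<Long> queue = new ArrayDeque<>(List.of(1L, 2L, 3L, 4L)); // stands in for _queue
        long highestDeliveryTag = 4; // stands in for _highestDeliveryTag

        // Dispatcher thread: dispatches tag 1, then takes tag 2 from the queue...
        log.add("deliver " + queue.poll());
        long held = queue.poll();
        // ...but the connection is now stopped, so it waits holding tag 2.

        // Rollback thread: set the mark, reject what is still queued, send TxRollback.
        long rollbackMark = highestDeliveryTag;
        while (!queue.isEmpty()) {
            log.add("reject " + queue.poll());
        }
        log.add("TxRollback");

        // Connection resumed: the Dispatcher continues with the message it was holding.
        if (held <= rollbackMark) {    // stands in for tagLE(deliveryTag, _rollbackMark.get())
            log.add("reject " + held); // this reject is sent AFTER TxRollback
        }
        return log;
    }

    public static void main(String[] args) {
        replay().forEach(System.out::println);
    }
}
```

The replay logs deliver 1, reject 3, reject 4, TxRollback, reject 2: the reject for the held message is the last thing sent, after the TxRollback.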
Code Problem
The investigation of this problem has highlighted two areas that need to be addressed:
- The ability to ensure the dispatcher is not holding a message.
- The ability to confirm when the dispatcher will not process any more messages.
How the Dispatcher holds a message
The _queue.take() call is guaranteed never to return null, and once the Dispatcher has entered take() there is no way to stop it.
while (!_closed.get() && ((disp = (Dispatchable) _queue.take()) != null))
Hence we perform the stop as soon as possible after the take(), but this results in us holding on to a message.
Ideally we need to be able to stop the Dispatcher whilst it is in the take() method.
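One general way to stop a thread that is blocked in take() without it ever holding a real message is to enqueue a marker object, so that the marker itself is what take() returns. The sketch below shows the idea with a plain LinkedBlockingQueue; TakeWakeupSketch and the STOP marker are hypothetical, not part of the Qpid client. Interrupting the thread would also unblock take(), by making it throw InterruptedException, but that is a coarser tool.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical sketch: wake a thread parked in take() by enqueuing a marker.
final class TakeWakeupSketch {
    // Marker object; never a real message, so the worker holds nothing when it stops.
    private static final Object STOP = new Object();

    static List<Object> run() throws InterruptedException {
        BlockingQueue<Object> queue = new LinkedBlockingQueue<>();
        List<Object> dispatched = new ArrayList<>(); // written only by the worker thread
        Thread dispatcher = new Thread(() -> {
            try {
                while (true) {
                    Object item = queue.take(); // blocks while the queue is empty
                    if (item == STOP) {
                        return;                 // woken inside take(), holding nothing
                    }
                    dispatched.add(item);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        dispatcher.start();
        queue.put("msg-1");
        queue.put("msg-2");
        queue.put(STOP);   // FIFO: both messages are dispatched before the marker
        dispatcher.join(); // join() makes the worker's writes visible to this thread
        return dispatched;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run());
    }
}
```

Because the queue is FIFO, anything enqueued ahead of the marker is still dispatched first; run() returns [msg-1, msg-2].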
How the Dispatcher can keep processing
The Dispatcher currently uses the connectionStopped() call to suspend its activities when the connection has been marked as stopped. However, we need to know that the Dispatcher has actually reached this section; otherwise, we need to guarantee that _queue is empty.
synchronized (_lock)
{
    try
    {
        while (connectionStopped())
        {
            _lock.wait();
        }
    }
    catch (InterruptedException e)
    {
    }
    // ... reject or deliver the message, as in dispatchMessage() above
}
Having the Dispatcher signal that it has stopped processing would let us know that it has reached the stopped state. However, this still leaves the opportunity for one extra message to be processed AFTER the rollback command has been requested.
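The signalling approach can be sketched as follows, mainly to show why it is not enough on its own. This is a hypothetical illustration, not the AMQSession code: ParkSignalSketch, its stopped/parked flags and the awaitParked() method are invented for the example. The Dispatcher announces under the lock that it has parked, and the rollback thread waits for that announcement. In the demo the rollback thread learns that the Dispatcher is parked while the queue is empty, yet the Dispatcher is still holding the message it took after the stop was requested. Note also that awaitParked() would block forever if the Dispatcher were sitting in take() on an empty queue.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical sketch of "the Dispatcher signals that it has stopped".
final class ParkSignalSketch {
    private final Object lock = new Object();
    private boolean stopped; // guarded by lock
    private boolean parked;  // guarded by lock: true while the Dispatcher waits

    /** Dispatcher side: called after take(), before the message is dispatched. */
    void awaitRunning() throws InterruptedException {
        synchronized (lock) {
            while (stopped) {
                parked = true;
                lock.notifyAll(); // wake a rollback thread waiting in awaitParked()
                lock.wait();
            }
            parked = false;
        }
    }

    void stop() {
        synchronized (lock) {
            stopped = true;
        }
    }

    /** Rollback side: blocks until the Dispatcher parks (forever if it never does). */
    void awaitParked() throws InterruptedException {
        synchronized (lock) {
            while (!parked) {
                lock.wait();
            }
        }
    }

    void restart() {
        synchronized (lock) {
            stopped = false;
            lock.notifyAll();
        }
    }

    static List<String> demo() throws InterruptedException {
        ParkSignalSketch gate = new ParkSignalSketch();
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        List<String> events = Collections.synchronizedList(new ArrayList<>());
        Thread dispatcher = new Thread(() -> {
            try {
                String message = queue.take(); // removed from the queue...
                gate.awaitRunning();           // ...then parked while holding it
                events.add("dispatch " + message);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        dispatcher.start();
        gate.stop();        // rollback requested
        queue.put("msg-1"); // a message still arrives
        gate.awaitParked(); // the Dispatcher has signalled that it has stopped...
        events.add("parked, queue size " + queue.size()); // ...yet it holds msg-1
        gate.restart();
        dispatcher.join();
        return events;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(demo());
    }
}
```

demo() returns [parked, queue size 0, dispatch msg-1]: an empty queue plus a parked Dispatcher still does not mean that nothing is outstanding.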
Proposed Solution
Currently there is a lot of synchronisation to ensure that we can safely start the rollback process in the AMQSession before asking the Dispatcher to clean up its resources. Signalling that the Dispatcher has stopped, and so guaranteeing that it is no longer holding a message, would require more synchronisation, which is error prone and would add further complexity to the client.
The proposed alternative is to modify the FlowControllingBlockingQueue so that all rollback processing can be delegated to the Dispatcher. This removes the need to stop the Dispatcher: if the Dispatcher itself performs the rollback, it can be sure that it is not currently processing an UnprocessedMessage.
While delegating the rollback of consumed messages to the Dispatcher, it makes sense to give the Dispatcher more formal control over the receipt and dispatching of incoming messages. By extracting the Dispatcher from the Session class we can simplify both classes. Locking will be reduced, and responsibility for message processing will be delegated more cleanly to the Dispatcher.
/**
 * Dispatcher is responsible for delivering messages from the IO layer to registered consumers.
 * The dispatcher provides asynchronous dispatch via consumer MessageListeners and
 * synchronous delivery by placing incoming messages in a consumer's receive queue.
 */
public interface DispatcherInterface<C extends BasicMessageConsumer>
{
    /** Start this Dispatcher if required */
    public void startDispatcherIfNecessary(boolean initiallyStopped);

    /** Close this Dispatcher and release any resources held. Cannot be re-opened */
    public void close();

    /** Register a consumer to receive messages from this Dispatcher. */
    public void registerConsumer(C consumer);

    /** Unregister a consumer from this Dispatcher @return C removed consumer */
    public C unregisterConsumer(C consumer);

    /** Rollback the received message state in this Dispatcher */
    public void rollback();

    /** Process the received message from the IO Layer */
    public void dispatchMessage(UnprocessedMessage message);
}
A clean interface between the Session and the Dispatcher means that we can delegate control of rollback() to the Dispatcher more clearly. The FlowControllingBlockingQueue will need to be augmented so that, when an asynchronous request for rollback is made, the Dispatcher can pick up this 'ServiceRequest'.
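A minimal sketch of the 'ServiceRequest' idea, under stated assumptions: ServiceRequestDispatcher, ServiceRequest, requestRollback(), requestClose() and processOne() are hypothetical names; a LinkedBlockingDeque stands in for the augmented FlowControllingBlockingQueue; and messages are represented only by their delivery tags. The rollback request is pushed to the front of the queue (a choice made for this sketch) so that messages already queued are rejected rather than delivered. Because the rollback then runs on the Dispatcher thread between two takes, the Dispatcher cannot be holding a message while it runs.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;

// Hypothetical sketch; not the FlowControllingBlockingQueue or Dispatcher API.
final class ServiceRequestDispatcher implements Runnable {
    /** Work that the Dispatcher executes on its own thread. */
    interface ServiceRequest {
        void service(ServiceRequestDispatcher dispatcher);
    }

    // Items are either Long delivery tags (messages) or ServiceRequests.
    private final BlockingDeque<Object> queue = new LinkedBlockingDeque<>();
    private final List<String> log = new ArrayList<>(); // Dispatcher thread only
    private boolean closed;                              // Dispatcher thread only

    /** IO layer: enqueue a message, represented here by its delivery tag. */
    void deliver(long deliveryTag) throws InterruptedException {
        queue.putLast(deliveryTag);
    }

    /** Session: ask the Dispatcher to roll back; jumps ahead of queued messages. */
    void requestRollback() throws InterruptedException {
        queue.putFirst((ServiceRequest) ServiceRequestDispatcher::rollback);
    }

    /** Session: ask the Dispatcher to stop once earlier items are processed. */
    void requestClose() throws InterruptedException {
        queue.putLast((ServiceRequest) d -> d.closed = true);
    }

    /** Take and handle exactly one item; blocks while the queue is empty. */
    void processOne() throws InterruptedException {
        Object item = queue.takeFirst();
        if (item instanceof ServiceRequest) {
            ((ServiceRequest) item).service(this);
        } else {
            log.add("deliver " + item);
        }
    }

    @Override
    public void run() {
        try {
            while (!closed) {
                processOne();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    private void rollback() {
        // Runs on the Dispatcher thread, so no message is held here. Reject every
        // queued message (message flow is assumed stopped) but keep other requests.
        for (Iterator<Object> it = queue.iterator(); it.hasNext(); ) {
            Object item = it.next();
            if (item instanceof Long) {
                log.add("reject " + item);
                it.remove();
            }
        }
        log.add("TxRollback");
    }

    List<String> log() {
        return log;
    }

    public static void main(String[] args) throws InterruptedException {
        ServiceRequestDispatcher d = new ServiceRequestDispatcher();
        d.deliver(1);
        d.deliver(2);
        d.processOne();      // deliver 1
        d.requestRollback(); // queued ahead of tag 2
        d.processOne();      // reject 2, TxRollback
        d.deliver(3);        // a message arriving after the rollback
        d.requestClose();
        d.run();             // deliver 3, then stop
        System.out.println(d.log());
    }
}
```

Driven from a single thread for determinism, main() logs [deliver 1, reject 2, TxRollback, deliver 3]. Tag 3 stands for a message that arrives after the rollback has completed.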
Steps
- Extract Dispatcher code to separate class and validate interface
- Update AMQSession to use new Dispatcher interface.
- Augment FlowControllingBlockingQueue to allow the injection of 'ServiceRequests'
Further Details
Further details of the change can be found here .
Comment Responses