Slow Consumer DisconnectPage edited by Martin RitchieAdded alternative v2 change for broker using an event queueSlow Consumer Problem StatementThe problem with slow consumers is that the broker must act as a buffer until they can catch up. However the broker does not have infinite resources so it will fail if the consumer does not catch up. Consumers in the Java BrokerThe types of queues where a slow consumer can occur boils down to two properties: durability and bound exchange. Queues that are bound to the amq.direct exchange, i.e. JMS Queues, are not going to be included in this work. This reduces the queues to consider to just queues bound to the topic exchange. TopicsIn AMQP consumption is always from an AMQP Queue to avoid confusion with JMS Queues in the following discussion the term topic is defined to mean an AMQP Queue bound to the amq.topic exchange. When a topic reaches a set threshold for message count, size or age the attached consumer session we have three options.
Work has already been done to flow producers on queues: Producer flow control. This leaves us with two options. Disconnect the Slow ConsumerFor non-durable topics this means that it will be deleted so potentially freeing up the memory used by the messages. Remember the messages are shared across all topics so the memory will only be freed up when all the topics no longer require the message. For durable topics (JMS Durable Subscriptions) disconnecting the consumer will leave the queue bound and receiving messages. This will only make the memory situation worse as we now have a queue with no consumer rather than just a slow consumer. If all the messages on the topic are persistent then they can be evicted from memory if required but there is no guarantee that all the messages will have been sent persistently. On disconnection the consumer would receive an AMQP error, 506 Resource Limit Exceeded/Resource Error. For non-durable consumers this will always work. However, for a durable subscription it is possible that the consumer has disconnected when the limit is reached. So whilst deleting their subscription would be in line with the configuration it would not be expected by the user. The configuration for enabling slow consumer disconnection should allow for durable subscriptions to be maintained, targeting only transient subscriptions for disconnection. Discard messagesThe C++ broker implements a type of queue called a Ring Queue that will delete the oldest data to make room for new messages. This approach could be applied to both durable and non-durable topics and would bound the memory used. No notification to the client would be possible however as messages were deleted logging should be performed so that broker administrators can tune the topic size or inform the client that they need to consume faster. Design SpecificationThis work is mainly focused on the broker however the the client may also require changes to ensure that the error is correctly reported. Broker ChangesExtension Point v2: Extending the broker to provide an event processor for major events that occur such as MessageEnqueue/Dequeue will allow us to delegate the processing of the events. This is benefitial for two reasons one it allows delegation to a non-message delivery thread and two, it will allow multiple listeners to be registed so many components can respond to the event. This will allow other listeners such as alerting, producer-side flow-control or QMF Agents to receive the event in addition to the Slow Consumer Detection. Queue Detection v2: The use of an event based processor means there is no need for direct queue detection checks. Rather what we will need to do is to process the configuration and register an event listeners with the appropriate values. Processing Error Code Client ChangesError Processing After Effects ConfigurationPicking up on the Topic Configuration Design the addition of slow consumer configuration would be done using a 'consumer' element. The topic currently exposes three properties that we can use to control the client, depth, oldest message, and count. The configuration will provide the option to one or all of these values to apply to the specified topic. In the situation where more than one value is specified they will all be used to trigger the policy. e.g. setting count to 10 and depth to 1024 would allow the 10 messages to exist as long as their total size was not more than 1024. One additional property that would be of use here would be the consumption rate. If the topic reported the consumption rate this property could be used to define a threshold that the consumer must stay above. Consumer Element for Topic configuration <consumer> <!-- The maximum depth before which the policy will be applied--> <maximumDepth>4235264</maximumDepth> <!-- The maximum message age before which the policy will be applied--> <maximumMessageAge>600000</maximumMessageAge> <!-- The maximum number of message before which the policy will be applied--> <maximumMessageCount>50</maximumMessageCount> <!-- Available Policies : Delete | Cycle --> <policy name="Delete"> <options> <option name="include-persistent" value="true"/> </options> </policy> </consumer> Testing SpecTesting for this new feature will mainly rely on system testing. Unit TestingConfiguration changes need to be validated as part of existing Configuration Testing. System TestingSystem testing requries a number of dimensions to be varied.
Protocol VersionTesting should be completed at a minimum on the two sets of protocol 0-8/0-9/0-91 and 0-10. Ideally the test would be run on each protocol version to verify the protocol exception is correctly propogated. Client Ack ModeThe tests should be run against each client ack mode to validate if there is any difference in the exception handling. Transacted for instance should fail to commit by throwing the expected exception as well as having the exception appear on the ExceptionListener. The NoAck case has addition issue in that it can overwhelm IO layer in presence of a slow consumer. This should be verified however its resolution is beyond the scope of this work. Client Consume ModeThe client can consume in one of two ways. Synchronously using receive() or asynchronously using a MessageListener. Topic TypeTopics can be created as durable or non-durable(transient) both of these configurations should be tested as any exception should be reported in the same way. Additionally as the configuration has the ability to selectively delete durable topics this must also be tested. The required exception should be thrown when enabled but not thrown when the configuration does not control durable topics.
Change Notification Preferences
View Online
|
View Change
|
Add Comment
|
- [CONF] Apache Qpid > Slow Consumer Disconnect confluence
- [CONF] Apache Qpid > Slow Consumer Disconnect confluence
- [CONF] Apache Qpid > Slow Consumer Disconnect confluence
- [CONF] Apache Qpid > Slow Consumer Disconnect confluence