Author: chirino
Date: Wed May 23 02:25:29 2012
New Revision: 1341729
URL: http://svn.apache.org/viewvc?rev=1341729&view=rev
Log:
Fixes APLO-206: Load balance of job queues when 'credit:1,0' is used on the
consumer.
Modified:
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala
activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala
Modified:
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=1341729&r1=1341728&r2=1341729&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
(original)
+++
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
Wed May 23 02:25:29 2012
@@ -529,7 +529,7 @@ class Queue(val router: LocalRouter, val
consumer_swapped_in.size_max += amount
}
- object messages extends Sink[Delivery] {
+ object messages extends Sink[(Session[Delivery], Delivery)] {
var refiller: Task = null
@@ -550,11 +550,12 @@ class Queue(val router: LocalRouter, val
false
}
- def offer(delivery: Delivery): Boolean = {
+ def offer(event: (Session[Delivery], Delivery)): Boolean = {
if (full) {
false
} else {
-
+ val (session, delivery) = event
+ session_manager.delivered(session, delivery.size)
// We may need to drop this enqueue or head entries due
// to the drop policy.
var drop = false
@@ -1067,7 +1068,7 @@ class Queue(val router: LocalRouter, val
override def consumer = Queue.this
val session_max = producer.send_buffer_size
- val downstream = session_manager.open(producer.dispatch_queue, session_max)
+ val downstream = session_manager.open(producer.dispatch_queue,
Integer.MAX_VALUE, session_max)
dispatch_queue {
inbound_sessions += this
Modified:
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala?rev=1341729&r1=1341728&r2=1341729&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala
(original)
+++
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala
Wed May 23 02:25:29 2012
@@ -266,8 +266,8 @@ class SinkMux[T](val downstream:Sink[T])
class CreditWindowFilter[T](val downstream:Sink[T], val sizer:Sizer[T])
extends SinkMapper[T,T] {
- var byte_credits = 0
var delivery_credits = 0
+ var byte_credits = 0
var disabled = true
override def full: Boolean = downstream.full || ( disabled && byte_credits
<= 0 && delivery_credits <= 0 )
@@ -278,12 +278,12 @@ class CreditWindowFilter[T](val downstre
}
def passing(value: T) = {
- byte_credits -= sizer.size(value)
delivery_credits -= 1
+ byte_credits -= sizer.size(value)
value
}
- def credit(byte_credits:Int, delivery_credits:Int) = {
+ def credit(delivery_credits:Int, byte_credits:Int) = {
this.byte_credits += byte_credits
this.delivery_credits += delivery_credits
if( !full ) {
@@ -322,10 +322,6 @@ trait SessionSinkFilter[T] extends Sessi
def remaining_capacity = downstream.remaining_capacity
}
-object SessionSinkMux {
- val default_session_max_credits =
System.getProperty("apollo.default_session_max_credits", ""+(1024*32)).toInt
-}
-
/**
* <p>
* A SinkMux multiplexes access to a target sink so that multiple
@@ -337,18 +333,14 @@ object SessionSinkMux {
*
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
-class SessionSinkMux[T](val downstream:Sink[T], val
consumer_queue:DispatchQueue, val sizer:Sizer[T]) {
+class SessionSinkMux[T](val downstream:Sink[(Session[T], T)], val
consumer_queue:DispatchQueue, val sizer:Sizer[T]) {
var sessions = HashSet[Session[T]]()
+ val overflow = new OverflowSink[(Session[T],T)](downstream)
- val overflow = new OverflowSink[(Session[T],T)](downstream.map(_._2)) {
- // Once a value leaves the overflow, then we can credit the
- // session so that more messages can be accepted.
- override protected def onDelivered(event:(Session[T],T)) = {
- val session = event._1
- val value = event._2
- session.credit_adder.merge(sizer.size(value));
- }
+ def delivered(session:Session[Delivery], size:Int) = {
+ consumer_queue.assertExecuting()
+ session.credit_adder.merge((1, size));
}
// use a event aggregating source to coalesce multiple events from the same
thread.
@@ -365,10 +357,10 @@ class SessionSinkMux[T](val downstream:S
}
}
- def open(producer_queue:DispatchQueue,
credits:Int=SessionSinkMux.default_session_max_credits):SessionSink[T] = {
- val session = new Session[T](producer_queue, 0, this)
+ def open(producer_queue:DispatchQueue, delivery_credits:Int,
size_credits:Int):SessionSink[T] = {
+ val session = new Session[T](this, producer_queue)
consumer_queue <<| ^{
- session.credit_adder.merge(credits);
+ session.credit_adder.merge((delivery_credits, size_credits));
sessions += session
}
session
@@ -392,12 +384,14 @@ class SessionSinkMux[T](val downstream:S
/**
* tracks one producer to consumer session / credit window.
*/
-class Session[T](val producer_queue:DispatchQueue, var credits:Int,
mux:SessionSinkMux[T]) extends SessionSink[T] {
+class Session[T](mux:SessionSinkMux[T], val producer_queue:DispatchQueue)
extends SessionSink[T] {
var refiller:Task = NOOP
private def sizer = mux.sizer
private def downstream = mux.source
+ var delivery_credits = 0
+ var size_credits = 0
@volatile
var enqueue_item_counter = 0L
@@ -407,19 +401,31 @@ class Session[T](val producer_queue:Disp
var enqueue_ts = mux.time_stamp
// create a source to coalesce credit events back to the producer side...
- val credit_adder = createSource(EventAggregators.INTEGER_ADD ,
producer_queue)
+ val credit_adder = createSource(new EventAggregator[(Int, Int), (Int, Int)] {
+ def mergeEvent(previous:(Int, Int), event:(Int, Int)) = {
+ if( previous == null ) {
+ event
+ } else {
+ mergeEvents(previous, event)
+ }
+ }
+ def mergeEvents(previous:(Int, Int), event:(Int, Int)) =
(previous._1+event._1, previous._2+event._2)
+ }, producer_queue)
+
credit_adder.onEvent{
- add_credits(credit_adder.getData.intValue)
+ val (count, size) = credit_adder.getData
+ add_credits(count, size)
+ if( (size > 0 || count>0) && !_full ) {
+ refiller.run
+ }
}
credit_adder.resume
private var rejection_handler: (T)=>Unit = _
- private def add_credits(value:Int) = {
- credits += value;
- if( value > 0 && !_full ) {
- refiller.run
- }
+ private def add_credits(count:Int, size:Int) = {
+ delivery_credits += count
+ size_credits += size
}
///////////////////////////////////////////////////
@@ -427,14 +433,14 @@ class Session[T](val producer_queue:Disp
// producer serial dispatch queue
///////////////////////////////////////////////////
- def remaining_capacity = credits
+ def remaining_capacity = size_credits
override def full = {
producer_queue.assertExecuting()
_full
}
- def _full = credits <= 0 && rejection_handler == null
+ def _full = ( size_credits <= 0 || delivery_credits<=0 ) &&
rejection_handler == null
override def offer(value: T) = {
producer_queue.assertExecuting()
@@ -450,7 +456,7 @@ class Session[T](val producer_queue:Disp
enqueue_size_counter += size
enqueue_ts = mux.time_stamp
- add_credits(-size)
+ add_credits(-1, -size)
downstream.merge((this, value))
}
true
Modified:
activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala?rev=1341729&r1=1341728&r2=1341729&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
(original)
+++
activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
Wed May 23 02:25:29 2012
@@ -62,6 +62,10 @@ object OpenwireProtocolHandler extends L
DEFAULT_WIREFORMAT_SETTINGS.setMaxFrameSize(OpenWireFormat.DEFAULT_MAX_FRAME_SIZE);
val WAITING_ON_CLIENT_REQUEST = ()=> "client request"
+
+ object SessionDeliverySizer extends Sizer[(Session[Delivery], Delivery)] {
+ def size(value: (Session[Delivery], Delivery)) = Delivery.size(value._2)
+ }
}
/**
@@ -818,7 +822,9 @@ class OpenwireProtocolHandler extends Pr
var addresses:Array[_ <: BindAddress] = _
val consumer_sink = sink_manager.open()
- val credit_window_filter = new
CreditWindowFilter[Delivery](consumer_sink.map { delivery =>
+ val credit_window_filter = new CreditWindowFilter[(Session[Delivery],
Delivery)](consumer_sink.map { event =>
+ val (session, delivery) = event
+ session_manager.delivered(session, delivery.size)
val dispatch = new MessageDispatch
dispatch.setConsumerId(info.getConsumerId)
if( delivery.message eq EndOfBrowseMessage ) {
@@ -832,11 +838,11 @@ class OpenwireProtocolHandler extends Pr
}
messages_sent += 1
dispatch
- }, Delivery)
+ }, SessionDeliverySizer)
- credit_window_filter.credit(0, info.getPrefetchSize)
+ credit_window_filter.credit(info.getPrefetchSize, 0)
- val session_manager = new SessionSinkMux[Delivery](credit_window_filter,
dispatchQueue, Delivery) {
+ val session_manager:SessionSinkMux[Delivery] = new
SessionSinkMux[Delivery](credit_window_filter, dispatchQueue, Delivery) {
override def time_stamp = broker.now
}
@@ -943,7 +949,7 @@ class OpenwireProtocolHandler extends Pr
producer.dispatch_queue.assertExecuting()
retain
- val downstream = session_manager.open(producer.dispatch_queue,
buffer_size)
+ val downstream = session_manager.open(producer.dispatch_queue,
info.getCurrentPrefetchSize.max(1), buffer_size)
var closed = false
def consumer = ConsumerContext.this
@@ -1018,7 +1024,7 @@ class OpenwireProtocolHandler extends Pr
val ack_source = createSource(EventAggregators.INTEGER_ADD, dispatch_queue)
ack_source.setEventHandler(^ {
val data = ack_source.getData
- credit_window_filter.credit(0, data)
+ credit_window_filter.credit(data, 0)
});
ack_source.resume
Modified:
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala?rev=1341729&r1=1341728&r2=1341729&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
(original)
+++
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
Wed May 23 02:25:29 2012
@@ -69,6 +69,11 @@ object StompProtocolHandler extends Log
var inbound_heartbeat = DEFAULT_INBOUND_HEARTBEAT
val WAITING_ON_CLIENT_REQUEST = ()=> "client request"
+
+ object SessionDeliverySizer extends Sizer[(Session[Delivery], Delivery)] {
+ def size(value: (Session[Delivery], Delivery)) = Delivery.size(value._2)
+ }
+
}
/**
@@ -129,6 +134,8 @@ class StompProtocolHandler extends Proto
message.asInstanceOf[StompFrameMessage].id
}
+ case class InitialCreditWindow(count:Int,size:Int,auto_credit:Boolean)
+
class StompConsumer (
val subscription_id:Option[AsciiBuffer],
@@ -137,7 +144,7 @@ class StompProtocolHandler extends Proto
val selector:(String, BooleanExpression),
override val browser:Boolean,
override val exclusive:Boolean,
- val initial_credit_window:(Int,Int, Boolean),
+ val initial_credit_window:InitialCreditWindow,
val include_seq:Option[AsciiBuffer],
val from_seq:Long,
override val close_on_drain:Boolean
@@ -194,7 +201,7 @@ class StompProtocolHandler extends Proto
credit_window_source.resume
trait AckHandler {
- def track(delivery:Delivery):Unit
+ def track(event:(Session[Delivery], Delivery)):Unit
def credit(msgid: AsciiBuffer, credit_value: (Int, Int)):Unit
def perform_ack(consumed:DeliveryResult, msgid: AsciiBuffer,
uow:StoreUOW=null):Unit
def close:Unit
@@ -205,7 +212,9 @@ class StompProtocolHandler extends Proto
def close = { closed = true}
- def track(delivery:Delivery) = {
+ def track(event:(Session[Delivery], Delivery)) = {
+ val (session, delivery) = event
+ session_manager.delivered(session, delivery.size)
if( closed ) {
if( delivery.ack!=null ) {
delivery.ack(Undelivered, null)
@@ -215,7 +224,7 @@ class StompProtocolHandler extends Proto
delivery.ack(Consumed, null)
}
if( !dead ) {
- credit_window_source.merge((delivery.size, 1))
+ credit_window_source.merge((1, delivery.size))
}
}
}
@@ -229,7 +238,7 @@ class StompProtocolHandler extends Proto
}
- class TrackedAck(var credit:Option[Int], val ack:(DeliveryResult,
StoreUOW)=>Unit)
+ class TrackedAck(var credit:Option[(Session[Delivery], Int)], val
ack:(DeliveryResult, StoreUOW)=>Unit)
class SessionAckHandler extends AckHandler{
var consumer_acks = ListBuffer[(AsciiBuffer, TrackedAck)]()
@@ -244,8 +253,9 @@ class StompProtocolHandler extends Proto
consumer_acks = null
}
- def track(delivery:Delivery) = {
+ def track(event:(Session[Delivery], Delivery)) = {
queue.assertExecuting()
+ val (session, delivery) = event
if( consumer_acks == null ) {
// It can happen if we get closed.. but destination is still sending
data..
if( delivery.ack!=null ) {
@@ -256,13 +266,17 @@ class StompProtocolHandler extends Proto
// register on the connection since 1.0 acks may not include the
subscription id
connection_ack_handlers += ( id(delivery.message) -> this )
}
- consumer_acks += id(delivery.message) -> new
TrackedAck(Some(delivery.size), delivery.ack )
+ if( initial_credit_window.auto_credit ) {
+ consumer_acks += id(delivery.message) -> new
TrackedAck(Some((session, delivery.size)), delivery.ack )
+ } else {
+ session_manager.delivered(session, delivery.size)
+ }
}
}
def credit(msgid: AsciiBuffer, credit_value: (Int, Int)):Unit = {
queue.assertExecuting()
- if( initial_credit_window._3 ) {
+ if( initial_credit_window.auto_credit ) {
var found = false
val (acked, not_acked) = consumer_acks.partition{ case (id, ack)=>
if( id == msgid ) {
@@ -275,7 +289,8 @@ class StompProtocolHandler extends Proto
for( (id, delivery) <- acked ) {
for( credit <- delivery.credit ) {
- credit_window_source.merge((credit, 1))
+ session_manager.delivered(credit._1, credit._2)
+ credit_window_source.merge((1, credit._2))
delivery.credit = None
}
}
@@ -332,8 +347,9 @@ class StompProtocolHandler extends Proto
consumer_acks = null
}
- def track(delivery:Delivery) = {
+ def track(event:(Session[Delivery], Delivery)) = {
queue.assertExecuting();
+ val (session, delivery) = event
if( consumer_acks == null ) {
// It can happen if we get closed.. but destination is still sending
data..
if( delivery.ack!=null ) {
@@ -344,16 +360,21 @@ class StompProtocolHandler extends Proto
// register on the connection since 1.0 acks may not include the
subscription id
connection_ack_handlers += ( id(delivery.message) -> this )
}
- consumer_acks += id(delivery.message) -> new
TrackedAck(Some(delivery.size), delivery.ack)
+ if( initial_credit_window.auto_credit ) {
+ consumer_acks += id(delivery.message) -> new
TrackedAck(Some((session, delivery.size)), delivery.ack)
+ } else {
+ session_manager.delivered(session, delivery.size)
+ }
}
}
def credit(msgid: AsciiBuffer, credit_value: (Int, Int)):Unit = {
queue.assertExecuting()
- if( initial_credit_window._3 ) {
+ if( initial_credit_window.auto_credit ) {
for( delivery <- consumer_acks.get(msgid)) {
for( credit <- delivery.credit ) {
- credit_window_source.merge((credit,1))
+ session_manager.delivered(credit._1, credit._2)
+ credit_window_source.merge((1, credit._2))
delivery.credit = None
}
}
@@ -391,8 +412,9 @@ class StompProtocolHandler extends Proto
}
val consumer_sink = sink_manager.open()
- val credit_window_filter = new
CreditWindowFilter[Delivery](consumer_sink.map { delivery =>
- ack_handler.track(delivery)
+ val credit_window_filter = new CreditWindowFilter[(Session[Delivery],
Delivery)](consumer_sink.map { event =>
+ ack_handler.track(event)
+ val (_, delivery) = event
val message = delivery.message
var frame = if( message.protocol eq StompProtocol ) {
@@ -423,11 +445,11 @@ class StompProtocolHandler extends Proto
}
messages_sent += 1
frame
- }, Delivery)
+ }, SessionDeliverySizer)
- credit_window_filter.credit(initial_credit_window._1,
initial_credit_window._2)
+ credit_window_filter.credit(initial_credit_window.count,
initial_credit_window.size)
- val session_manager = new SessionSinkMux[Delivery](credit_window_filter,
dispatchQueue, Delivery) {
+ val session_manager:SessionSinkMux[Delivery] = new
SessionSinkMux[Delivery](credit_window_filter, dispatchQueue, Delivery) {
override def time_stamp = broker.now
}
@@ -479,7 +501,7 @@ class StompProtocolHandler extends Proto
producer.dispatch_queue.assertExecuting()
retain
- val downstream = session_manager.open(producer.dispatch_queue,
buffer_size)
+ val downstream = session_manager.open(producer.dispatch_queue,
initial_credit_window.count.max(1), buffer_size)
override def toString = "connection to
"+StompProtocolHandler.this.connection.transport.getRemoteAddress
@@ -613,9 +635,7 @@ class StompProtocolHandler extends Proto
config.die_delay.getOrElse(DEFAULT_DIE_DELAY)
}
- def buffer_size = {
-
MemoryPropertyEditor.parse(Option(config.buffer_size).getOrElse("640k")).toInt
- }
+ lazy val buffer_size =
MemoryPropertyEditor.parse(Option(config.buffer_size).getOrElse("640k")).toInt
override def set_connection(connection: BrokerConnection) = {
super.set_connection(connection)
@@ -1276,15 +1296,16 @@ class StompProtocolHandler extends Proto
case Some(value) =>
value.toString.split(",").toList match {
case x :: Nil =>
- (buffer_size, x.toInt, true)
+ InitialCreditWindow(x.toInt, buffer_size, true)
case x :: y :: Nil =>
- (y.toInt, x.toInt, true)
+ InitialCreditWindow(x.toInt, y.toInt, true)
case x :: y :: z :: _ =>
- (y.toInt, x.toInt, z.toBoolean)
- case _ => (buffer_size, 1, true)
+ InitialCreditWindow(x.toInt, y.toInt, z.toBoolean)
+ case _ =>
+ InitialCreditWindow(buffer_size, buffer_size, true)
}
case None =>
- (buffer_size, 1, true)
+ InitialCreditWindow(buffer_size, buffer_size, true)
}
val selector = get(headers, SELECTOR) match {
@@ -1399,9 +1420,9 @@ class StompProtocolHandler extends Proto
case Some(value) =>
value.toString.split(",").toList match {
case x :: Nil =>
- (0, x.toInt)
+ (x.toInt, 0)
case x :: y :: _ =>
- (y.toInt, x.toInt)
+ (x.toInt, y.toInt)
case _ => (0,0)
}
Modified:
activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala?rev=1341729&r1=1341728&r2=1341729&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala
(original)
+++
activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala
Wed May 23 02:25:29 2012
@@ -570,6 +570,27 @@ class StompPersistentQueueTest extends S
class StompDestinationTest extends StompTestSupport {
+ test("APLO-206 - Load balance of job queues using small consumer credit
windows") {
+ connect("1.1")
+
+ for( i <- 1 to 4) {
+ async_send("/queue/load-balanced2", i)
+ }
+
+ subscribe("1", "/queue/load-balanced2", "client", false, "credit:1,0\n")
+ val ack1 = assert_received(1, "1")
+
+ subscribe("2", "/queue/load-balanced2", "client", false, "credit:1,0\n")
+ val ack2 = assert_received(2, "2")
+
+ // Ok lets ack now..
+ ack1(true)
+ val ack3 = assert_received(3, "1")
+
+ ack2(true)
+ val ack4 = assert_received(4, "2")
+ }
+
test("Browsing queues does not cause AssertionError. Reported in APLO-156")
{
connect("1.1")
subscribe("0", "/queue/TOOL.DEFAULT")
@@ -881,33 +902,11 @@ class StompDestinationTest extends Stomp
test("Queues load balance across subscribers") {
connect("1.1")
-
- // Connect to subscribers
- client.write(
- "SUBSCRIBE\n" +
- "destination:/queue/load-balanced\n" +
- "id:1\n" +
- "\n")
-
- client.write(
- "SUBSCRIBE\n" +
- "destination:/queue/load-balanced\n" +
- "receipt:0\n"+
- "id:2\n" +
- "\n")
-
- wait_for_receipt("0")
-
- def put(id:Int) = {
- client.write(
- "SEND\n" +
- "destination:/queue/load-balanced\n" +
- "\n" +
- "message:"+id+"\n")
- }
+ subscribe("1", "/queue/load-balanced")
+ subscribe("2", "/queue/load-balanced")
for( i <- 0 until 4) {
- put(i)
+ async_send("/queue/load-balanced", "message:"+i)
}
var sub1_counter=0