Hi folks,

I am struggling to configure message group queues on the Java broker (0.32) 
using the documentation 
(https://qpid.apache.org/releases/qpid-0.32/java-broker/book/index.html).
Can anyone shed some light on how to configure message group queues in the Java 
broker and in Node.js clients/producers (https://github.com/postwait/node-amqp)?
Any help or ideas would be much appreciated.
Note that I can pass, and have been passing, messages from the producer to the 
consumers just fine. Now I'd like to force certain messages to be consumed in 
order by using message group queues.

I'm using:
Qpid Java broker 0.32, configured with a JSON file in which I set the 
group_header_key to "group_id".
I set the group_header_key context entry using the broker management interface.
Here is the queue entry from the JSON file:
{
    "id" : "5c6aaf7f-0b21-4b2e-be6a-effec29202bf",
    "name" : "alert.condition.detected",
    "type" : "standard",
    "durable" : true,
    "lifetimePolicy" : "PERMANENT",
    "context" : {
      "qpid.group_header_key" : "group_id"
    },
    "exclusive" : "NONE",
    "messageDurability" : "DEFAULT",
    "owner" : null,
    "lastUpdatedBy" : null,
    "lastUpdatedTime" : 0,
    "createdBy" : null,
    "createdTime" : 0,
    "bindings" : [ {
      "exchange" : "8128f935-e5e5-4953-9e47-6c0fc3cfa1bf",
      "id" : "08c5d3d4-0454-4107-8498-b579713bf90a",
      "name" : "alert.condition.detected",
      "durable" : true,
      "lastUpdatedBy" : "ANONYMOUS",
      "lastUpdatedTime" : 1430868669580,
      "createdBy" : "ANONYMOUS",
      "createdTime" : 1430868669580
    }, {
      "exchange" : "8128f935-e5e5-4953-9e47-6c0fc3cfa1bf",
      "id" : "64ab6b8e-b58a-453d-a341-dc0dbf5f91d9",
      "name" : "condition.detected",
      "durable" : true,
      "lastUpdatedBy" : "admin",
      "lastUpdatedTime" : 1433803383881,
      "createdBy" : "admin",
      "createdTime" : 1433803383881
    } ]
  },

In the Node.js client I create the queue with the arguments option:
Connection.queue(topic, {
    autoDelete: false,
    durable: true,
    arguments: {
        // Name of the message header whose value identifies the group
        'qpid.group_header_key': 'group_id',
        'qpid.shared_msg_group': 0
    }
});


In the Node.js producer I send (publish the message on the AMQP exchange) with 
the headers option:
Exchange.publish(<topic>, body, {
    headers: {
        // Must match the header name configured on the queue
        group_id: 5
    }
});


When I publish without consuming, I can view the message in the Qpid broker 
management tool, and it shows the header being set.

However, when I start up my two consumers, the messages are distributed to 
whichever consumer is available.

- Produce 6 messages: 3 in group 1, 3 in group 5.
- Two consumers, one set to delay 60 seconds before sending the ack back to 
  qpid.
- Prefetch for each consumer is 1.
- The second consumer ends up processing 5 messages: 3 from group 1 and the 
  two remaining from group 5.
- I was expecting the group 5 messages to be held up in the queue until the 
  delayed consumer acked the first group 5 message.
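
To make that expectation concrete, here is a minimal sketch in plain 
JavaScript of the delivery pattern I was hoping for. It does not talk to a 
broker or use node-amqp. The function name, the consumer names, the message 
order (5, 1, 5, 1, 5, 1), and the rule that a group stays pinned to the first 
consumer that receives one of its messages are all my own assumptions for 
illustration, not a description of what Qpid actually does.

```javascript
// Plain-JavaScript simulation; no broker or AMQP library involved.
// Assumption: a group is pinned to the first consumer that receives one of
// its messages, and every other consumer skips that group's messages.
function simulateGroupedDelivery(messages, consumers) {
  const owner = new Map(); // group id -> name of the consumer that owns it
  const busy = new Set();  // consumers holding an unacked message (prefetch 1)
  const delivered = {};    // consumer name -> group ids received, in order
  for (const c of consumers) delivered[c.name] = [];

  const pending = messages.slice();
  let progress = true;
  while (progress) {
    progress = false;
    for (const c of consumers) {
      if (busy.has(c.name)) continue;
      // First queued message whose group is unowned or owned by this consumer.
      const idx = pending.findIndex(
        (m) => !owner.has(m.groupId) || owner.get(m.groupId) === c.name
      );
      if (idx === -1) continue;
      const [msg] = pending.splice(idx, 1);
      owner.set(msg.groupId, c.name);
      delivered[c.name].push(msg.groupId);
      // A slow consumer never acks within the simulated window, so it stays
      // busy; a fast consumer acks at once and is free for the next message.
      if (c.slow) busy.add(c.name);
      progress = true;
    }
  }
  return { delivered, heldInQueue: pending.map((m) => m.groupId) };
}

// Hypothetical scenario: 3 messages in group 5 and 3 in group 1, interleaved.
const result = simulateGroupedDelivery(
  [5, 1, 5, 1, 5, 1].map((groupId) => ({ groupId })),
  [{ name: 'delayed', slow: true }, { name: 'fast', slow: false }]
);
console.log(result);
```

With this ordering, the delayed consumer holds the first group 5 message, the 
fast consumer receives the three group 1 messages, and the two remaining 
group 5 messages stay in the queue. That is the outcome I expected instead of 
the fast consumer draining both groups.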



Again, any help would be much appreciated.
Thanks,
Scott


