Hi Carl,
Apropos of our conversation over IRC, I am not able to receive the method
response.
First thing is I am not getting the messages sporadically so not sure about
ObjectID. Please let me know about how to get the objectID.
Secondly, if we send the wrong object ID in the Method Request (AM1M) we
should get this status code: value: 1 description: STATUS_UNKNOWN_OBJECT -
objectId not found in the agent
So I am sending a fake one just to see the Method Response AM1m for "queue"
class (as "purge" method in queue does not have arguments so in this code
snippet there are no arguments). So please in your example snippet, please
include the arguments (please use a different method such as "echo" of
"Broker" class) so by this way I can learn how we pass the arguments in the
Method Request... :)
I am sending this,
SAMPLE CODE:
------------------------------
case 5:
mgtclient.opcode="AM1M";
mgtclient.sequenceNo=1000;
// fake objectID for "queue" package -- long objectID
mgtclient.objectID=786767668779789;
// purge method of queue class, there are no arguments for this method
mgtclient.mathodName=”purge";
// there are no arguments
message.clear();
try {
message.put(mgtclient.opcode.getBytes("UTF-8"));
message.putInt(mgtclient.sequenceNo);
encoder.writeUint64(mgtclient.objectID);
encoder.writeStr8(mgtclient.methodName);
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
System.out.println("Query Method Request Sent...");
break;
--------------------
I am intending to receive this atleast..
------------------------
// Method Response
case 'm':
int statusCode = decoder.readUint16();
// should be wrong in case of above method request
// value: 1
//description: STATUS_UNKNOWN_OBJECT - objectId not found in the agent
System.out.println("Status code is" + statusCode);
String statusText= decoder.readStr8();
System.out.println("status Text is" + statusText);
break;
-------------------------
For more information, you can find the Management Client on JIRA 955 (i
also attached it with this email for your further consideration). You will
not find the above code sample in that program as I attached code few days
ago.
Your quick help can expedite my work progress over this weekend. Thank you
in advance.
Best Regards,
Rahul
package apache.qpid.management.client;
import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.Vector;
import org.apache.qpidity.api.Message;
import org.apache.qpidity.nclient.Client;
import org.apache.qpidity.nclient.Connection;
import org.apache.qpidity.nclient.Session;
import org.apache.qpidity.nclient.util.MessageListener;
import org.apache.qpidity.nclient.util.MessagePartListenerAdapter;
import org.apache.qpidity.transport.DeliveryProperties;
import org.apache.qpidity.transport.MessageAcceptMode;
import org.apache.qpidity.transport.MessageAcquireMode;
import org.apache.qpidity.transport.MessageCreditUnit;
import org.apache.qpidity.transport.MessageProperties;
import org.apache.qpidity.transport.ReplyTo;
/*
* @author [EMAIL PROTECTED]
*
* This program
*
* 1. sends the encoded management request messages to the broker, and
* 2. receives the decode management response messages from the broker
*
* Prerequisite: before running this program
* 1. start the c++ qpid broker
* 2. run the queue.java program from the same package
*/
public class ManagementClient extends Thread implements MessageListener
{
public Vector<Byte> schemaHash;
// connection to broker
public static Connection con;
// session
public static Session session;
// AMQP type
String opcode;
// message sequence number
int sequenceNo;
// package name
String packageName;
// class name
String className;
// schema for all possible classes which the broker knows
static byte[] schemaAgent;
static byte[] schemaBinding;
static byte[] schemaBridge;
static byte[] schemaBroker;
static byte[] schemaClient;
static byte[] schemaConsumer;
static byte[] schemaDestination;
static byte[] schemaExchange;
static byte[] schemaLink;
static byte[] schemaProducer;
static byte[] schemaQueue;
static byte[] schemaSession;
static byte[] schemaSystem;
static byte[] schemaVhost;
// the message buffer
ByteBuffer buf;
/*
* (non-Javadoc)
* @see
org.apache.qpidity.nclient.util.MessageListener#onMessage(org.apache.qpidity.api.Message)
*
* this method received the incoming response from the broker and print
the
* incoming responses format, such as "broker query response" (AM1b),
"package query response" (AM1p),
* "class query response" (AM1q), "schema query response" (AM1s) etc.
*/
public void onMessage(Message m)
{
try
{
buf = m.readData();
System.out.println();
System.out.println();
// get the Magic number "AM1"
char magicNumber = (char)buf.get();
System.out.println(magicNumber);
magicNumber = (char)buf.get();
System.out.println(magicNumber);
magicNumber = (char)buf.get();
System.out.println(magicNumber);
// get the opcode
char opcode = (char)buf.get();
System.out.println(opcode);
// java decoder
ManagementDecoder decoder = new ManagementDecoder(buf);
// decode the sequence number
System.out.println("Sequence No: " +
decoder.readSequenceNo());
// decision will be made based on the opcode of the
response
switch(opcode)
{
/*
* if the broker sends the "broker query response"
message
* Broker Response Message
*
+-----+-----+-----+-----+-----------------------+
| 'A' | 'M' | '1' | 'b' | 0
|
+-----+-----+-----+-----+-----------------------+----------------------------+
| brokerId (uuid)
|
+----------------------------------------------------------------------------+
*/
case 'b':
// decode the package name
UUID brokerID = decoder.readUuid();
System.out.println("Broker Identifier: " +
brokerID);
break;
/* if the broker sends the "package query
response" message
* Package Indication Message
*
+-----+-----+-----+-----+-----------------------+
| 'A' | 'M' | '1' | 'p' | seq
|
+-----+-----+-----+-----+-----------------------+----------+
| package name (short string)
|
+----------------------------------------------------------+
*/
case 'p':
String packagenamea = decoder.readStr8();
System.out.println("Package Name: " +
packagenamea);
break;
/*
* if the broker sends the "class query
response" message
*
+-----+-----+-----+-----+-----------------------+
| 'A' | 'M' | '1' | 'q' | seq |
+-----+-----+-----+-----+-----------------------+----------+
| package name (short string)
|
+----------------------------------------------------------+
| class name (short string)
|
+----------------------------------------------------------+
| schema hash (bin128)
|
+----------------------------------------------------------+
*/
case 'q':
// decode the package name
String packagenama = decoder.readStr8();
System.out.println("Package Name: " +
packagenama);
// decode the class name
String clasnamd = decoder.readStr8();
System.out.println("Class Name: " + clasnamd);
// if the class name is agent then
if(clasnamd.equals("agent"))
{
// decode the schema hash
schemaAgent=decoder.readBin128();
}
if(clasnamd.equals("binding"))
{
schemaBinding=decoder.readBin128();
}
if(clasnamd.equals("bridge"))
{
schemaBridge=decoder.readBin128();
}
if(clasnamd.equals("broker"))
{
schemaBroker=decoder.readBin128();
}
if(clasnamd.equals("client"))
{
schemaClient=decoder.readBin128();
}
if(clasnamd.equals("consumer"))
{
schemaConsumer=decoder.readBin128();
}
if(clasnamd.equals("destination"))
{
schemaDestination=decoder.readBin128();
}
if(clasnamd.equals("exchange"))
{
schemaExchange=decoder.readBin128();
}
if(clasnamd.equals("link"))
{
schemaLink=decoder.readBin128();
}
if(clasnamd.equals("producer"))
{
schemaProducer=decoder.readBin128();
}
if(clasnamd.equals("queue"))
{
schemaQueue=decoder.readBin128();
}
if(clasnamd.equals("session"))
{
schemaSession=decoder.readBin128();
}
if(clasnamd.equals("system"))
{
schemaSystem=decoder.readBin128();
}
if(clasnamd.equals("vhost"))
{
schemaVhost=decoder.readBin128();
}
break;
/*
* if the response is "schema response"
*
+-----+-----+-----+-----+-----------------------+
| 'A' | 'M' | '1' | 's' | seq |
+-----+-----+-----+-----+-----------------------+----------+
| packageName (short string)
|
+----------------------------------------------------------+
| className (short string)
|
+----------------------------------------------------------+
| schema-hash (bin128)
|
+-----------+-----------+-----------+-----------+----------+
| propCnt | statCnt | methodCnt | eventCnt |
+-----------+-----------+-----------+-----------+----------------------------+
| propCnt property records
|
+----------------------------------------------------------------------------+
| statCnt statistic records
|
+----------------------------------------------------------------------------+
| methodCnt method records
|
+----------------------------------------------------------------------------+
| eventCnt event records
|
+----------------------------------------------------------------------------+
*/
case 's':
// decode the package name
String packagename = decoder.readStr8();
System.out.println("Package Name: " +
packagename);
// decode the class name
String clasnam = decoder.readStr8();
System.out.println("Class Name: " + clasnam);
// decode the schema hash
decoder.readBin128();
// get the decoded properties contents
int propCnt = decoder.readUint16();
System.out.println("Property content: " +
propCnt);
// get the decoded statistics contents
long statCnt = decoder.readUint16();
System.out.println("Statistic content: " +
statCnt);
// get the decoded method contents
int methodCnt = decoder.readUint16();
System.out.println("Method content: " +
methodCnt);
// get the decoded event contents
int eventCnt = decoder.readUint16();
System.out.println("Event content: " +
eventCnt);
// just to make the output readable
if (propCnt>0)
{
System.out.println();
System.out.println("*** Properties
Contents are ***");
System.out.println();
}
// decode the map of properties contents
for( int i = 0; i < propCnt; i++ )
{
// decode the MAP
Map<String,Object> map =
decoder.readMap();
// print the map
printMap(map);
}
// just to make the output readable
if (statCnt>0)
{
System.out.println();
System.out.println("*** Statistics
Contents are ***");
System.out.println();
}
// decode the map of statistics contents
for( int i = 0; i < statCnt; i++ )
{ // decode the MAP
Map<String,Object> map =
decoder.readMap();
printMap(map);
}
// just to make the output readable
if (methodCnt>0)
{
System.out.println();
System.out.println("*** Method Contents
are ***");
System.out.println();
}
// decode the map of method contents
for( int i = 0; i < methodCnt; i++ )
{ // decode the MAP
Map<String,Object> map =
decoder.readMap();
printMap(map);
}
// just to make the output readable
if (eventCnt>0)
{
System.out.println();
System.out.println("*** Event Contents
are ***");
System.out.println();
}
// decode the map of event contents
for( int i = 0; i < eventCnt; i++ )
{ // decode the MAP
Map<String,Object> map =
decoder.readMap();
printMap(map);
}
break;
}
}catch(Exception e)
{
System.out.print("Error reading message");
e.printStackTrace();
}
}
/*
* print the decoded map
*
*/
public void printMap(Map<String,Object> mapnew)
{
Set s=mapnew.entrySet();
for (Iterator<Integer> iterator = s.iterator();
iterator.hasNext();)
{
Object object = (Object) iterator.next();
System.out.println(object.toString());
// this is for method content to read the map upto
argument count
if (object.toString().contains("argCount="))
{
String ar = object.toString().substring(9);
for( int i = 0; i < Integer.parseInt(ar); i++ )
{
ManagementDecoder decoder = new
ManagementDecoder(buf);
// decode the MAP
Map<String,Object> map =
decoder.readMap();
printMap(map);
}
}
}
}
/*
* With this method Management Client listens the incoming response
messages from the broker
* on the "reply" queue (atmost 15 messages per request).
*
* (non-Javadoc)
* @see java.lang.Thread#run()
*/
public void run()
{
ManagementClient mgtclient = new ManagementClient();
// create a subscription with the "reply" queue
boolean isEnterKeyStroke=true;
while(isEnterKeyStroke)
{
// create a subscription with the "reply" queue
session.messageSubscribe("reply",
"listener_reply",
Session.TRANSFER_CONFIRM_MODE_NOT_REQUIRED,
Session.TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE,
new
MessagePartListenerAdapter(mgtclient), null);
// controls the flow of message data to a given
destination
session.messageFlow("listener_reply",
MessageCreditUnit.BYTE, Session.MESSAGE_FLOW_MAX_BYTES);
// will get maximum of 15 messages
session.messageFlow("listener_reply",
MessageCreditUnit.MESSAGE, 15);
session.sync();
// This method cancels a consumer. The server will not
send any more messages to the "listener_reply" destination.
session.messageCancel("listener_reply");
//cleanup
//session.sessionDetach(session.getName());
}
}
/*
* 1. create connection and session with the broker
* 2. run the listener (mgtClient.start()) in ManagementClient which
listens the incoming
* response messages from the broker on the reply queue
* 3. encode and send request messages to the broker
*/
public static void main(String[] args) throws FileNotFoundException,
IOException
{
ManagementClient mgtclient=new ManagementClient();
// Create connection
con = Client.createConnection();
ByteBuffer message= ByteBuffer.allocate(200);
try
{ // connect to local host on default port 5672
con.connect("localhost", 5672, "test", "guest",
"guest");
}
catch(Exception e)
{
System.out.print("Error connecting to broker");
e.printStackTrace();
}
// Management Client Started ***First message on the console***
System.out.println("Management Listener Started...");
// java encoder
ManagementEncoder encoder=new ManagementEncoder(message);
// create session
session = con.createSession(0);
DeliveryProperties deliveryProps = new DeliveryProperties();
// set the routing key as "agent"
deliveryProps.setRoutingKey("agent");
MessageProperties messageProps=new MessageProperties();
// set replyTo field so that messages return to the "reply"
queue
ReplyTo rpt=new ReplyTo();
rpt.setExchange("amq.direct");
rpt.setRoutingKey("reply");
messageProps.setReplyTo(rpt);
// run the listener
mgtclient.start();
boolean flag=true;
while(flag)
{
// transfer message to "qpid.management" exchange
session.messageTransfer("qpid.management",
MessageAcceptMode.EXPLICIT,MessageAcquireMode.PRE_ACQUIRED);
session.header(deliveryProps,messageProps);
// *** Menu ***
System.out.println(" ******* MENU
*************");
System.out.println("#########################################################");
System.out.println();
System.out.println("1. Broker Query Request");
System.out.println("2. Package Query Request");
System.out.println("3. Class Query Request");
System.out.println("4. Schema Query Request");
System.out.println("5. Method Request");
System.out.println("0. Exit");
System.out.println();
System.out.println("#########################################################");
System.out.println("Please Enter your choice for
REQUEST");
// read the user input from the console
BufferedReader br = new BufferedReader(new
InputStreamReader(System.in));
int choice=0;
try{
choice = Integer.parseInt(br.readLine().trim());
} catch (NumberFormatException n)
{
System.out.println("Enter the choice please");
System.exit(1);
}
switch (choice)
{
// to stop the the management client
case 0:
System.out.println("Management Client is
stopping....");
System.exit(-1);
break;
/* if user enters 1 then send the "broker
request query"
* if user enters 1 then a "Broker Query"
message will be sent to the broker
*
+-----+-----+-----+-----+-----------------------+
| 'A' | 'M' | '1' | 'B' | 0 |
+-----+-----+-----+-----+-----------------------+
The Broker Request message has no
payload.
*/
case 1:
mgtclient.opcode="AM1B";
message.clear();
try {
// encode the broker request and put
into the request message
message.put(mgtclient.opcode.getBytes("UTF-8"));
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
System.out.println("Query Broker Request
Sent...");
break;
/*
* if user enters 2 then a "Package Query"
message will be sent to the broker
*
*
+-----+-----+-----+-----+-----------------------+
| 'A' | 'M' | '1' | 'P' | seq
|
+-----+-----+-----+-----+-----------------------+
*
*/
case 2:
mgtclient.opcode="AM1P";
mgtclient.sequenceNo=3000;
message.clear();
try {
message.put(mgtclient.opcode.getBytes("UTF-8"));
message.putInt(mgtclient.sequenceNo);
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
System.out.println("Query Package Request
Sent...");
break;
/*
* if user enters 3 then a "Class Query"
message will be sent to the broker
*
*
+-----+-----+-----+-----+-----------------------+
| 'A' | 'M' | '1' | 'Q' | seq |
+-----+-----+-----+-----+-----------------------+----------+
| package name (short string)
|
+----------------------------------------------------------+
*/
case 3:
mgtclient.opcode="AM1Q";
mgtclient.sequenceNo=1000;
mgtclient.packageName="qpid";
message.clear();
try {
message.put(mgtclient.opcode.getBytes("UTF-8"));
message.putInt(mgtclient.sequenceNo);
// encode the package
encoder.writeStr8(mgtclient.packageName);
//message.putShort((byte)
mgtclient.packageName.length());
//message.put(mgtclient.packageName.getBytes("UTF-8"));
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
System.out.println("Query Class Request
Sent...");
break;
/*
* if user enters 4 then a "Schema request"
message will be sent to the broker
*
+-----+-----+-----+-----+-----------------------+
| 'A' | 'M' | '1' | 'S' | seq
|
+-----+-----+-----+-----+-----------------------+----------+
| packageName (short string)
|
+----------------------------------------------------------+
| className (short string)
|
+----------------------------------------------------------+
| schema-hash (bin128)
|
+----------------------------------------------------------+
*/
case 4:
String EnterClassName;
System.out.println("*************************");
System.out.println();
System.out.println("1. agent");
System.out.println("2. binding");
System.out.println("3. bridge");
System.out.println("4. broker");
System.out.println("5. client");
System.out.println("6. consumer");
System.out.println("7. destination");
System.out.println("8. exchange");
System.out.println("9. link");
System.out.println("10. producer");
System.out.println("11. queue");
System.out.println("12. session");
System.out.println("13. system");
System.out.println("14. vhost");
System.out.println();
System.out.println("*************************");
System.out.println("Enter the class name (in
small letters) for which you need SCHEMA");
BufferedReader bra = new BufferedReader(new
InputStreamReader(System.in));
EnterClassName = bra.readLine();
// if user's input matches with any of the
classes the broker knows,
// it encodes the schema hash and sends the
schema request for that class
if(EnterClassName.equals("agent") ||
EnterClassName.equals("binding") ||
EnterClassName.equals("bridge")
|| EnterClassName.equals("broker") ||
EnterClassName.equals("client")
|| EnterClassName.equals("consumer") ||
EnterClassName.equals("destination") || EnterClassName.equals("exchange") ||
EnterClassName.equals("link")
|| EnterClassName.equals("producer") ||
EnterClassName.equals("queue")
|| EnterClassName.equals("session") ||
EnterClassName.equals("system")
|| EnterClassName.equals("vhost"))
{
mgtclient.opcode="AM1S";
mgtclient.sequenceNo=444;
mgtclient.packageName="qpid";
mgtclient.className=EnterClassName;
message.clear();
try {
message.put(mgtclient.opcode.getBytes("UTF-8"));
message.putInt(mgtclient.sequenceNo);
encoder.writeStr8(mgtclient.packageName);
encoder.writeStr8(mgtclient.className);
if(mgtclient.className.equals("agent"))
{
encoder.writeBin128(schemaAgent);
}
if(mgtclient.className.equals("binding"))
{
encoder.writeBin128(schemaBinding);
}
if(mgtclient.className.equals("bridge"))
{
encoder.writeBin128(schemaBridge);
}
if(mgtclient.className.equals("broker"))
{
encoder.writeBin128(schemaBroker);
}
if(mgtclient.className.equals("client"))
{
encoder.writeBin128(schemaClient);
}
if(mgtclient.className.equals("consumer"))
{
encoder.writeBin128(schemaConsumer);
}
if(mgtclient.className.equals("destination"))
{
encoder.writeBin128(schemaDestination);
}
if(mgtclient.className.equals("exchange"))
{
encoder.writeBin128(schemaExchange);
}
if(mgtclient.className.equals("link"))
{
encoder.writeBin128(schemaLink);
}
if(mgtclient.className.equals("producer"))
{
encoder.writeBin128(schemaProducer);
}
if(mgtclient.className.equals("queue"))
{
encoder.writeBin128(schemaQueue);
}
if(mgtclient.className.equals("session"))
{
encoder.writeBin128(schemaSession);
}
if(mgtclient.className.equals("system"))
{
encoder.writeBin128(schemaSystem);
}
if(mgtclient.className.equals("vhost"))
{
encoder.writeBin128(schemaVhost);
}
} catch (UnsupportedEncodingException
e) {
e.printStackTrace();
}
System.out.println("Query Schema
Request Sent...");
}
// if user input any other class name or any
other name
else
{
System.out.println("Please enter the
valid choice from the above mentioned list");
System.exit(1);
}
break;
case 5:
break;
default:
System.out.println("Invalid number");
System.exit(1);
break;
}
message.flip();
// send message
session.data(message);
session.endData();
// confirm completion
session.sync();
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//cleanup
session.sessionDetach(session.getName());
try
{ // close connection
con.close();
}
catch(Exception e)
{
System.out.print("Error closing broker connection");
e.printStackTrace();
}
}
}