Author: tross Date: Fri Jun 26 17:40:01 2009 New Revision: 788782 URL: http://svn.apache.org/viewvc?rev=788782&view=rev Log: QPID-1948 - Changes to the java consle as a result of a code generated front end. Patch from Bryan Kearney
Added: qpid/trunk/qpid/java/management/console/src/test/ qpid/trunk/qpid/java/management/console/src/test/java/ qpid/trunk/qpid/java/management/console/src/test/java/org/ qpid/trunk/qpid/java/management/console/src/test/java/org/apache/ qpid/trunk/qpid/java/management/console/src/test/java/org/apache/qpid/ qpid/trunk/qpid/java/management/console/src/test/java/org/apache/qpid/console/ qpid/trunk/qpid/java/management/console/src/test/java/org/apache/qpid/console/ClassKeyTest.java Modified: qpid/trunk/qpid/java/build.deps qpid/trunk/qpid/java/management/console/src/main/java/org/apache/qpid/console/Broker.java qpid/trunk/qpid/java/management/console/src/main/java/org/apache/qpid/console/ClassKey.java qpid/trunk/qpid/java/management/console/src/main/java/org/apache/qpid/console/QMFObject.java qpid/trunk/qpid/java/management/console/src/main/java/org/apache/qpid/console/SchemaClass.java qpid/trunk/qpid/java/management/console/src/main/java/org/apache/qpid/console/Session.java qpid/trunk/qpid/java/management/console/src/main/java/org/apache/qpid/console/XMLUtil.java Modified: qpid/trunk/qpid/java/build.deps URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/build.deps?rev=788782&r1=788781&r2=788782&view=diff ============================================================================== --- qpid/trunk/qpid/java/build.deps (original) +++ qpid/trunk/qpid/java/build.deps Fri Jun 26 17:40:01 2009 @@ -167,6 +167,7 @@ tools.test.libs=${client.test.libs} testkit.test.libs=${test.libs} management-client.test.libs=${muse.libs} ${test.libs} ${log4j} ${javassist} ${geronimo-servlet} ${commons-pool} +management-console.test.libs=${junit4} ${slf4j-log4j} ${log4j} ${client.libs} management-eclipse-plugin.test.libs=${systests.libs} broker-plugins.test.libs=${test.libs} management-tools-qpid-cli.test.libs=${junit4} ${slf4j-log4j} ${log4j} ${client.libs} Modified: qpid/trunk/qpid/java/management/console/src/main/java/org/apache/qpid/console/Broker.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/management/console/src/main/java/org/apache/qpid/console/Broker.java?rev=788782&r1=788781&r2=788782&view=diff ============================================================================== --- qpid/trunk/qpid/java/management/console/src/main/java/org/apache/qpid/console/Broker.java (original) +++ qpid/trunk/qpid/java/management/console/src/main/java/org/apache/qpid/console/Broker.java Fri Jun 26 17:40:01 2009 @@ -1,505 +1,505 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.console; - -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.UUID; - -import javax.jms.BytesMessage; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageListener; -import javax.jms.MessageConsumer; -import javax.jms.MessageProducer; -import javax.jms.Queue; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.apache.qpid.client.AMQConnection; -import org.apache.qpid.transport.codec.BBDecoder; -import org.apache.qpid.transport.codec.BBEncoder; -import org.apache.qpid.transport.codec.Decoder; -import org.apache.qpid.transport.codec.Encoder; - -public class Broker implements MessageListener -{ - class HeaderInfo - { - boolean valid; - long sequence; - char opcode; - - public String toString() - { - return String.format("%s Header with opcode %s and sequence %s", - (valid ? "Valid" : "Invalid"), opcode, sequence); - } - } - private static Logger log = LoggerFactory.getLogger(Broker.class); - public static int SYNC_TIME = 60000; - // JMS Stuff - private javax.jms.Session session; - boolean sessionTransacted = false; - private String replyName; - private String topicName; - private MessageProducer prod; - private ArrayList<MessageConsumer> consumers = new ArrayList<MessageConsumer>(); - private Queue reply; - private Queue topic; - private int acknowledgeMode = javax.jms.Session.AUTO_ACKNOWLEDGE; - // QMF Stuff - AMQConnection connection; - public String url; - public java.util.HashMap<String, Agent> Agents = new java.util.HashMap<String, Agent>(); - private Session consoleSession; - private boolean connected = false; - private boolean syncInFlight = false; - private boolean topicBound = false; - private int reqsOutstanding = 0; - private Object lockObject = new Object(); - - UUID brokerId = UUID.randomUUID(); - - public Broker(org.apache.qpid.console.Session session, String url) - { - log.debug("Creating a new Broker for url " + url); - this.url = url; - consoleSession = session; - this.tryToConnect(); - } - - public int brokerBank() - { - return 1; - } - - protected HeaderInfo CheckHeader(Decoder decoder) - { - HeaderInfo returnValue = new HeaderInfo(); - returnValue.opcode = 'x'; - returnValue.sequence = -1; - returnValue.valid = false; - if (decoder.hasRemaining()) - { - char character = (char) decoder.readUint8(); - if (character != 'A') - { - return returnValue; - } - character = (char) decoder.readUint8(); - if (character != 'M') - { - return returnValue; - } - character = (char) decoder.readUint8(); - if (character != '3') - { - return returnValue; - } - returnValue.valid = true; - returnValue.opcode = (char) decoder.readUint8(); - returnValue.sequence = decoder.readUint32(); - } - return returnValue; - } - - public Encoder createEncoder(char opcode, long sequence) - { - return setHeader(new BBEncoder(1024), opcode, sequence); - } - - public Message createMessage(Encoder enc) - { - try - { - byte[] buf = new byte[1024]; - byte[] body = new byte[1024]; - BBEncoder bbenc = (BBEncoder) enc; - BytesMessage msg = session.createBytesMessage(); - ByteBuffer slice = bbenc.buffer(); - while (slice.hasRemaining()) - { - int n = Math.min(buf.length, slice.remaining()); - slice.get(buf, 0, n); - msg.writeBytes(buf, 0, n); - } - return msg; - } catch (JMSException e) - { - throw new ConsoleException(e); - } - } - - public void decrementOutstanding() - { - synchronized (lockObject) - { - this.reqsOutstanding -= 1; - if ((reqsOutstanding == 0) & !topicBound) - { - for (String key : consoleSession.bindingKeys()) - { - try - { - // this.clientSession.exchangeBind(topicName, - // "qpid.mannagement", key) ; - log.debug("Setting Topic Binding " + key); - // topicName = "management://qpid.management//" + key; - String rk = String.format("&routingkey='%s'", key); - Queue aQueue = session.createQueue(topicName + rk); - MessageConsumer cons = session.createConsumer(aQueue); - cons.setMessageListener(this); - consumers.add(cons); - } catch (JMSException e) - { - throw new ConsoleException(e); - } - } - topicBound = true; - } - if ((reqsOutstanding == 0) & syncInFlight) - { - syncInFlight = false; - lockObject.notifyAll(); - } - } - } - - private byte[] ensure(int capacity, byte[] body, int size) - { - if (capacity > body.length) - { - byte[] copy = new byte[capacity]; - System.arraycopy(body, 0, copy, 0, size); - body = copy; - } - return body; - } - - protected void finalize() - { - if (connected) - { - this.shutdown(); - } - } - - public boolean getSyncInFlight() - { - return syncInFlight; - } - - public void incrementOutstanding() - { - synchronized (lockObject) - { - this.reqsOutstanding += 1; - } - } - - public boolean isConnected() - { - return connected; - } - - public void onMessage(Message msg) - { - Decoder decoder = readBody(msg); - HeaderInfo headerInfo = this.CheckHeader(decoder); - // log.debug(headerInfo.toString()); - while (headerInfo.valid) - { - long seq = headerInfo.sequence; - switch (headerInfo.opcode) - { - case 'b': - consoleSession.handleBrokerResponse(this, decoder, seq); - break; - case 'p': - consoleSession.handlePackageIndicator(this, decoder, seq); - break; - case 'z': - consoleSession.handleCommandComplete(this, decoder, seq); - break; - case 'q': - consoleSession.handleClassIndicator(this, decoder, seq); - break; - case 'm': - consoleSession.handleMethodResponse(this, decoder, seq); - break; - case 'h': - consoleSession - .handleHeartbeatIndicator(this, decoder, seq, msg); - break; - case 'e': - consoleSession.handleEventIndicator(this, decoder, seq); - break; - case 's': - consoleSession.handleSchemaResponse(this, decoder, seq); - break; - case 'c': - consoleSession.handleContentIndicator(this, decoder, seq, true, - false); - break; - case 'i': - consoleSession.handleContentIndicator(this, decoder, seq, - false, true); - break; - case 'g': - consoleSession.handleContentIndicator(this, decoder, seq, true, - true); - break; - default: - log.error("Invalid message type recieved with opcode " - + headerInfo.opcode); - break; - } - headerInfo = this.CheckHeader(decoder); - } - } - - private Decoder readBody(Message message) - { - BytesMessage msg = (BytesMessage) message; - BBDecoder dec = new BBDecoder(); - byte[] buf = new byte[1024]; - byte[] body = new byte[1024]; - int size = 0; - int n; - try - { - while ((n = msg.readBytes(buf)) > 0) - { - body = ensure(size + n, body, size); - System.arraycopy(buf, 0, body, size, n); - size += n; - } - } catch (JMSException e) - { - throw new ConsoleException(e); - } - dec.init(ByteBuffer.wrap(body, 0, size)); - return dec; - } - - public void send(Encoder enc) - { - this.send(this.createMessage(enc), "broker"); - } - - public void send(Message msg) - { - this.send(msg, "broker", -1); - } - - public void send(Message msg, String routingKey) - { - this.send(msg, routingKey, -1); - } - - public void send(Message msg, String routingKey, int ttl) - { - synchronized (lockObject) - { - try - { - log.debug(String.format("Sending message to routing key '%s'", - routingKey)); - String destName = String.format( - "management://qpid.management//?routingkey='%s'", - routingKey); - log.debug(destName); - Queue dest = session.createQueue(destName); - // Queue jmsReply = session - // createQueue("direct://amq.direct//?routingkey='reply-" - // + brokerId + "'"); - if (ttl != -1) - { - msg.setJMSExpiration(ttl); - } - msg.setJMSReplyTo(reply); - prod.send(dest, msg); - } catch (Exception e) - { - throw new ConsoleException(e); - } - } - } - - protected Encoder setHeader(Encoder enc, char opcode, long sequence) - { - enc.writeUint8((short) 'A'); - enc.writeUint8((short) 'M'); - enc.writeUint8((short) '3'); - enc.writeUint8((short) opcode); - enc.writeUint32(sequence); - return enc; - } - - public void setSyncInFlight(boolean inFlight) - { - synchronized (lockObject) - { - syncInFlight = inFlight; - lockObject.notifyAll(); - } - } - - public void shutdown() - { - if (connected) - { - this.waitForStable(); - try - { - session.close(); - for (MessageConsumer cons : consumers) - { - cons.close(); - } - connection.close(); - } catch (Exception e) - { - throw new ConsoleException(e); - } finally - { - this.connected = false; - } - } - } - - protected void tryToConnect() - { - try - { - reqsOutstanding = 1; - Agent newAgent = new Agent(this, 0, "BrokerAgent"); - Agents.put(newAgent.agentKey(), newAgent); - connection = new AMQConnection(url); - session = connection.createSession(sessionTransacted, - acknowledgeMode); - replyName = String - .format( - "direct://amq.direct//reply-%s?exclusive='True'&autodelete='True'", - brokerId); - topicName = String - .format( - "management://qpid.management//topic-%s?exclusive='True'&autodelete='True'", - brokerId); - reply = session.createQueue(replyName); - MessageConsumer cons = session.createConsumer(reply); - cons.setMessageListener(this); - consumers.add(cons); - prod = session.createProducer(null); - topic = session.createQueue(topicName); - cons = session.createConsumer(topic); - cons.setMessageListener(this); - consumers.add(cons); - connection.start(); - // Rest of the topic is bound later. Start er up - } catch (Exception e) - { - throw new ConsoleException(e); - } - connected = true; - consoleSession.handleBrokerConnect(this); - Encoder Encoder = createEncoder('B', 0); - this.send(Encoder); - } - - public void updateAgent(QMFObject obj) - { - long agentBank = (Long) obj.GetProperty("agentBank"); - long brokerBank = (Long) obj.GetProperty("brokerBank"); - String key = Agent.AgentKey(agentBank, brokerBank); - if (obj.isDeleted()) - { - if (Agents.containsKey(key)) - { - Agent agent = Agents.get(key); - Agents.remove(key); - consoleSession.handleAgentRemoved(agent); - } - } else - { - if (!Agents.containsKey(key)) - { - Agent newAgent = new Agent(this, agentBank, (String) obj - .GetProperty("label")); - Agents.put(key, newAgent); - consoleSession.handleNewAgent(newAgent); - } - } - } - - public void waitForStable() - { - synchronized (lockObject) - { - if (connected) - { - long start = System.currentTimeMillis(); - syncInFlight = true; - while (reqsOutstanding != 0) - { - log.debug("Waiting to recieve messages"); - try - { - lockObject.wait(SYNC_TIME); - } catch (Exception e) - { - throw new ConsoleException(e); - } - long duration = System.currentTimeMillis() - start; - if (duration > SYNC_TIME) - { - throw new ConsoleException( - "Timeout waiting for Broker to Sync"); - } - } - } - } - } - - public void waitForSync(int timeout) - { - synchronized (lockObject) - { - long start = System.currentTimeMillis(); - while (syncInFlight) - { - try - { - lockObject.wait(SYNC_TIME); - } catch (Exception e) - { - throw new ConsoleException(e); - } - } - long duration = System.currentTimeMillis() - start; - if (duration > timeout) - { - throw new ConsoleException("Timeout waiting for Broker to Sync"); - } - } - } +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.console; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.UUID; + +import javax.jms.BytesMessage; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageListener; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.transport.codec.BBDecoder; +import org.apache.qpid.transport.codec.BBEncoder; +import org.apache.qpid.transport.codec.Decoder; +import org.apache.qpid.transport.codec.Encoder; + +public class Broker implements MessageListener +{ + class HeaderInfo + { + boolean valid; + long sequence; + char opcode; + + public String toString() + { + return String.format("%s Header with opcode %s and sequence %s", + (valid ? "Valid" : "Invalid"), opcode, sequence); + } + } + + private static Logger log = LoggerFactory.getLogger(Broker.class); + public static int SYNC_TIME = 60000; + // JMS Stuff + private javax.jms.Session session; + boolean sessionTransacted = false; + private String replyName; + private String topicName; + private MessageProducer prod; + private ArrayList<MessageConsumer> consumers = new ArrayList<MessageConsumer>(); + private Queue reply; + private Queue topic; + private int acknowledgeMode = javax.jms.Session.AUTO_ACKNOWLEDGE; + // QMF Stuff + AMQConnection connection; + public String url; + public java.util.HashMap<String, Agent> Agents = new java.util.HashMap<String, Agent>(); + private Session consoleSession; + private boolean connected = false; + private boolean syncInFlight = false; + private boolean topicBound = false; + private int reqsOutstanding = 0; + private Object lockObject = new Object(); + UUID brokerId = UUID.randomUUID(); + + public Broker(org.apache.qpid.console.Session session, String url) + { + log.debug("Creating a new Broker for url " + url); + this.url = url; + consoleSession = session; + this.tryToConnect(); + } + + public int brokerBank() + { + return 1; + } + + protected HeaderInfo CheckHeader(Decoder decoder) + { + HeaderInfo returnValue = new HeaderInfo(); + returnValue.opcode = 'x'; + returnValue.sequence = -1; + returnValue.valid = false; + if (decoder.hasRemaining()) + { + char character = (char) decoder.readUint8(); + if (character != 'A') + { + return returnValue; + } + character = (char) decoder.readUint8(); + if (character != 'M') + { + return returnValue; + } + character = (char) decoder.readUint8(); + if (character != '3') + { + return returnValue; + } + returnValue.valid = true; + returnValue.opcode = (char) decoder.readUint8(); + returnValue.sequence = decoder.readUint32(); + } + return returnValue; + } + + public Encoder createEncoder(char opcode, long sequence) + { + return setHeader(new BBEncoder(1024), opcode, sequence); + } + + public Message createMessage(Encoder enc) + { + try + { + byte[] buf = new byte[1024]; + byte[] body = new byte[1024]; + BBEncoder bbenc = (BBEncoder) enc; + BytesMessage msg = session.createBytesMessage(); + ByteBuffer slice = bbenc.buffer(); + while (slice.hasRemaining()) + { + int n = Math.min(buf.length, slice.remaining()); + slice.get(buf, 0, n); + msg.writeBytes(buf, 0, n); + } + return msg; + } catch (JMSException e) + { + throw new ConsoleException(e); + } + } + + public void decrementOutstanding() + { + synchronized (lockObject) + { + this.reqsOutstanding -= 1; + if ((reqsOutstanding == 0) & !topicBound) + { + for (String key : consoleSession.bindingKeys()) + { + try + { + // this.clientSession.exchangeBind(topicName, + // "qpid.mannagement", key) ; + log.debug("Setting Topic Binding " + key); + // topicName = "management://qpid.management//" + key; + String rk = String.format("&routingkey='%s'", key); + Queue aQueue = session.createQueue(topicName + rk); + MessageConsumer cons = session.createConsumer(aQueue); + cons.setMessageListener(this); + consumers.add(cons); + } catch (JMSException e) + { + throw new ConsoleException(e); + } + } + topicBound = true; + } + if ((reqsOutstanding == 0) & syncInFlight) + { + syncInFlight = false; + lockObject.notifyAll(); + } + } + } + + private byte[] ensure(int capacity, byte[] body, int size) + { + if (capacity > body.length) + { + byte[] copy = new byte[capacity]; + System.arraycopy(body, 0, copy, 0, size); + body = copy; + } + return body; + } + + protected void finalize() + { + if (connected) + { + this.shutdown(); + } + } + + public boolean getSyncInFlight() + { + return syncInFlight; + } + + public void incrementOutstanding() + { + synchronized (lockObject) + { + this.reqsOutstanding += 1; + } + } + + public boolean isConnected() + { + return connected; + } + + public void onMessage(Message msg) + { + Decoder decoder = readBody(msg); + HeaderInfo headerInfo = this.CheckHeader(decoder); + // log.debug(headerInfo.toString()); + while (headerInfo.valid) + { + long seq = headerInfo.sequence; + switch (headerInfo.opcode) + { + case 'b': + consoleSession.handleBrokerResponse(this, decoder, seq); + break; + case 'p': + consoleSession.handlePackageIndicator(this, decoder, seq); + break; + case 'z': + consoleSession.handleCommandComplete(this, decoder, seq); + break; + case 'q': + consoleSession.handleClassIndicator(this, decoder, seq); + break; + case 'm': + consoleSession.handleMethodResponse(this, decoder, seq); + break; + case 'h': + consoleSession + .handleHeartbeatIndicator(this, decoder, seq, msg); + break; + case 'e': + consoleSession.handleEventIndicator(this, decoder, seq); + break; + case 's': + consoleSession.handleSchemaResponse(this, decoder, seq); + break; + case 'c': + consoleSession.handleContentIndicator(this, decoder, seq, true, + false); + break; + case 'i': + consoleSession.handleContentIndicator(this, decoder, seq, + false, true); + break; + case 'g': + consoleSession.handleContentIndicator(this, decoder, seq, true, + true); + break; + default: + log.error("Invalid message type recieved with opcode " + + headerInfo.opcode); + break; + } + headerInfo = this.CheckHeader(decoder); + } + } + + private Decoder readBody(Message message) + { + BytesMessage msg = (BytesMessage) message; + BBDecoder dec = new BBDecoder(); + byte[] buf = new byte[1024]; + byte[] body = new byte[1024]; + int size = 0; + int n; + try + { + while ((n = msg.readBytes(buf)) > 0) + { + body = ensure(size + n, body, size); + System.arraycopy(buf, 0, body, size, n); + size += n; + } + } catch (JMSException e) + { + throw new ConsoleException(e); + } + dec.init(ByteBuffer.wrap(body, 0, size)); + return dec; + } + + public void send(Encoder enc) + { + this.send(this.createMessage(enc), "broker"); + } + + public void send(Message msg) + { + this.send(msg, "broker", -1); + } + + public void send(Message msg, String routingKey) + { + this.send(msg, routingKey, -1); + } + + public void send(Message msg, String routingKey, int ttl) + { + synchronized (lockObject) + { + try + { + log.debug(String.format("Sending message to routing key '%s'", + routingKey)); + String destName = String.format( + "management://qpid.management//?routingkey='%s'", + routingKey); + log.debug(destName); + Queue dest = session.createQueue(destName); + // Queue jmsReply = session + // createQueue("direct://amq.direct//?routingkey='reply-" + // + brokerId + "'"); + if (ttl != -1) + { + msg.setJMSExpiration(ttl); + } + msg.setJMSReplyTo(reply); + prod.send(dest, msg); + } catch (Exception e) + { + throw new ConsoleException(e); + } + } + } + + protected Encoder setHeader(Encoder enc, char opcode, long sequence) + { + enc.writeUint8((short) 'A'); + enc.writeUint8((short) 'M'); + enc.writeUint8((short) '3'); + enc.writeUint8((short) opcode); + enc.writeUint32(sequence); + return enc; + } + + public void setSyncInFlight(boolean inFlight) + { + synchronized (lockObject) + { + syncInFlight = inFlight; + lockObject.notifyAll(); + } + } + + public void shutdown() + { + if (connected) + { + this.waitForStable(); + try + { + session.close(); + for (MessageConsumer cons : consumers) + { + cons.close(); + } + connection.close(); + } catch (Exception e) + { + throw new ConsoleException(e); + } finally + { + this.connected = false; + } + } + } + + protected void tryToConnect() + { + try + { + reqsOutstanding = 1; + Agent newAgent = new Agent(this, 0, "BrokerAgent"); + Agents.put(newAgent.agentKey(), newAgent); + connection = new AMQConnection(url); + session = connection.createSession(sessionTransacted, + acknowledgeMode); + replyName = String + .format( + "direct://amq.direct//reply-%s?exclusive='True'&autodelete='True'", + brokerId); + topicName = String + .format( + "management://qpid.management//topic-%s?exclusive='True'&autodelete='True'", + brokerId); + reply = session.createQueue(replyName); + MessageConsumer cons = session.createConsumer(reply); + cons.setMessageListener(this); + consumers.add(cons); + prod = session.createProducer(null); + topic = session.createQueue(topicName); + cons = session.createConsumer(topic); + cons.setMessageListener(this); + consumers.add(cons); + connection.start(); + // Rest of the topic is bound later. Start er up + } catch (Exception e) + { + throw new ConsoleException(e); + } + connected = true; + consoleSession.handleBrokerConnect(this); + Encoder Encoder = createEncoder('B', 0); + this.send(Encoder); + } + + public void updateAgent(QMFObject obj) + { + long agentBank = (Long) obj.getProperty("agentBank"); + long brokerBank = (Long) obj.getProperty("brokerBank"); + String key = Agent.AgentKey(agentBank, brokerBank); + if (obj.isDeleted()) + { + if (Agents.containsKey(key)) + { + Agent agent = Agents.get(key); + Agents.remove(key); + consoleSession.handleAgentRemoved(agent); + } + } else + { + if (!Agents.containsKey(key)) + { + Agent newAgent = new Agent(this, agentBank, (String) obj + .getProperty("label")); + Agents.put(key, newAgent); + consoleSession.handleNewAgent(newAgent); + } + } + } + + public void waitForStable() + { + synchronized (lockObject) + { + if (connected) + { + long start = System.currentTimeMillis(); + syncInFlight = true; + while (reqsOutstanding != 0) + { + log.debug("Waiting to recieve messages"); + try + { + lockObject.wait(SYNC_TIME); + } catch (Exception e) + { + throw new ConsoleException(e); + } + long duration = System.currentTimeMillis() - start; + if (duration > SYNC_TIME) + { + throw new ConsoleException( + "Timeout waiting for Broker to Sync"); + } + } + } + } + } + + public void waitForSync(int timeout) + { + synchronized (lockObject) + { + long start = System.currentTimeMillis(); + while (syncInFlight) + { + try + { + lockObject.wait(SYNC_TIME); + } catch (Exception e) + { + throw new ConsoleException(e); + } + } + long duration = System.currentTimeMillis() - start; + if (duration > timeout) + { + throw new ConsoleException("Timeout waiting for Broker to Sync"); + } + } + } } \ No newline at end of file Modified: qpid/trunk/qpid/java/management/console/src/main/java/org/apache/qpid/console/ClassKey.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/management/console/src/main/java/org/apache/qpid/console/ClassKey.java?rev=788782&r1=788781&r2=788782&view=diff ============================================================================== --- qpid/trunk/qpid/java/management/console/src/main/java/org/apache/qpid/console/ClassKey.java (original) +++ qpid/trunk/qpid/java/management/console/src/main/java/org/apache/qpid/console/ClassKey.java Fri Jun 26 17:40:01 2009 @@ -22,9 +22,12 @@ import org.apache.qpid.transport.codec.Decoder; import org.apache.qpid.transport.codec.Encoder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class ClassKey { + private static Logger log = LoggerFactory.getLogger(ClassKey.class); private String packageName; private String className; private long[] hash = new long[4]; @@ -41,8 +44,8 @@ public ClassKey(String keyString) { - String delims = ":()"; - String[] parts = keyString.split(java.util.regex.Pattern.quote(delims)); + String delims = "[*:*(*)]"; + String[] parts = keyString.split(delims); if (parts.length < 3) { throw new ConsoleException( @@ -51,7 +54,7 @@ setPackageName(parts[0]); setClassName(parts[1]); delims = "-"; - String[] bytes = parts[2].split(java.util.regex.Pattern.quote(delims)); + String[] bytes = parts[2].split(delims); if (bytes.length != 4) { throw new ConsoleException( @@ -95,15 +98,16 @@ { return hash; } - - public String getHashString() { - return String.format("%08x-%08x-%08x-%08x", hash[0], hash[1], - hash[2], hash[3]); + + public String getHashString() + { + return String.format("%08x-%08x-%08x-%08x", hash[0], hash[1], hash[2], + hash[3]); } public String getKeyString() { - String hashString = this.getHashString() ; + String hashString = this.getHashString(); return String.format("%s:%s(%s)", getPackageName(), getClassName(), hashString); } Modified: qpid/trunk/qpid/java/management/console/src/main/java/org/apache/qpid/console/QMFObject.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/management/console/src/main/java/org/apache/qpid/console/QMFObject.java?rev=788782&r1=788781&r2=788782&view=diff ============================================================================== --- qpid/trunk/qpid/java/management/console/src/main/java/org/apache/qpid/console/QMFObject.java (original) +++ qpid/trunk/qpid/java/management/console/src/main/java/org/apache/qpid/console/QMFObject.java Fri Jun 26 17:40:01 2009 @@ -233,7 +233,7 @@ return objectID; } - public final Object GetProperty(String attributeName) + public final Object getProperty(String attributeName) { return properties.get(attributeName); } Modified: qpid/trunk/qpid/java/management/console/src/main/java/org/apache/qpid/console/SchemaClass.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/management/console/src/main/java/org/apache/qpid/console/SchemaClass.java?rev=788782&r1=788781&r2=788782&view=diff ============================================================================== --- qpid/trunk/qpid/java/management/console/src/main/java/org/apache/qpid/console/SchemaClass.java (original) +++ qpid/trunk/qpid/java/management/console/src/main/java/org/apache/qpid/console/SchemaClass.java Fri Jun 26 17:40:01 2009 @@ -198,7 +198,7 @@ { superType = value; } - + public ArrayList<SchemaProperty> getProperties() { return properties; Modified: qpid/trunk/qpid/java/management/console/src/main/java/org/apache/qpid/console/Session.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/management/console/src/main/java/org/apache/qpid/console/Session.java?rev=788782&r1=788781&r2=788782&view=diff ============================================================================== --- qpid/trunk/qpid/java/management/console/src/main/java/org/apache/qpid/console/Session.java (original) +++ qpid/trunk/qpid/java/management/console/src/main/java/org/apache/qpid/console/Session.java Fri Jun 26 17:40:01 2009 @@ -149,6 +149,7 @@ { this, schema, dec, hasProperties, hasStats, isManaged }; try { + log.debug("" + realClass); Constructor ci = realClass.getConstructor(types); return (QMFObject) ci.newInstance(args); } catch (Exception e) @@ -277,7 +278,7 @@ enc.writeUint16(((Integer) val).intValue()); break; case 3: // U32 - enc.writeUint32(((Long) val).longValue()); + enc.writeUint32(((Integer) val).longValue()); break; case 4: // U64 enc.writeUint64(((Long) val).longValue()); Modified: qpid/trunk/qpid/java/management/console/src/main/java/org/apache/qpid/console/XMLUtil.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/management/console/src/main/java/org/apache/qpid/console/XMLUtil.java?rev=788782&r1=788781&r2=788782&view=diff ============================================================================== --- qpid/trunk/qpid/java/management/console/src/main/java/org/apache/qpid/console/XMLUtil.java (original) +++ qpid/trunk/qpid/java/management/console/src/main/java/org/apache/qpid/console/XMLUtil.java Fri Jun 26 17:40:01 2009 @@ -1,103 +1,134 @@ package org.apache.qpid.console; - public class XMLUtil { - - public static String commonAttributes(SchemaVariable var) { - String returnString = "" ; - if (var.getDescription() != null){ - returnString = returnString + String.format(" desc='%s'", var.getDescription()) ; - } - - if (var.getRefPackage() != null){ - returnString = returnString + String.format(" refPackage='%s'", var.getRefPackage()) ; - } - - if (var.getRefClass() != null){ - returnString = returnString + String.format(" refClass='%s'", var.getRefClass()) ; - } - - if (var.getUnit() != null){ - returnString = returnString + String.format(" unit='%s'", var.getUnit()) ; - } - - if (var.getMin() != null){ - returnString = returnString + String.format(" min='%s'", var.getMin()) ; - } - if (var.getMax() != null){ - returnString = returnString + String.format(" max='%s'", var.getMax()) ; - } - if (var.getMaxLength() != null){ - returnString = returnString + String.format(" maxLength='%s'", var.getMaxLength()) ; - } - - return returnString ; - } + public static String commonAttributes(SchemaVariable var) + { + String returnString = ""; + if (var.getDescription() != null) + { + returnString = returnString + + String.format(" desc='%s'", var.getDescription()); + } + if (var.getRefPackage() != null) + { + returnString = returnString + + String.format(" refPackage='%s'", var.getRefPackage()); + } + if (var.getRefClass() != null) + { + returnString = returnString + + String.format(" refClass='%s'", var.getRefClass()); + } + if (var.getUnit() != null) + { + returnString = returnString + + String.format(" unit='%s'", var.getUnit()); + } + if (var.getMin() != null) + { + returnString = returnString + + String.format(" min='%s'", var.getMin()); + } + if (var.getMax() != null) + { + returnString = returnString + + String.format(" max='%s'", var.getMax()); + } + if (var.getMaxLength() != null) + { + returnString = returnString + + String.format(" maxLength='%s'", var.getMaxLength()); + } + return returnString; + } - public static String schemaXML(Session sess, String packageName) { - String returnValue = String.format("<schema package='%s'>\n", packageName) ; - for (ClassKey key : sess.getClasses(packageName)) { - SchemaClass schema = sess.getSchema(key) ; - if (schema.getKind() == 1) { - if (schema.getSuperType() == null) { - returnValue += String.format("\t<class name='%s' hash='%s'>\n", key.getClassName(), key.getHashString()) ; - } - else { - returnValue += String.format("\t<class name='%s' hash='%s' extends='%s'>\n", key.getClassName(), key.getHashString(), schema.getSuperType().getKeyString()) ; + public static String schemaXML(Session sess, String packageName) + { + String returnValue = String.format("<schema package='%s'>\n", + packageName); + for (ClassKey key : sess.getClasses(packageName)) + { + SchemaClass schema = sess.getSchema(key); + if (schema.getKind() == 1) + { + if (schema.getSuperType() == null) + { + returnValue += String.format( + "\t<class name='%s' hash='%s'>\n", key + .getClassName(), key.getHashString()); + } else + { + returnValue += String.format( + "\t<class name='%s' hash='%s' extends='%s'>\n", key + .getClassName(), key.getHashString(), + schema.getSuperType().getKeyString()); } - for (SchemaProperty prop : schema.getProperties()) { - Object[] attributes = new Object[5] ; - attributes[0] = prop.getName() ; - attributes[1] = Util.typeName(prop.getType()) ; - attributes[2] = Util.accessName(prop.getAccess()) ; - attributes[3] = prop.getOptional()? "True" : "False "; + for (SchemaProperty prop : schema.getProperties()) + { + Object[] attributes = new Object[5]; + attributes[0] = prop.getName(); + attributes[1] = Util.typeName(prop.getType()); + attributes[2] = Util.accessName(prop.getAccess()); + attributes[3] = prop.getOptional() ? "True" : "False "; attributes[4] = XMLUtil.commonAttributes(prop); - returnValue += String.format("\t\t<property name='%s' type='%s' access='%s' optional='%s'%s/>\n", attributes) ; + returnValue += String + .format( + "\t\t<property name='%s' type='%s' access='%s' optional='%s'%s/>\n", + attributes); } - for (SchemaMethod meth : schema.getMethods()) { - returnValue += String.format("\t\t<method name='%s'/>\n", meth.getName()) ; - for (SchemaArgument arg : meth.Arguments) { - Object[] attributes = new Object[4] ; - attributes[0] = arg.getName() ; - attributes[1] = arg.getDirection() ; - attributes[2] = Util.typeName(arg.getType()) ; - attributes[3] = XMLUtil.commonAttributes(arg); - returnValue += String.format("\t\t\t<arg name='%s' dir='%s' type='%s'%s/>\n", attributes) ; + for (SchemaMethod meth : schema.getMethods()) + { + returnValue += String.format("\t\t<method name='%s'/>\n", + meth.getName()); + for (SchemaArgument arg : meth.Arguments) + { + Object[] attributes = new Object[4]; + attributes[0] = arg.getName(); + attributes[1] = arg.getDirection(); + attributes[2] = Util.typeName(arg.getType()); + attributes[3] = XMLUtil.commonAttributes(arg); + returnValue += String + .format( + "\t\t\t<arg name='%s' dir='%s' type='%s'%s/>\n", + attributes); } - returnValue += String.format("\t\t</method>\n") ; + returnValue += String.format("\t\t</method>\n"); } - returnValue += String.format("\t</class>\n") ; - } else { - returnValue += String.format("\t<event name='%s' hash='%s'>\n", key.getClassName(), key.getHashString()) ; - for (SchemaArgument arg : schema.getArguments()) { - Object[] attributes = new Object[4] ; - attributes[0] = arg.getName() ; - attributes[1] = Util.typeName(arg.getType()) ; - attributes[2] = XMLUtil.commonAttributes(arg); - returnValue += String.format("\t\t\t<arg name='%s' type='%s'%s/>\n", attributes) ; + returnValue += String.format("\t</class>\n"); + } else + { + returnValue += String.format("\t<event name='%s' hash='%s'>\n", + key.getClassName(), key.getHashString()); + for (SchemaArgument arg : schema.getArguments()) + { + Object[] attributes = new Object[4]; + attributes[0] = arg.getName(); + attributes[1] = Util.typeName(arg.getType()); + attributes[2] = XMLUtil.commonAttributes(arg); + returnValue += String.format( + "\t\t\t<arg name='%s' type='%s'%s/>\n", attributes); } - returnValue += String.format("\t</event>\n") ; + returnValue += String.format("\t</event>\n"); } } - returnValue += String.format("</schema>\n") ; - - return returnValue ; - } - - public static String schemaXML(Session sess, String[] packageNames) { - String returnValue = "<schemas>\n" ; - for (String pack : packageNames) { - returnValue += XMLUtil.schemaXML(sess, pack) ; - returnValue += "\n" ; + returnValue += String.format("</schema>\n"); + return returnValue; + } + + public static String schemaXML(Session sess, String[] packageNames) + { + String returnValue = "<schemas>\n"; + for (String pack : packageNames) + { + returnValue += XMLUtil.schemaXML(sess, pack); + returnValue += "\n"; } - returnValue += "</schemas>\n" ; - return returnValue ; + returnValue += "</schemas>\n"; + return returnValue; } - + protected XMLUtil() { } } - - Added: qpid/trunk/qpid/java/management/console/src/test/java/org/apache/qpid/console/ClassKeyTest.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/management/console/src/test/java/org/apache/qpid/console/ClassKeyTest.java?rev=788782&view=auto ============================================================================== --- qpid/trunk/qpid/java/management/console/src/test/java/org/apache/qpid/console/ClassKeyTest.java (added) +++ qpid/trunk/qpid/java/management/console/src/test/java/org/apache/qpid/console/ClassKeyTest.java Fri Jun 26 17:40:01 2009 @@ -0,0 +1,39 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.console; + +import junit.framework.TestCase; + +public class ClassKeyTest extends TestCase +{ + public void testCreation() + { + ClassKey key = new ClassKey( + "some.package:Class(00000001-00000002-00000003-00000004)"); + assertEquals("some.package", key.getPackageName()); + assertEquals("Class", key.getClassName()); + assertEquals("00000001-00000002-00000003-00000004", key.getHashString()); + assertEquals(1, key.getHash()[0]); + assertEquals(2, key.getHash()[1]); + assertEquals(3, key.getHash()[2]); + assertEquals(4, key.getHash()[3]); + } +} --------------------------------------------------------------------- Apache Qpid - AMQP Messaging Implementation Project: http://qpid.apache.org Use/Interact: mailto:commits-subscr...@qpid.apache.org