Repository: activemq Updated Branches: refs/heads/trunk 27833d025 -> 78cb1120b
https://issues.apache.org/jira/browse/AMQ-5391 Allow for an anonymous relay using a configurable node name when creating the new link, default is $relay. Messages that arrive without a 'to' field set are rejected as this is required for a relay. Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/78cb1120 Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/78cb1120 Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/78cb1120 Branch: refs/heads/trunk Commit: 78cb1120b7b93957c36e0abc12e1d22f0f0d7390 Parents: 27833d0 Author: Timothy Bish <tabish...@gmail.com> Authored: Tue Oct 14 17:32:23 2014 -0400 Committer: Timothy Bish <tabish...@gmail.com> Committed: Tue Oct 14 17:32:23 2014 -0400 ---------------------------------------------------------------------- .../transport/amqp/AmqpProtocolConverter.java | 57 ++++++++++++++++++-- .../activemq/transport/amqp/AmqpWireFormat.java | 9 ++++ 2 files changed, 62 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/78cb1120/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java index fb275b8..472aeb9 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java @@ -26,6 +26,7 @@ import java.util.LinkedList; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import javax.jms.Destination; import javax.jms.InvalidClientIDException; import javax.jms.InvalidSelectorException; @@ -111,10 +112,12 @@ class 
AmqpProtocolConverter implements IAmqpProtocolConverter { private static final Logger TRACE_FRAMES = AmqpTransportFilter.TRACE_FRAMES; private static final Logger LOG = LoggerFactory.getLogger(AmqpProtocolConverter.class); private static final byte[] EMPTY_BYTE_ARRAY = new byte[] {}; + private static final int CHANNEL_MAX = 32767; private final AmqpTransport amqpTransport; private static final Symbol COPY = Symbol.getSymbol("copy"); private static final Symbol JMS_SELECTOR = Symbol.valueOf("jms-selector"); private static final Symbol NO_LOCAL = Symbol.valueOf("no-local"); + private static final Symbol ANONYMOUS_RELAY = Symbol.valueOf("x-opt-anonymous-relay"); private static final Symbol DURABLE_SUBSCRIPTION_ENDED = Symbol.getSymbol("DURABLE_SUBSCRIPTION_ENDED"); protected int prefetch; @@ -132,10 +135,33 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter { } this.protonTransport.bind(this.protonConnection); + + // NOTE: QPid JMS client has a bug where the channel max is stored as a + // short value in the Connection class which means that if we allow + // the default channel max of 65535 to be sent then no new sessions + // can be created because the value would be -1 when checked. + this.protonTransport.setChannelMax(CHANNEL_MAX); + this.protonConnection.collect(eventCollector); + this.protonConnection.setProperties(getConnectionProperties()); + updateTracer(); } + /** + * Load and return a <code>Map<Symbol, Object></code> that contains the connection + * properties which will allow the client to better communicate with this broker. + * + * @return the properties that are sent to new clients on connect. 
+ */ + protected Map<Symbol, Object> getConnectionProperties() { + Map<Symbol, Object> properties = new HashMap<Symbol, Object>(); + + properties.put(ANONYMOUS_RELAY, amqpTransport.getWireFormat().getAnonymousNodeName()); + + return properties; + } + @Override public void updateTracer() { if (amqpTransport.isTrace()) { @@ -559,10 +585,12 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter { private final LongSequenceGenerator messageIdGenerator = new LongSequenceGenerator(); private final ActiveMQDestination destination; private boolean closed; + private final boolean anonymous; - public ProducerContext(ProducerId producerId, ActiveMQDestination destination) { + public ProducerContext(ProducerId producerId, ActiveMQDestination destination, boolean anonymous) { this.producerId = producerId; this.destination = destination; + this.anonymous = anonymous; } @Override @@ -581,6 +609,17 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter { if (destination != null) { message.setJMSDestination(destination); + } else if (isAnonymous()) { + Destination toDestination = message.getJMSDestination(); + if (toDestination == null || !(toDestination instanceof ActiveMQDestination)) { + Rejected rejected = new Rejected(); + ErrorCondition condition = new ErrorCondition(); + condition.setCondition(Symbol.valueOf("failed")); + condition.setDescription("Missing to field for message sent to an anonymous producer"); + rejected.setError(condition); + delivery.disposition(rejected); + return; + } } message.setProducerId(producerId); @@ -673,6 +712,10 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter { sendToActiveMQ(new RemoveInfo(producerId), null); } } + + public boolean isAnonymous() { + return anonymous; + } } long nextTransactionId = 1; @@ -795,8 +838,12 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter { } else { Target target = (Target) remoteTarget; ProducerId producerId = new ProducerId(sessionContext.sessionId, 
sessionContext.nextProducerId++); - ActiveMQDestination dest; - if (target.getDynamic()) { + ActiveMQDestination dest = null; + boolean anonymous = false; + + if (target.getAddress().equals(amqpTransport.getWireFormat().getAnonymousNodeName())) { + anonymous = true; + } else if (target.getDynamic()) { dest = createTempQueue(); Target actualTarget = new Target(); actualTarget.setAddress(dest.getQualifiedName()); @@ -806,10 +853,11 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter { dest = createDestination(remoteTarget); } - ProducerContext producerContext = new ProducerContext(producerId, dest); + ProducerContext producerContext = new ProducerContext(producerId, dest, anonymous); receiver.setContext(producerContext); receiver.flow(flow); + ProducerInfo producerInfo = new ProducerInfo(producerId); producerInfo.setDestination(dest); sendToActiveMQ(producerInfo, new ResponseHandler() { @@ -1383,6 +1431,7 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter { return condition; } + @Override public void setPrefetch(int prefetch) { this.prefetch = prefetch; } http://git-wip-us.apache.org/repos/asf/activemq/blob/78cb1120/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormat.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormat.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormat.java index 779cb65..f6c2880 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormat.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormat.java @@ -40,6 +40,7 @@ public class AmqpWireFormat implements WireFormat { private int version = 1; private long maxFrameSize = DEFAULT_MAX_FRAME_SIZE; private int maxAmqpFrameSize = NO_AMQP_MAX_FRAME_SIZE; + private String anonymousNodeName = "$relay"; @Override public ByteSequence marshal(Object 
command) throws IOException { @@ -126,4 +127,12 @@ public class AmqpWireFormat implements WireFormat { public void setMaxAmqpFrameSize(int maxAmqpFrameSize) { this.maxAmqpFrameSize = maxAmqpFrameSize; } + + public String getAnonymousNodeName() { + return anonymousNodeName; + } + + public void setAnonymousNodeName(String anonymousNodeName) { + this.anonymousNodeName = anonymousNodeName; + } }