Repository: qpid-jms Updated Branches: refs/heads/master 95941245a -> e3039f140
Add some initial support for gathering connection capabilities and properties into a config object. Start on real anonymous producer support. Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/e3039f14 Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/e3039f14 Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/e3039f14 Branch: refs/heads/master Commit: e3039f140e10d5f427e25fb4d0de92133667468b Parents: 9594124 Author: Timothy Bish <tabish...@gmail.com> Authored: Mon Oct 13 18:50:11 2014 -0400 Committer: Timothy Bish <tabish...@gmail.com> Committed: Mon Oct 13 18:50:11 2014 -0400 ---------------------------------------------------------------------- .../qpid/jms/provider/amqp/AmqpConnection.java | 16 +++++ .../provider/amqp/AmqpConnectionProperties.java | 69 ++++++++++++++++++++ .../jms/provider/amqp/AmqpFixedProducer.java | 16 +++-- .../qpid/jms/provider/amqp/AmqpSession.java | 5 +- 4 files changed, 97 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e3039f14/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java index e53d5da..97118d7 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java @@ -49,6 +49,7 @@ public class AmqpConnection extends AbstractAmqpResource<JmsConnectionInfo, Conn private boolean connected; private AmqpSaslAuthenticator authenticator; private final AmqpSession connectionSession; + private AmqpConnectionProperties properties; private String 
queuePrefix; private String topicPrefix; @@ -114,6 +115,10 @@ public class AmqpConnection extends AbstractAmqpResource<JmsConnectionInfo, Conn if (!connected && isOpen()) { connected = true; + + this.properties = new AmqpConnectionProperties( + endpoint.getRemoteOfferedCapabilities(), endpoint.getRemoteProperties()); + connectionSession.open(new AsyncResult() { @Override @@ -329,6 +334,17 @@ public class AmqpConnection extends AbstractAmqpResource<JmsConnectionInfo, Conn return this.amqpMessageFactory; } + /** + * Returns the connection properties for an established connection which defines the various + * capabilities and configuration options of the remote connection. Prior to the establishment + * of a connection this method returns null. + * + * @return the properties available for this connection or null if not connected. + */ + public AmqpConnectionProperties getProperties() { + return properties; + } + @Override public String toString() { return "AmqpConnection { " + getConnectionInfo().getConnectionId() + " }"; http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e3039f14/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnectionProperties.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnectionProperties.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnectionProperties.java new file mode 100644 index 0000000..add4d72 --- /dev/null +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnectionProperties.java @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.jms.provider.amqp; + +import java.util.Map; + +import org.apache.qpid.proton.amqp.Symbol; + +/** + * Class used to examine the capabilities and connection properties of the + * remote connection and provide that information to the client code in a + * simpler and more easy to digest manner. + */ +public class AmqpConnectionProperties { + + private static final Symbol ANONYMOUS_RELAY = Symbol.valueOf("x-opt-anonymous-relay"); + + private String anonymousRelayName; + + /** + * Creates a new instance of this class from the given remote capabilities and properties. + * + * @param capabilities + * the capabilities offered by the remote connection. + * @param properties + * the properties offered by the remote connection. 
+ */ + public AmqpConnectionProperties(Symbol[] capabilities, Map<Symbol, Object> properties) { + if (capabilities != null) { + processCapabilities(capabilities); + } + + if (properties != null) { + processProperties(properties); + } + } + + public boolean isAnonymousRelaySupported() { + return anonymousRelayName != null; + } + + public String getAnonymousRelayName() { + return anonymousRelayName; + } + + protected void processCapabilities(Symbol[] capabilities) { + // TODO - Inspect capabilities for configuration options + } + + protected void processProperties(Map<Symbol, Object> properties) { + if (properties.containsKey(ANONYMOUS_RELAY)) { + anonymousRelayName = (String) properties.get(ANONYMOUS_RELAY); + } + } +} http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e3039f14/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java index 6a37da3..3afc492 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java @@ -233,16 +233,22 @@ public class AmqpFixedProducer extends AmqpProducer { @Override protected void doOpen() { - JmsDestination destination = info.getDestination(); + String targetAddress; + + if (info.getDestination() != null) { + JmsDestination destination = info.getDestination(); + targetAddress = session.getQualifiedName(destination); + } else { + targetAddress = connection.getProperties().getAnonymousRelayName(); + } - String destnationName = session.getQualifiedName(destination); String sourceAddress = getProducerId().toString(); Source source = new Source(); source.setAddress(sourceAddress); Target target = new Target(); - 
target.setAddress(destnationName); + target.setAddress(targetAddress); - String senderName = sourceAddress + ":" + destnationName; + String senderName = sourceAddress + ":" + targetAddress; endpoint = session.getProtonSession().sender(senderName); endpoint.setSource(source); endpoint.setTarget(target); @@ -268,7 +274,7 @@ public class AmqpFixedProducer extends AmqpProducer { @Override public boolean isAnonymous() { - return false; + return this.info.getDestination() == null; } @Override http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e3039f14/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java index 44e864a..10519f3 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java @@ -101,10 +101,7 @@ public class AmqpSession extends AbstractAmqpResource<JmsSessionInfo, Session> { public AmqpProducer createProducer(JmsProducerInfo producerInfo) { AmqpProducer producer = null; - // TODO - There seems to be an issue with Proton not allowing links with a Target - // that has no address. Otherwise we could just ensure that messages sent - // to these anonymous targets have their 'to' value set to the destination. 
- if (producerInfo.getDestination() != null) { + if (producerInfo.getDestination() != null || connection.getProperties().isAnonymousRelaySupported()) { LOG.debug("Creating fixed Producer for: {}", producerInfo.getDestination()); producer = new AmqpFixedProducer(this, producerInfo); } else { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org