Repository: qpid-jms Updated Branches: refs/heads/master 6485baab5 -> d9803914a
Remove all Proton-JMS and use just Proton-J constructs. Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/d9803914 Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/d9803914 Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/d9803914 Branch: refs/heads/master Commit: d9803914a0144c50101e84f41c8a0bcf525219bd Parents: 6485baa Author: Timothy Bish <tabish...@gmail.com> Authored: Mon Sep 29 17:57:45 2014 -0400 Committer: Timothy Bish <tabish...@gmail.com> Committed: Mon Sep 29 17:57:45 2014 -0400 ---------------------------------------------------------------------- pom.xml | 2 +- qpid-jms-client/pom.xml | 2 +- .../qpid/jms/provider/amqp/AmqpConsumer.java | 18 +-- .../jms/provider/amqp/AmqpFixedProducer.java | 53 +------ .../qpid/jms/provider/amqp/AmqpJMSVendor.java | 159 ------------------- 5 files changed, 12 insertions(+), 222 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/d9803914/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 8d5a83e..3c7b754 100644 --- a/pom.xml +++ b/pom.xml @@ -115,7 +115,7 @@ </dependency> <dependency> <groupId>org.apache.qpid</groupId> - <artifactId>proton-jms</artifactId> + <artifactId>proton-j</artifactId> <version>${proton-version}</version> </dependency> <dependency> http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/d9803914/qpid-jms-client/pom.xml ---------------------------------------------------------------------- diff --git a/qpid-jms-client/pom.xml b/qpid-jms-client/pom.xml index 3fca330..c3ca09b 100644 --- a/qpid-jms-client/pom.xml +++ b/qpid-jms-client/pom.xml @@ -47,7 +47,7 @@ </dependency> <dependency> <groupId>org.apache.qpid</groupId> - <artifactId>proton-jms</artifactId> + <artifactId>proton-j</artifactId> </dependency> <dependency> <groupId>io.vertx</groupId> http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/d9803914/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java index 9c04a5a..41fef20 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java @@ -32,8 +32,8 @@ import org.apache.qpid.jms.meta.JmsConsumerId; import org.apache.qpid.jms.meta.JmsConsumerInfo; import org.apache.qpid.jms.provider.AsyncResult; import org.apache.qpid.jms.provider.ProviderConstants.ACK_TYPE; -import org.apache.qpid.jms.provider.amqp.message.AmqpJmsMessageBuilder; import org.apache.qpid.jms.provider.ProviderListener; +import org.apache.qpid.jms.provider.amqp.message.AmqpJmsMessageBuilder; import org.apache.qpid.jms.util.IOExceptionSupport; import org.apache.qpid.proton.amqp.Binary; import org.apache.qpid.proton.amqp.DescribedType; @@ -49,7 +49,6 @@ import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode; import org.apache.qpid.proton.amqp.transport.SenderSettleMode; import org.apache.qpid.proton.engine.Delivery; import org.apache.qpid.proton.engine.Receiver; -import org.apache.qpid.proton.jms.EncodedMessage; import org.apache.qpid.proton.message.Message; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -305,13 +304,9 @@ public class AmqpConsumer extends AbstractAmqpResource<JmsConsumerInfo, Receiver } private void processDelivery(Delivery incoming) throws Exception { - EncodedMessage encoded = readIncomingMessage(incoming); JmsMessage message = null; try { - Message protonMessage = Message.Factory.create(); - protonMessage.decode(encoded.getArray(), encoded.getArrayOffset(), encoded.getLength()); - - message = (JmsMessage) AmqpJmsMessageBuilder.createJmsMessage(this, protonMessage); + message = AmqpJmsMessageBuilder.createJmsMessage(this, decodeIncomingMessage(incoming)); } catch (Exception e) { LOG.warn("Error on transform: {}", e.getMessage()); // TODO - We could signal provider error but not sure we want to fail @@ -421,7 +416,8 @@ public class AmqpConsumer extends AbstractAmqpResource<JmsConsumerInfo, Receiver } } - protected EncodedMessage readIncomingMessage(Delivery incoming) { + // TODO - Find more efficient ways to produce the Message instance. + protected Message decodeIncomingMessage(Delivery incoming) { byte[] buffer; int count; @@ -429,11 +425,13 @@ public class AmqpConsumer extends AbstractAmqpResource<JmsConsumerInfo, Receiver streamBuffer.write(incomingBuffer, 0, count); } + // TODO - This will copy, replace with something better later. Pooled Netty Buffer ? buffer = streamBuffer.toByteArray(); try { - //TODO: get rid of EncodedMessage usage - return new EncodedMessage(incoming.getMessageFormat(), buffer, 0, buffer.length); + Message protonMessage = Message.Factory.create(); + protonMessage.decode(buffer, 0, buffer.length); + return protonMessage; } finally { streamBuffer.reset(); } http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/d9803914/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java index 4271fc9..676d386 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java @@ -45,9 +45,6 @@ import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode; import org.apache.qpid.proton.amqp.transport.SenderSettleMode; import org.apache.qpid.proton.engine.Delivery; import org.apache.qpid.proton.engine.Sender; -import org.apache.qpid.proton.jms.AutoOutboundTransformer; -import org.apache.qpid.proton.jms.EncodedMessage; -import org.apache.qpid.proton.jms.OutboundTransformer; import org.apache.qpid.proton.message.Message; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -66,9 +63,6 @@ public class AmqpFixedProducer extends AmqpProducer { private final Set<Delivery> pending = new LinkedHashSet<Delivery>(); private final LinkedList<PendingSend> pendingSends = new LinkedList<PendingSend>(); private byte[] encodeBuffer = new byte[1024 * 8]; - - private final OutboundTransformer outboundTransformer = new AutoOutboundTransformer(AmqpJMSVendor.INSTANCE); - private final String MESSAGE_FORMAT_KEY = outboundTransformer.getPrefixVendor() + "MESSAGE_FORMAT"; private boolean presettle = false; public AmqpFixedProducer(AmqpSession session, JmsProducerInfo info) { @@ -131,12 +125,8 @@ public class AmqpFixedProducer extends AmqpProducer { JmsMessage message = envelope.getMessage(); message.setReadOnlyBody(true); - if (facade instanceof AmqpJmsMessageFacade) { - AmqpJmsMessageFacade amqpMessage = (AmqpJmsMessageFacade) facade; - encodeAndSend(amqpMessage.getAmqpMessage(), delivery); - } else { - encodeAndSendTransformed(envelope.getMessage(), delivery); - } + AmqpJmsMessageFacade amqpMessage = (AmqpJmsMessageFacade) facade; + encodeAndSend(amqpMessage.getAmqpMessage(), delivery); if (presettle) { delivery.settle(); @@ -177,45 +167,6 @@ public class AmqpFixedProducer extends AmqpProducer { } } - private void encodeAndSendTransformed(JmsMessage message, Delivery delivery) throws IOException, JMSException { - - byte[] sendBuffer = null; - int sendBufferSize = 0; - int sendBufferOffset = 0; - - EncodedMessage amqp = null; - - // Needed by the transformer process. - if (!message.getProperties().containsKey(MESSAGE_FORMAT_KEY)) { - message.setProperty(MESSAGE_FORMAT_KEY, 0); - } - - try { - amqp = outboundTransformer.transform(message); - } catch (Exception e) { - throw IOExceptionSupport.create(e); - } - - if (amqp != null && amqp.getLength() > 0) { - sendBuffer = amqp.getArray(); - sendBufferOffset = amqp.getArrayOffset(); - sendBufferSize = amqp.getLength(); - int sentSoFar = 0; - - while (true) { - int sent = endpoint.send(sendBuffer, sendBufferOffset + sentSoFar, sendBufferSize - sentSoFar); - if (sent > 0) { - sentSoFar += sent; - if ((sendBufferSize - sentSoFar) == 0) { - break; - } - } else { - LOG.warn("{} failed to send any data from current Message.", this); - } - } - } - } - @Override public void processFlowUpdates() throws IOException { if (!pendingSends.isEmpty() && endpoint.getCredit() > 0) { http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/d9803914/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpJMSVendor.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpJMSVendor.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpJMSVendor.java deleted file mode 100644 index fbe2b21..0000000 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpJMSVendor.java +++ /dev/null @@ -1,159 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.qpid.jms.provider.amqp; - -import javax.jms.BytesMessage; -import javax.jms.Destination; -import javax.jms.JMSException; -import javax.jms.MapMessage; -import javax.jms.Message; -import javax.jms.ObjectMessage; -import javax.jms.Queue; -import javax.jms.StreamMessage; -import javax.jms.TemporaryQueue; -import javax.jms.TemporaryTopic; -import javax.jms.TextMessage; -import javax.jms.Topic; - -import org.apache.qpid.jms.JmsDestination; -import org.apache.qpid.jms.JmsQueue; -import org.apache.qpid.jms.JmsTemporaryQueue; -import org.apache.qpid.jms.JmsTemporaryTopic; -import org.apache.qpid.jms.JmsTopic; -import org.apache.qpid.jms.message.JmsMessage; -import org.apache.qpid.jms.message.JmsMessageFactory; -import org.apache.qpid.jms.provider.amqp.message.AmqpJmsMessageFactory; -import org.apache.qpid.proton.jms.JMSVendor; - -/** - * Vendor instance used with Proton-J JMS Transformer bits. - * - * TODO - This can go once we have our own message wrappers all working. - */ -public class AmqpJMSVendor extends JMSVendor { - - public static final AmqpJMSVendor INSTANCE = new AmqpJMSVendor(); - - private final JmsMessageFactory factory = new AmqpJmsMessageFactory(); - - private AmqpJMSVendor() { - } - - @Override - public BytesMessage createBytesMessage() { - try { - return factory.createBytesMessage(); - } catch (JMSException e) { - throw new RuntimeException(e); - } - } - - @Override - public StreamMessage createStreamMessage() { - try { - return factory.createStreamMessage(); - } catch (JMSException e) { - throw new RuntimeException(e); - } - } - - @Override - public Message createMessage() { - try { - return factory.createMessage(); - } catch (JMSException e) { - throw new RuntimeException(e); - } - } - - @Override - public TextMessage createTextMessage() { - try { - return factory.createTextMessage(); - } catch (JMSException e) { - throw new RuntimeException(e); - } - } - - @Override - public ObjectMessage createObjectMessage() { - try { - return factory.createObjectMessage(); - } catch (JMSException e) { - throw new RuntimeException(e); - } - } - - @Override - public MapMessage createMapMessage() { - try { - return factory.createMapMessage(); - } catch (JMSException e) { - throw new RuntimeException(e); - } - } - - @Override - public Destination createDestination(String name) { - return super.createDestination(name, Destination.class); - } - - @Override - public <T extends Destination> T createDestination(String name, Class<T> kind) { - if (kind == Queue.class) { - return kind.cast(new JmsQueue(name)); - } - if (kind == Topic.class) { - return kind.cast(new JmsTopic(name)); - } - if (kind == TemporaryQueue.class) { - return kind.cast(new JmsTemporaryQueue(name)); - } - if (kind == TemporaryTopic.class) { - return kind.cast(new JmsTemporaryTopic(name)); - } - - return kind.cast(new JmsQueue(name)); - } - - @Override - public void setJMSXUserID(Message msg, String value) { - ((JmsMessage) msg).getFacade().setUserId(value); - } - - @Override - public void setJMSXGroupID(Message msg, String value) { - ((JmsMessage) msg).getFacade().setGroupId(value); - } - - @Override - public void setJMSXGroupSequence(Message msg, int value) { - ((JmsMessage) msg).getFacade().setGroupSequence(value); - } - - @Override - public void setJMSXDeliveryCount(Message msg, long value) { - // Delivery count tracks total deliveries which is always one higher than - // re-delivery count since first delivery counts to. - ((JmsMessage) msg).getFacade().setRedeliveryCounter((int) (value == 0 ? value : value - 1)); - } - - @Override - public String toAddress(Destination dest) { - return ((JmsDestination) dest).getName(); - } -} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org