QPIDJMS-207 Adds support for Asynchronous JMS 2.0 sends. Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/3a03663b Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/3a03663b Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/3a03663b
Branch: refs/heads/master Commit: 3a03663b79f98f80cd75f297cd9b70241ac68da3 Parents: 6553cfd Author: Timothy Bish <tabish...@gmail.com> Authored: Mon Sep 12 13:09:56 2016 -0400 Committer: Timothy Bish <tabish...@gmail.com> Committed: Mon Sep 12 13:09:56 2016 -0400 ---------------------------------------------------------------------- .../apache/qpid/jms/JmsCompletionListener.java | 47 ++ .../java/org/apache/qpid/jms/JmsConnection.java | 36 +- .../org/apache/qpid/jms/JmsMessageConsumer.java | 6 +- .../org/apache/qpid/jms/JmsMessageProducer.java | 100 +++- .../java/org/apache/qpid/jms/JmsSession.java | 349 +++++++++-- .../jms/message/JmsInboundMessageDispatch.java | 21 +- .../jms/message/JmsOutboundMessageDispatch.java | 42 +- .../jms/provider/DefaultProviderListener.java | 9 + .../qpid/jms/provider/ProviderListener.java | 22 + .../qpid/jms/provider/ProviderWrapper.java | 10 + .../jms/provider/amqp/AmqpAbstractResource.java | 5 +- .../amqp/AmqpAnonymousFallbackProducer.java | 17 +- .../qpid/jms/provider/amqp/AmqpConsumer.java | 55 +- .../qpid/jms/provider/amqp/AmqpEventSink.java | 6 +- .../jms/provider/amqp/AmqpExceptionBuilder.java | 34 ++ .../jms/provider/amqp/AmqpFixedProducer.java | 257 +++++--- .../qpid/jms/provider/amqp/AmqpProducer.java | 15 +- .../qpid/jms/provider/amqp/AmqpProvider.java | 39 +- .../provider/amqp/AmqpTransactionContext.java | 155 +++-- .../amqp/AmqpTransactionCoordinator.java | 4 +- .../amqp/builders/AmqpResourceBuilder.java | 13 +- .../jms/provider/failover/FailoverProvider.java | 18 + .../integration/ConsumerIntegrationTest.java | 109 ++++ .../PresettledProducerIntegrationTest.java | 231 ++++++++ .../integration/ProducerIntegrationTest.java | 592 +++++++++++++++++++ .../jms/integration/SessionIntegrationTest.java | 114 +++- .../jms/producer/JmsMessageProducerTest.java | 421 ++++++++++++- .../failover/FailoverIntegrationTest.java | 194 ++++++ .../qpid/jms/provider/mock/MockProvider.java | 12 +- .../mock/MockProviderConfiguration.java | 10 + .../qpid/jms/provider/mock/MockRemotePeer.java | 123 ++++ 31 files changed, 2793 insertions(+), 273 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/3a03663b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsCompletionListener.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsCompletionListener.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsCompletionListener.java new file mode 100644 index 0000000..7a6c4d6 --- /dev/null +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsCompletionListener.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.jms; + +import javax.jms.Message; + +/** + * Interface used to implement listeners for asynchronous {@link javax.jms.Message} + * sends which will be notified on successful completion of a send or be notified of an + * error that was encountered while attempting to send a {@link javax.jms.Message}. + */ +public interface JmsCompletionListener { + + /** + * Called when an asynchronous send operation completes successfully. + * + * @param message + * the {@link javax.jms.Message} that was successfully sent. + */ + void onCompletion(Message message); + + /** + * Called when an asynchronous send operation fails to complete, the state + * of the send is unknown at this point. + * + * @param message + * the {@link javax.jms.Message} that was to be sent. + * @param exception + * the {@link java.lang.Exception} that describes the send error. + */ + void onException(Message message, Exception exception); + +} http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/3a03663b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java index 827da11..a04d1b3 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java @@ -156,6 +156,11 @@ public class JmsConnection implements AutoCloseable, Connection, TopicConnection public void close() throws JMSException { boolean interrupted = Thread.interrupted(); + for (JmsSession session : sessions.values()) { + session.checkIsDeliveryThread(); + session.checkIsCompletionThread(); + } + try { if (!closed.get() && !failed.get()) { @@ -1072,6 +1077,26 @@ public class JmsConnection implements AutoCloseable, Connection, TopicConnection } @Override + public void onCompletedMessageSend(JmsOutboundMessageDispatch envelope) { + JmsSession session = sessions.get(envelope.getProducerId().getParentId()); + if (session != null) { + session.onCompletedMessageSend(envelope); + } else { + LOG.debug("No matching Session found for async send result"); + } + } + + @Override + public void onFailedMessageSend(JmsOutboundMessageDispatch envelope, Throwable cause) { + JmsSession session = sessions.get(envelope.getProducerId().getParentId()); + if (session != null) { + session.onFailedMessageSend(envelope, cause); + } else { + LOG.debug("No matching Session found for failed async send result"); + } + } + + @Override public void onConnectionInterrupted(final URI remoteURI) { for (JmsSession session : sessions.values()) { session.onConnectionInterrupted(); @@ -1161,6 +1186,12 @@ public class JmsConnection implements AutoCloseable, Connection, TopicConnection public void onConnectionFailure(final IOException ex) { providerFailed(ex); + // Signal that connection dropped we need to mark transactions as + // failed, deliver failure events to asynchronous send completions etc. + for (JmsSession session : sessions.values()) { + session.onConnectionInterrupted(); + } + onProviderException(ex); for (AsyncResult request : requests.keySet()) { @@ -1304,10 +1335,7 @@ public class JmsConnection implements AutoCloseable, Connection, TopicConnection if (!closed.get() && !closing.get()) { if (this.exceptionListener != null) { - if (!(error instanceof JMSException)) { - error = JmsExceptionSupport.create(error); - } - final JMSException jmsError = (JMSException)error; + final JMSException jmsError = JmsExceptionSupport.create(error); executor.execute(new Runnable() { @Override http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/3a03663b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java index de7ef63..18fd764 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java @@ -438,10 +438,10 @@ public class JmsMessageConsumer implements AutoCloseable, MessageConsumer, JmsMe } if (this.messageListener != null && this.started) { - session.getExecutor().execute(new MessageDeliverTask()); + session.getDispatcherExecutor().execute(new MessageDeliverTask()); } else { if (availableListener != null) { - session.getExecutor().execute(new Runnable() { + session.getDispatcherExecutor().execute(new Runnable() { @Override public void run() { if (session.isStarted()) { @@ -507,7 +507,7 @@ public class JmsMessageConsumer implements AutoCloseable, MessageConsumer, JmsMe void drainMessageQueueToListener() { if (this.messageListener != null && this.started) { - session.getExecutor().execute(new MessageDeliverTask()); + session.getDispatcherExecutor().execute(new MessageDeliverTask()); } } http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/3a03663b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageProducer.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageProducer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageProducer.java index a1bbe38..65812b7 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageProducer.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageProducer.java @@ -158,7 +158,7 @@ public class JmsMessageProducer implements AutoCloseable, MessageProducer { throw new UnsupportedOperationException("Using this method is not supported on producers created without an explicit Destination"); } - sendMessage(producerInfo.getDestination(), message, deliveryMode, priority, timeToLive); + sendMessage(producerInfo.getDestination(), message, deliveryMode, priority, timeToLive, null); } @Override @@ -174,15 +174,107 @@ public class JmsMessageProducer implements AutoCloseable, MessageProducer { throw new UnsupportedOperationException("Using this method is not supported on producers created with an explicit Destination."); } - sendMessage(destination, message, deliveryMode, priority, timeToLive); + sendMessage(destination, message, deliveryMode, priority, timeToLive, null); } - private void sendMessage(Destination destination, Message message, int deliveryMode, int priority, long timeToLive) throws JMSException { + /** + * Sends the message asynchronously and notifies the assigned listener on success or failure + * + * @param message + * the {@link javax.jms.Message} to send. + * @param listener + * the {@link JmsCompletionListener} to notify on send success or failure. + * + * @throws JMSException if an error occurs while attempting to send the Message. + */ + public void send(Message message, JmsCompletionListener listener) throws JMSException { + send(message, this.deliveryMode, this.priority, this.timeToLive, listener); + } + + /** + * Sends the message asynchronously and notifies the assigned listener on success or failure + * + * @param message + * the {@link javax.jms.Message} to send. + * @param deliveryMode + * the delivery mode to assign to the outbound Message. + * @param priority + * the priority to assign to the outbound Message. + * @param timeToLive + * the time to live value to assign to the outbound Message. + * @param listener + * the {@link JmsCompletionListener} to notify on send success or failure. + * + * @throws JMSException if an error occurs while attempting to send the Message. + */ + public void send(Message message, int deliveryMode, int priority, long timeToLive, JmsCompletionListener listener) throws JMSException { + checkClosed(); + + if (anonymousProducer) { + throw new UnsupportedOperationException("Using this method is not supported on producers created without an explicit Destination"); + } + + if (listener == null) { + throw new IllegalArgumentException("JmsCompletetionListener cannot be null"); + } + + sendMessage(producerInfo.getDestination(), message, deliveryMode, priority, timeToLive, listener); + } + + /** + * Sends the message asynchronously and notifies the assigned listener on success or failure + * + * @param destination + * the Destination to send the given Message to. + * @param message + * the {@link javax.jms.Message} to send. + * @param listener + * the {@link JmsCompletionListener} to notify on send success or failure. + * + * @throws JMSException if an error occurs while attempting to send the Message. + */ + public void send(Destination destination, Message message, JmsCompletionListener listener) throws JMSException { + send(destination, message, this.deliveryMode, this.priority, this.timeToLive, listener); + } + + /** + * Sends the message asynchronously and notifies the assigned listener on success or failure + * + * @param destination + * the Destination to send the given Message to. + * @param message + * the {@link javax.jms.Message} to send. + * @param deliveryMode + * the delivery mode to assign to the outbound Message. + * @param priority + * the priority to assign to the outbound Message. + * @param timeToLive + * the time to live value to assign to the outbound Message. + * @param listener + * the {@link JmsCompletionListener} to notify on send success or failure. + * + * @throws JMSException if an error occurs while attempting to send the Message. + */ + public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive, JmsCompletionListener listener) throws JMSException { + checkClosed(); + + if (!anonymousProducer) { + throw new UnsupportedOperationException("Using this method is not supported on producers created with an explicit Destination."); + } + + if (listener == null) { + throw new IllegalArgumentException("JmsCompletetionListener cannot be null"); + } + + sendMessage(destination, message, deliveryMode, priority, timeToLive, listener); + } + + private void sendMessage(Destination destination, Message message, int deliveryMode, int priority, long timeToLive, JmsCompletionListener listener) throws JMSException { if (destination == null) { throw new InvalidDestinationException("Don't understand null destinations"); } - this.session.send(this, destination, message, deliveryMode, priority, timeToLive, disableMessageId, disableTimestamp); + this.session.send(this, destination, message, deliveryMode, priority, timeToLive, disableMessageId, disableTimestamp, listener); } @Override http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/3a03663b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java index 4644267..817f342 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java @@ -18,8 +18,11 @@ package org.apache.qpid.jms; import java.io.Serializable; import java.util.ArrayList; +import java.util.Deque; +import java.util.Iterator; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -58,6 +61,7 @@ import javax.jms.TopicPublisher; import javax.jms.TopicSession; import javax.jms.TopicSubscriber; +import org.apache.qpid.jms.exceptions.JmsExceptionSupport; import org.apache.qpid.jms.message.JmsInboundMessageDispatch; import org.apache.qpid.jms.message.JmsMessage; import org.apache.qpid.jms.message.JmsMessageTransformation; @@ -98,14 +102,18 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe private final LinkedBlockingQueue<JmsInboundMessageDispatch> stoppedMessages = new LinkedBlockingQueue<JmsInboundMessageDispatch>(10000); private final JmsSessionInfo sessionInfo; - private volatile ExecutorService executor; private final ReentrantLock sendLock = new ReentrantLock(); + private volatile ExecutorService deliveryExecutor; + private volatile ExecutorService completionExcecutor; + private Thread deliveryThread; + private Thread completionThread; private final AtomicLong consumerIdGenerator = new AtomicLong(); private final AtomicLong producerIdGenerator = new AtomicLong(); private JmsTransactionContext transactionContext; private boolean sessionRecovered; private final AtomicReference<Exception> failureCause = new AtomicReference<Exception>(); + private final Deque<SendCompletion> asyncSendQueue = new ConcurrentLinkedDeque<SendCompletion>(); protected JmsSession(JmsConnection connection, JmsSessionId sessionId, int acknowledgementMode) throws JMSException { this.connection = connection; @@ -178,6 +186,7 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe @Override public void commit() throws JMSException { checkClosed(); + checkIsCompletionThread(); if (!getTransacted()) { throw new javax.jms.IllegalStateException("Not a transacted session"); @@ -189,6 +198,7 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe @Override public void rollback() throws JMSException { checkClosed(); + checkIsCompletionThread(); if (!getTransacted()) { throw new javax.jms.IllegalStateException("Not a transacted session"); @@ -223,6 +233,9 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe @Override public void close() throws JMSException { + checkIsDeliveryThread(); + checkIsCompletionThread(); + if (!closed.get()) { doClose(); } @@ -272,11 +285,22 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe } transactionContext.shutdown(); + + synchronized (sessionInfo) { + if (completionExcecutor != null) { + completionExcecutor.shutdown(); + completionExcecutor = null; + } + } } } void sessionClosed(Exception cause) { try { + // TODO - This assumes we can't rely on the AmqpProvider to signal all pending + // asynchronous send completions that they are failed when the session + // is remotely closed. + getCompletionExecutor().execute(new FailOrCompleteAsyncCompletionsTask(JmsExceptionSupport.create(cause))); shutdown(cause); } catch (Throwable error) { LOG.trace("Ignoring exception thrown during cleanup of closed session", error); @@ -306,6 +330,11 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe try { if (producer != null) { + // TODO - This assumes we can't rely on the AmqpProvider to signal all pending + // asynchronous send completions that they are failed when the producer + // is remotely closed. + getCompletionExecutor().execute(new FailOrCompleteAsyncCompletionsTask( + producer.getProducerId(), JmsExceptionSupport.create(cause))); producer.shutdown(cause); } } catch (Throwable error) { @@ -624,17 +653,17 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe connection.onException(ex); } - protected void send(JmsMessageProducer producer, Destination dest, Message msg, int deliveryMode, int priority, long timeToLive, boolean disableMsgId, boolean disableTimestamp) throws JMSException { + protected void send(JmsMessageProducer producer, Destination dest, Message msg, int deliveryMode, int priority, long timeToLive, boolean disableMsgId, boolean disableTimestamp, JmsCompletionListener listener) throws JMSException { JmsDestination destination = JmsMessageTransformation.transformDestination(connection, dest); if (destination.isTemporary() && ((JmsTemporaryDestination) destination).isDeleted()) { throw new IllegalStateException("Temporary destination has been deleted"); } - send(producer, destination, msg, deliveryMode, priority, timeToLive, disableMsgId, disableTimestamp); + send(producer, destination, msg, deliveryMode, priority, timeToLive, disableMsgId, disableTimestamp, listener); } - private void send(JmsMessageProducer producer, JmsDestination destination, Message original, int deliveryMode, int priority, long timeToLive, boolean disableMsgId, boolean disableTimestamp) throws JMSException { + private void send(JmsMessageProducer producer, JmsDestination destination, Message original, int deliveryMode, int priority, long timeToLive, boolean disableMsgId, boolean disableTimestamp, JmsCompletionListener listener) throws JMSException { sendLock.lock(); try { original.setJMSDeliveryMode(deliveryMode); @@ -707,14 +736,35 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe envelope.setMessage(copy); envelope.setProducerId(producer.getProducerId()); envelope.setDestination(destination); - envelope.setSendAsync(!sync); + envelope.setSendAsync(listener == null ? !sync : true); envelope.setDispatchId(messageSequence); + envelope.setCompletionRequired(listener != null); if (producer.isAnonymous()) { envelope.setPresettle(getPresettlePolicy().isProducerPresttled(this, destination)); } - transactionContext.send(connection, envelope); + SendCompletion completion = null; + if (envelope.isCompletionRequired()) { + completion = new SendCompletion(envelope, listener); + asyncSendQueue.addLast(completion); + } + + try { + transactionContext.send(connection, envelope); + } catch (JMSException jmsEx) { + // If the synchronous portion of the send fails the completion be + // notified but might depending on the circumstances of the failures, + // remove it from the queue and check if is is already completed. + if (completion != null) { + asyncSendQueue.remove(completion); + if (completion.hasCompleted()) { + return; + } + } + + throw jmsEx; + } } finally { sendLock.unlock(); } @@ -837,9 +887,9 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe } synchronized (sessionInfo) { - if (executor != null) { - executor.shutdown(); - executor = null; + if (deliveryExecutor != null) { + deliveryExecutor.shutdown(); + deliveryExecutor = null; } } } @@ -852,29 +902,62 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe return connection; } - Executor getExecutor() { - ExecutorService exec = executor; - if(exec == null) { + Executor getDispatcherExecutor() { + ExecutorService exec = deliveryExecutor; + if (exec == null) { synchronized (sessionInfo) { - if (executor == null) { - executor = Executors.newSingleThreadExecutor(new ThreadFactory() { - @Override - public Thread newThread(Runnable runner) { - Thread executor = new Thread(runner); - executor.setName("JmsSession ["+ sessionInfo.getId() + "] dispatcher"); - executor.setDaemon(true); - return executor; - } - }); + if (deliveryExecutor == null) { + deliveryExecutor = createExecutor("delivery dispatcher"); } - exec = executor; + exec = deliveryExecutor; + exec.execute(new Runnable() { + + @Override + public void run() { + JmsSession.this.deliveryThread = Thread.currentThread(); + } + }); } } return exec; } + Executor getCompletionExecutor() { + ExecutorService exec = completionExcecutor; + if (exec == null) { + synchronized (sessionInfo) { + if (completionExcecutor == null) { + completionExcecutor = createExecutor("completion dispatcher"); + } + + exec = completionExcecutor; + exec.execute(new Runnable() { + + @Override + public void run() { + JmsSession.this.completionThread = Thread.currentThread(); + } + }); + } + } + + return exec; + } + + private ExecutorService createExecutor(final String threadNameSuffix) { + return Executors.newSingleThreadExecutor(new ThreadFactory() { + @Override + public Thread newThread(Runnable runner) { + Thread executor = new Thread(runner); + executor.setName("JmsSession ["+ sessionInfo.getId() + "] " + threadNameSuffix); + executor.setDaemon(true); + return executor; + } + }); + } + protected JmsSessionInfo getSessionInfo() { return sessionInfo; } @@ -925,6 +1008,18 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe } } + void checkIsDeliveryThread() throws JMSException { + if (Thread.currentThread().equals(deliveryThread)) { + throw new IllegalStateException("Illegal invocation from MessageListener callback"); + } + } + + void checkIsCompletionThread() throws JMSException { + if (Thread.currentThread().equals(completionThread)) { + throw new IllegalStateException("Illegal invocation from CompletionListener callback"); + } + } + public JmsMessageIDPolicy getMessageIDPolicy() { return sessionInfo.getMessageIDPolicy(); } @@ -945,6 +1040,36 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe return sessionInfo.getDeserializationPolicy(); } + /** + * Sets the transaction context of the session. + * + * @param transactionContext + * provides the means to control a JMS transaction. + */ + public void setTransactionContext(JmsTransactionContext transactionContext) { + this.transactionContext = transactionContext; + } + + /** + * Returns the transaction context of the session. + * + * @return transactionContext + * session's transaction context. + */ + public JmsTransactionContext getTransactionContext() { + return transactionContext; + } + + boolean isSessionRecovered() { + return sessionRecovered; + } + + void clearSessionRecovered() { + sessionRecovered = false; + } + + //----- Event handlers ---------------------------------------------------// + @Override public void onInboundMessage(JmsInboundMessageDispatch envelope) { if (started.get()) { @@ -954,10 +1079,22 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe } } + protected void onCompletedMessageSend(final JmsOutboundMessageDispatch envelope) { + getCompletionExecutor().execute(new AsyncCompletionTask(envelope)); + } + + protected void onFailedMessageSend(final JmsOutboundMessageDispatch envelope, final Throwable cause) { + getCompletionExecutor().execute(new AsyncCompletionTask(envelope, cause)); + } + protected void onConnectionInterrupted() { transactionContext.onConnectionInterrupted(); + // TODO - Synthesize a better exception + JMSException failureCause = new JMSException("Send failed due to connection loss"); + getCompletionExecutor().execute(new FailOrCompleteAsyncCompletionsTask(failureCause)); + for (JmsMessageProducer producer : producers.values()) { producer.onConnectionInterrupted(); } @@ -1019,31 +1156,155 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe } } - /** - * Sets the transaction context of the session. - * - * @param transactionContext - * provides the means to control a JMS transaction. - */ - public void setTransactionContext(JmsTransactionContext transactionContext) { - this.transactionContext = transactionContext; - } + //----- Asynchronous Send Helpers ----------------------------------------// - /** - * Returns the transaction context of the session. - * - * @return transactionContext - * session's transaction context. - */ - public JmsTransactionContext getTransactionContext() { - return transactionContext; + private final class FailOrCompleteAsyncCompletionsTask implements Runnable { + + private final JMSException failureCause; + private final JmsProducerId producerId; + + public FailOrCompleteAsyncCompletionsTask(JMSException failureCause) { + this(null, failureCause); + } + + public FailOrCompleteAsyncCompletionsTask(JmsProducerId producerId, JMSException failureCause) { + this.failureCause = failureCause; + this.producerId = producerId; + } + + @Override + public void run() { + // For any completion that is not yet marked as complete we fail it + // otherwise we send the already marked completion state event. + Iterator<SendCompletion> pending = asyncSendQueue.iterator(); + while (pending.hasNext()) { + SendCompletion completion = pending.next(); + + if (!completion.hasCompleted()) { + if (producerId == null || producerId.equals(completion.envelope.getProducerId())) { + completion.markAsFailed(failureCause); + } + } + + try { + completion.signalCompletion(); + } catch (Throwable error) { + LOG.trace("Signaled completion of send: {}", completion.envelope); + } + } + + asyncSendQueue.clear(); + } } - boolean isSessionRecovered() { - return sessionRecovered; + private final class AsyncCompletionTask implements Runnable { + + private final JmsOutboundMessageDispatch envelope; + private final Throwable cause; + + public AsyncCompletionTask(JmsOutboundMessageDispatch envelope) { + this(envelope, null); + } + + public AsyncCompletionTask(JmsOutboundMessageDispatch envelope, Throwable cause) { + this.envelope = envelope; + this.cause = cause; + } + + @Override + public void run() { + try { + SendCompletion completion = asyncSendQueue.peek(); + if (completion.getEnvelope().getDispatchId() == envelope.getDispatchId()) { + try { + completion = asyncSendQueue.remove(); + if (cause == null) { + completion.markAsComplete(); + } else { + completion.markAsFailed(JmsExceptionSupport.create(cause)); + } + completion.signalCompletion(); + } catch (Throwable error) { + LOG.trace("Failed while performing send completion: {}", envelope); + // TODO - What now? + } + + // Signal any trailing completions that have been marked complete + // before this one was that they have now that the one in front has + Iterator<SendCompletion> pending = asyncSendQueue.iterator(); + while (pending.hasNext()) { + completion = pending.next(); + if (completion.hasCompleted()) { + try { + completion.signalCompletion(); + } catch (Throwable error) { + LOG.trace("Failed while performing send completion: {}", envelope); + // TODO - What now? + } finally { + pending.remove(); + } + } else { + break; + } + } + } else { + // Not head so mark as complete and wait for the one in front to send + // the notification of completion. + Iterator<SendCompletion> pending = asyncSendQueue.iterator(); + while (pending.hasNext()) { + completion = pending.next(); + if (completion.getEnvelope().getDispatchId() == envelope.getDispatchId()) { + if (cause == null) { + completion.markAsComplete(); + } else { + completion.markAsFailed(JmsExceptionSupport.create(cause)); + } + } + } + } + } catch (Exception ex) { + LOG.debug("Send completion task encounted unexpected error: {}", ex.getMessage()); + // TODO - What now + } + } } - void clearSessionRecovered() { - sessionRecovered = false; + private final class SendCompletion { + + private final JmsOutboundMessageDispatch envelope; + private final JmsCompletionListener listener; + + private Exception failureCause; + private boolean completed; + + public SendCompletion(JmsOutboundMessageDispatch envelope, JmsCompletionListener listener) { + this.envelope = envelope; + this.listener = listener; + } + + public void markAsComplete() { + completed = true; + } + + public void markAsFailed(Exception cause) { + completed = true; + failureCause = cause; + } + + public boolean hasCompleted() { + return completed; + } + + public void signalCompletion() { + if (failureCause == null) { + listener.onCompletion(envelope.getMessage()); + } else { + listener.onException(envelope.getMessage(), failureCause); + } + } + + public JmsOutboundMessageDispatch getEnvelope() { + return envelope; + } } } http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/3a03663b/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsInboundMessageDispatch.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsInboundMessageDispatch.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsInboundMessageDispatch.java index 577ac17..c038519 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsInboundMessageDispatch.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsInboundMessageDispatch.java @@ -30,6 +30,8 @@ public class JmsInboundMessageDispatch extends JmsAbstractResourceId { private JmsMessage message; private boolean enqueueFirst; + private transient String stringView; + public JmsInboundMessageDispatch(long sequence) { this.sequence = sequence; } @@ -74,10 +76,21 @@ public class JmsInboundMessageDispatch extends JmsAbstractResourceId { @Override public String toString() { - return "JmsInboundMessageDispatch {sequence = " + sequence - + ", messageId = " + messageId - + ", consumerId = " + consumerId - + "}"; + if (stringView == null) { + StringBuilder builder = new StringBuilder(); + + builder.append("JmsInboundMessageDispatch { sequence = "); + builder.append(sequence); + builder.append(", messageId = "); + builder.append(messageId); + builder.append(", consumerId = "); + builder.append(consumerId); + builder.append(" }"); + + stringView = builder.toString(); + } + + return stringView; } @Override http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/3a03663b/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsOutboundMessageDispatch.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsOutboundMessageDispatch.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsOutboundMessageDispatch.java index 05b9089..a34768f 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsOutboundMessageDispatch.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsOutboundMessageDispatch.java @@ -29,8 +29,11 @@ public class JmsOutboundMessageDispatch { private JmsDestination destination; private boolean sendAsync; private boolean presettle; + private boolean completionRequired; private long dispatchId; + private transient String stringView; + public JmsDestination getDestination() { return destination; } @@ -39,6 +42,10 @@ public class JmsOutboundMessageDispatch { this.destination = destination; } + public Object getMessageId() { + return message.getFacade().getProviderMessageIdObject(); + } + public JmsMessage getMessage() { return message; } @@ -79,15 +86,34 @@ public class JmsOutboundMessageDispatch { this.presettle = presettle; } - @Override - public String toString() { - StringBuilder value = new StringBuilder(); + public boolean isCompletionRequired() { + return completionRequired; + } - value.append("JmsOutboundMessageDispatch {dispatchId = "); - value.append(getProducerId()); - value.append("-"); - value.append(getDispatchId()); + public void setCompletionRequired(boolean completionRequired) { + this.completionRequired = completionRequired; + } - return value.toString(); + @Override + public String toString() { + if (stringView == null) { + StringBuilder value = new StringBuilder(); + + value.append("JmsOutboundMessageDispatch {dispatchId = "); + value.append(getProducerId()); + value.append("-"); + value.append(getDispatchId()); + value.append(", MessageID = "); + try { + value.append(message.getJMSMessageID()); + } catch (Throwable e) { + value.append("<unknown>"); + } + value.append(" }"); + + stringView = value.toString(); + } + + return stringView; } } http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/3a03663b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/DefaultProviderListener.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/DefaultProviderListener.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/DefaultProviderListener.java index 22e204e..d2eb95c 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/DefaultProviderListener.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/DefaultProviderListener.java @@ -20,6 +20,7 @@ import java.io.IOException; import java.net.URI; import org.apache.qpid.jms.message.JmsInboundMessageDispatch; +import org.apache.qpid.jms.message.JmsOutboundMessageDispatch; import org.apache.qpid.jms.meta.JmsResource; /** @@ -32,6 +33,14 @@ public class DefaultProviderListener implements ProviderListener { } @Override + public void onCompletedMessageSend(JmsOutboundMessageDispatch envelope) { + } + + @Override + public void onFailedMessageSend(JmsOutboundMessageDispatch envelope, Throwable cause) { + } + + @Override public void onConnectionInterrupted(URI remoteURI) { } http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/3a03663b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderListener.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderListener.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderListener.java index 5c758ed..11a5f6b 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderListener.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderListener.java @@ -20,6 +20,7 @@ import java.io.IOException; import java.net.URI; import org.apache.qpid.jms.message.JmsInboundMessageDispatch; +import org.apache.qpid.jms.message.JmsOutboundMessageDispatch; import org.apache.qpid.jms.meta.JmsResource; /** @@ -36,6 +37,27 @@ public interface ProviderListener { void onInboundMessage(JmsInboundMessageDispatch envelope); /** + * Called when an outbound message dispatch that requested a completion callback + * has reached a state where the send can be considered successful based on the QoS + * level associated of the outbound message. + * + * @param envelope + * the original outbound message dispatch that is now complete. + */ + void onCompletedMessageSend(JmsOutboundMessageDispatch envelope); + + /** + * Called when an outbound message dispatch that requested a completion callback + * has reached a state where the send can be considered failed. + * + * @param envelope + * the original outbound message dispatch that should be treated as a failed send. + * @param cause + * the exception that describes the cause of the failed send. + */ + void onFailedMessageSend(JmsOutboundMessageDispatch envelope, Throwable cause); + + /** * Called from a fault tolerant Provider instance to signal that the underlying * connection to the Broker has been lost. The Provider will attempt to reconnect * following this event unless closed. http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/3a03663b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderWrapper.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderWrapper.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderWrapper.java index 8574e04..3a3d383 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderWrapper.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderWrapper.java @@ -154,6 +154,16 @@ public class ProviderWrapper<E extends Provider> implements Provider, ProviderLi } @Override + public void onCompletedMessageSend(JmsOutboundMessageDispatch envelope) { + listener.onCompletedMessageSend(envelope); + } + + @Override + public void onFailedMessageSend(JmsOutboundMessageDispatch envelope, Throwable cause) { + listener.onFailedMessageSend(envelope, cause); + } + + @Override public void onConnectionInterrupted(URI remoteURI) { listener.onConnectionInterrupted(remoteURI); } http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/3a03663b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAbstractResource.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAbstractResource.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAbstractResource.java index a8599ac..634cafd 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAbstractResource.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAbstractResource.java @@ -23,6 +23,7 @@ import org.apache.qpid.jms.JmsOperationTimedOutException; import org.apache.qpid.jms.meta.JmsConnectionInfo; import org.apache.qpid.jms.meta.JmsResource; import org.apache.qpid.jms.provider.AsyncResult; +import org.apache.qpid.proton.engine.Delivery; import org.apache.qpid.proton.engine.Endpoint; import org.apache.qpid.proton.engine.EndpointState; import org.slf4j.Logger; @@ -158,7 +159,7 @@ public abstract class AmqpAbstractResource<R extends JmsResource, E extends Endp closeOrDetachEndpoint(); } - // Process the close now, so that child close operations see the correct state. + // Process the close before moving on to closing down child resources provider.pumpToProtonTransport(); handleResourceClosure(provider, error); @@ -253,7 +254,7 @@ public abstract class AmqpAbstractResource<R extends JmsResource, E extends Endp } @Override - public void processDeliveryUpdates(AmqpProvider provider) throws IOException { + public void processDeliveryUpdates(AmqpProvider provider, Delivery delivery) throws IOException { // Nothing do be done here, subclasses can override as needed. } http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/3a03663b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAnonymousFallbackProducer.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAnonymousFallbackProducer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAnonymousFallbackProducer.java index b44e3b3..3e07b25 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAnonymousFallbackProducer.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAnonymousFallbackProducer.java @@ -69,7 +69,7 @@ public class AmqpAnonymousFallbackProducer extends AmqpProducer { } @Override - public boolean send(JmsOutboundMessageDispatch envelope, AsyncResult request) throws IOException, JMSException { + public void send(JmsOutboundMessageDispatch envelope, AsyncResult request) throws IOException, JMSException { LOG.trace("Started send chain for anonymous producer: {}", getProducerId()); // Force sends marked as asynchronous to be sent synchronous so that the temporary @@ -91,7 +91,8 @@ public class AmqpAnonymousFallbackProducer extends AmqpProducer { // We open a Fixed Producer instance with the target destination. Once it opens // it will trigger the open event which will in turn trigger the send event. - // If caching is disabled the created producer will be closed immediately. + // If caching is disabled the created producer will be closed immediately after + // the entire send chain has finished and the delivery has been acknowledged. AmqpProducerBuilder builder = new AmqpProducerBuilder(session, info); builder.buildResource(new AnonymousSendRequest(request, builder, envelope)); @@ -100,9 +101,9 @@ public class AmqpAnonymousFallbackProducer extends AmqpProducer { producerCache.put(envelope.getDestination(), builder.getResource()); } - return true; + getParent().getProvider().pumpToProtonTransport(request); } else { - return producer.send(envelope, request); + producer.send(envelope, request); } } @@ -135,6 +136,14 @@ public class AmqpAnonymousFallbackProducer extends AmqpProducer { return new JmsProducerId(producerIdKey, -1, producerIdCount++); } + @Override + public void addSendCompletionWatcher(AsyncResult watcher) { + throw new UnsupportedOperationException( + "The fallback producer parent should never have a watcher assigned."); + } + + //----- AsyncResult objects used to complete the sends -------------------// + private abstract class AnonymousRequest extends WrappedAsyncResult { protected final JmsOutboundMessageDispatch envelope; http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/3a03663b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java index c15fd08..89586e3 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java @@ -385,42 +385,35 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver } @Override - public void processDeliveryUpdates(AmqpProvider provider) throws IOException { - Delivery incoming = null; - do { - incoming = getEndpoint().current(); - if (incoming != null) { - if (incoming.isReadable() && !incoming.isPartial()) { - LOG.trace("{} has incoming Message(s).", this); - try { - if (processDelivery(incoming)) { - // We processed a message, signal completion - // of a message pull request if there is one. - if (pullRequest != null) { - pullRequest.onSuccess(); - pullRequest = null; - } - } - } catch (Exception e) { - throw IOExceptionSupport.create(e); + public void processDeliveryUpdates(AmqpProvider provider, Delivery delivery) throws IOException { + if (delivery.isReadable() && !delivery.isPartial()) { + LOG.trace("{} has incoming Message(s).", this); + try { + if (processDelivery(delivery)) { + // We processed a message, signal completion + // of a message pull request if there is one. + if (pullRequest != null) { + pullRequest.onSuccess(); + pullRequest = null; } - } else { - LOG.trace("{} has a partial incoming Message(s), deferring.", this); - incoming = null; } - } else { - // We have exhausted the locally queued messages on this link. - // Check if we tried to stop and have now run out of credit. - if (getEndpoint().getRemoteCredit() <= 0) { - if (stopRequest != null) { - stopRequest.onSuccess(); - stopRequest = null; - } + } catch (Exception e) { + throw IOExceptionSupport.create(e); + } + } + + if (getEndpoint().current() == null) { + // We have exhausted the locally queued messages on this link. + // Check if we tried to stop and have now run out of credit. + if (getEndpoint().getRemoteCredit() <= 0) { + if (stopRequest != null) { + stopRequest.onSuccess(); + stopRequest = null; } } - } while (incoming != null); + } - super.processDeliveryUpdates(provider); + super.processDeliveryUpdates(provider, delivery); } private boolean processDelivery(Delivery incoming) throws Exception { http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/3a03663b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpEventSink.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpEventSink.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpEventSink.java index b3e7501..2e25ad1 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpEventSink.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpEventSink.java @@ -18,6 +18,8 @@ package org.apache.qpid.jms.provider.amqp; import java.io.IOException; +import org.apache.qpid.proton.engine.Delivery; + /** * Interface used by classes that want to process AMQP events sent from * the transport layer. @@ -60,10 +62,12 @@ public interface AmqpEventSink { * * @param provider * the AmqpProvider instance for easier access to fire events. + * @param delivery + * the Delivery that has an update to its state which needs handled. * * @throws IOException if an error occurs while processing the update. */ - void processDeliveryUpdates(AmqpProvider provider) throws IOException; + void processDeliveryUpdates(AmqpProvider provider, Delivery delivery) throws IOException; /** * Called when the Proton Engine signals an Flow related event has been triggered http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/3a03663b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpExceptionBuilder.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpExceptionBuilder.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpExceptionBuilder.java new file mode 100644 index 0000000..2ecf245 --- /dev/null +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpExceptionBuilder.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.jms.provider.amqp; + +/** + * Used to provide a source for an exception based on some event such as + * operation timed out, etc. + */ +public interface AmqpExceptionBuilder { + + /** + * Creates an exception appropriate to some failure condition + * + * @return a new Exception instance that describes a failure condition. + */ + Exception createException(); + +} http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/3a03663b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java index c354822..9233ce1 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java @@ -18,10 +18,10 @@ package org.apache.qpid.jms.provider.amqp; import java.io.IOException; import java.util.ArrayList; -import java.util.LinkedHashSet; -import java.util.LinkedList; -import java.util.List; -import java.util.Set; +import java.util.Collection; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.Map; import java.util.concurrent.ScheduledFuture; import javax.jms.IllegalStateException; @@ -62,10 +62,12 @@ public class AmqpFixedProducer extends AmqpProducer { private static final byte[] EMPTY_BYTE_ARRAY = new byte[] {}; private final AmqpTransferTagGenerator tagGenerator = new AmqpTransferTagGenerator(true); - private final Set<Delivery> sent = new LinkedHashSet<Delivery>(); - private final LinkedList<InFlightSend> blocked = new LinkedList<InFlightSend>(); + private final Map<Object, InFlightSend> sent = new LinkedHashMap<Object, InFlightSend>(); + private final Map<Object, InFlightSend> blocked = new LinkedHashMap<Object, InFlightSend>(); private byte[] encodeBuffer = new byte[1024 * 8]; + private AsyncResult sendCompletionWatcher; + public AmqpFixedProducer(AmqpSession session, JmsProducerInfo info) { super(session, info); } @@ -86,29 +88,24 @@ public class AmqpFixedProducer extends AmqpProducer { } @Override - public boolean send(JmsOutboundMessageDispatch envelope, AsyncResult request) throws IOException, JMSException { + public void send(JmsOutboundMessageDispatch envelope, AsyncResult request) throws IOException, JMSException { if (isClosed()) { request.onFailure(new IllegalStateException("The MessageProducer is closed")); } if (getEndpoint().getCredit() <= 0) { LOG.trace("Holding Message send until credit is available."); - // Once a message goes into a held mode we no longer can send it async, so - // we clear the async flag if set to avoid the sender never getting notified. - envelope.setSendAsync(false); InFlightSend send = new InFlightSend(envelope, request); if (getSendTimeout() > JmsConnectionInfo.INFINITE) { - send.requestTimeout = getParent().getProvider().scheduleRequestTimeout( - send, getSendTimeout(), new JmsSendTimedOutException("Timed out waiting for credit to send Message", envelope.getMessage())); + send.requestTimeout = getParent().getProvider().scheduleRequestTimeout(send, getSendTimeout(), send); } - blocked.addLast(send); - return false; + blocked.put(envelope.getMessageId(), send); + getParent().getProvider().pumpToProtonTransport(request); } else { doSend(envelope, request); - return true; } } @@ -133,35 +130,54 @@ public class AmqpFixedProducer extends AmqpProducer { delivery = getEndpoint().delivery(tag, 0, tag.length); } - delivery.setContext(request); - if (session.isTransacted()) { - Binary amqpTxId = session.getTransactionContext().getAmqpTransactionId(); + AmqpTransactionContext context = session.getTransactionContext(); + Binary amqpTxId = context.getAmqpTransactionId(); TransactionalState state = new TransactionalState(); state.setTxnId(amqpTxId); delivery.disposition(state); + context.registerTxProducer(this); } AmqpJmsMessageFacade amqpMessageFacade = (AmqpJmsMessageFacade) facade; encodeAndSend(amqpMessageFacade.getAmqpMessage(), delivery); + AmqpProvider provider = getParent().getProvider(); + + InFlightSend send = null; + if (request instanceof InFlightSend) { + send = (InFlightSend) request; + } else { + send = new InFlightSend(envelope, request); + + if (!presettle && getSendTimeout() != JmsConnectionInfo.INFINITE) { + send.requestTimeout = getParent().getProvider().scheduleRequestTimeout(send, getSendTimeout(), send); + } + } + if (presettle) { delivery.settle(); } else { - sent.add(delivery); + sent.put(envelope.getMessageId(), send); getEndpoint().advance(); } - if (envelope.isSendAsync() || presettle) { - request.onSuccess(); - } else if (getSendTimeout() != JmsConnectionInfo.INFINITE) { - InFlightSend send = new InFlightSend(envelope, request); - - send.requestTimeout = getParent().getProvider().scheduleRequestTimeout( - send, getSendTimeout(), new JmsSendTimedOutException("Timed out waiting for disposition of sent Message", envelope.getMessage())); - - // Update context so the incoming disposition can cancel any pending timeout - delivery.setContext(send); + send.setDelivery(delivery); + delivery.setContext(send); + + // Put it on the wire and let it fail if the connection is broken, if it does + // get written then continue on to determine when we should complete it. + if (provider.pumpToProtonTransport(request)) { + // For presettled messages we can just mark as successful and we are done, but + // for any other message we still track it until the remote settles. If the send + // was tagged as asynchronous we must mark the original request as complete but + // we still need to wait for the disposition before we can consider the send as + // having been successful. + if (presettle) { + send.onSuccess(); + } else if (envelope.isSendAsync()) { + send.getOriginalRequest().onSuccess(); + } } } @@ -195,13 +211,16 @@ public class AmqpFixedProducer extends AmqpProducer { @Override public void processFlowUpdates(AmqpProvider provider) throws IOException { if (!blocked.isEmpty() && getEndpoint().getCredit() > 0) { - while (getEndpoint().getCredit() > 0 && !blocked.isEmpty()) { + Iterator<InFlightSend> blockedSends = blocked.values().iterator(); + while (getEndpoint().getCredit() > 0 && blockedSends.hasNext()) { LOG.trace("Dispatching previously held send"); - InFlightSend held = blocked.pop(); + InFlightSend held = blockedSends.next(); try { - doSend(held.envelope, held); + doSend(held.getEnvelope(), held); } catch (JMSException e) { throw IOExceptionSupport.create(e); + } finally { + blockedSends.remove(); } } } @@ -211,25 +230,18 @@ public class AmqpFixedProducer extends AmqpProducer { getEndpoint().drained(); } - // Once the pending sends queue is drained we can propagate the close request. - if (blocked.isEmpty() && isAwaitingClose() && !isClosed()) { - super.close(closeRequest); - } - super.processFlowUpdates(provider); } @Override - public void processDeliveryUpdates(AmqpProvider provider) throws IOException { - List<Delivery> toRemove = new ArrayList<Delivery>(); - - for (Delivery delivery : sent) { - DeliveryState state = delivery.getRemoteState(); - if (state == null) { - continue; - } + public void processDeliveryUpdates(AmqpProvider provider, Delivery delivery) throws IOException { + DeliveryState state = delivery.getRemoteState(); + if (state != null) { + InFlightSend send = (InFlightSend) delivery.getContext(); + Exception deliveryError = null; Outcome outcome = null; + if (state instanceof TransactionalState) { LOG.trace("State of delivery is Transactional, retrieving outcome: {}", state); outcome = ((TransactionalState) state).getOutcome(); @@ -240,14 +252,9 @@ public class AmqpFixedProducer extends AmqpProducer { outcome = null; } - AsyncResult request = (AsyncResult) delivery.getContext(); - Exception deliveryError = null; - if (outcome instanceof Accepted) { LOG.trace("Outcome of delivery was accepted: {}", delivery); - if (request != null && !request.isComplete()) { - request.onSuccess(); - } + send.onSuccess(); } else if (outcome instanceof Rejected) { LOG.trace("Outcome of delivery was rejected: {}", delivery); ErrorCondition remoteError = ((Rejected) outcome).getError(); @@ -265,21 +272,11 @@ public class AmqpFixedProducer extends AmqpProducer { } if (deliveryError != null) { - if (request != null && !request.isComplete()) { - request.onFailure(deliveryError); - } else { - connection.getProvider().fireNonFatalProviderException(deliveryError); - } + send.onFailure(deliveryError); } - - tagGenerator.returnTag(delivery.getTag()); - toRemove.add(delivery); - delivery.settle(); } - sent.removeAll(toRemove); - - super.processDeliveryUpdates(provider); + super.processDeliveryUpdates(provider, delivery); } public AmqpSession getSession() { @@ -312,45 +309,45 @@ public class AmqpFixedProducer extends AmqpProducer { error = new JMSException("Producer closed remotely before message transfer result was notified"); } - for (Delivery delivery : sent) { + Collection<InFlightSend> inflightSends = new ArrayList<InFlightSend>(sent.values()); + for (InFlightSend send : inflightSends) { try { - AsyncResult request = (AsyncResult) delivery.getContext(); - - if (request != null && !request.isComplete()) { - request.onFailure(error); - } - - delivery.settle(); - tagGenerator.returnTag(delivery.getTag()); + send.onFailure(error); } catch (Exception e) { - LOG.debug("Caught exception when failing pending send during remote producer closure: {}", delivery, e); + LOG.debug("Caught exception when failing pending send during remote producer closure: {}", send, e); } } - sent.clear(); - - for (InFlightSend blockedSend : blocked) { + Collection<InFlightSend> blockedSends = new ArrayList<InFlightSend>(blocked.values()); + for (InFlightSend send : blockedSends) { try { - AsyncResult request = blockedSend.request; - if (request != null && !request.isComplete()) { - request.onFailure(error); - } + send.onFailure(error); } catch (Exception e) { - LOG.debug("Caught exception when failing blocked send during remote producer closure: {}", blockedSend, e); + LOG.debug("Caught exception when failing blocked send during remote producer closure: {}", send, e); } } + } - blocked.clear(); + @Override + public void addSendCompletionWatcher(AsyncResult watcher) { + // If none pending signal done already. + // TODO - If we don't include blocked sends then update this. + if (blocked.isEmpty() && sent.isEmpty()) { + watcher.onSuccess(); + } else { + this.sendCompletionWatcher = watcher; + } } //----- Class used to manage held sends ----------------------------------// - private class InFlightSend implements AsyncResult { + private class InFlightSend implements AsyncResult, AmqpExceptionBuilder { - public final JmsOutboundMessageDispatch envelope; - public final AsyncResult request; + private final JmsOutboundMessageDispatch envelope; + private final AsyncResult request; - public ScheduledFuture<?> requestTimeout; + private Delivery delivery; + private ScheduledFuture<?> requestTimeout; public InFlightSend(JmsOutboundMessageDispatch envelope, AsyncResult request) { this.envelope = envelope; @@ -359,31 +356,95 @@ public class AmqpFixedProducer extends AmqpProducer { @Override public void onFailure(Throwable cause) { - if (requestTimeout != null) { - requestTimeout.cancel(false); - requestTimeout = null; - } - - blocked.remove(this); + handleSendCompletion(false); - request.onFailure(cause); + if (request.isComplete()) { + // Asynchronous sends can still be awaiting a completion in which case we + // send to them otherwise send to the listener to be reported. + if (envelope.isCompletionRequired()) { + getParent().getProvider().getProviderListener().onFailedMessageSend(envelope, cause); + } else { + getParent().getProvider().fireNonFatalProviderException(IOExceptionSupport.create(cause)); + } + } else { + request.onFailure(cause); + } } @Override public void onSuccess() { - if (requestTimeout != null) { - requestTimeout.cancel(false); - requestTimeout = null; + handleSendCompletion(true); + + if (!request.isComplete()) { + request.onSuccess(); } - blocked.remove(this); + if (envelope.isCompletionRequired()) { + getParent().getProvider().getProviderListener().onCompletedMessageSend(envelope); + } + } - request.onSuccess(); + public void setRequestTimeout(ScheduledFuture<?> requestTimeout) { + if (this.requestTimeout != null) { + this.requestTimeout.cancel(false); + } + + this.requestTimeout = requestTimeout; + } + + public JmsOutboundMessageDispatch getEnvelope() { + return envelope; + } + + public AsyncResult getOriginalRequest() { + return request; + } + + public void setDelivery(Delivery delivery) { + this.delivery = delivery; + } + + public Delivery getDelivery() { + return delivery; } @Override public boolean isComplete() { return request.isComplete(); } + + private void handleSendCompletion(boolean successful) { + setRequestTimeout(null); + + if (getDelivery() != null) { + sent.remove(envelope.getMessageId()); + delivery.settle(); + tagGenerator.returnTag(delivery.getTag()); + } else { + blocked.remove(envelope.getMessageId()); + } + + // TODO - Should this take blocked sends into consideration. + // Signal the watcher that all pending sends have completed if one is registered + // and both the in-flight sends and blocked sends have completed. + if (sendCompletionWatcher != null && sent.isEmpty() && blocked.isEmpty()) { + sendCompletionWatcher.onSuccess(); + } + + // Once the pending sends queue is drained and all in-flight sends have been + // settled we can propagate the close request. + if (isAwaitingClose() && !isClosed() && blocked.isEmpty() && sent.isEmpty()) { + AmqpFixedProducer.super.close(closeRequest); + } + } + + @Override + public Exception createException() { + if (delivery == null) { + return new JmsSendTimedOutException("Timed out waiting for credit to send Message", envelope.getMessage()); + } else { + return new JmsSendTimedOutException("Timed out waiting for disposition of sent Message", envelope.getMessage()); + } + } } } http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/3a03663b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProducer.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProducer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProducer.java index 74fe457..3ace6d2 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProducer.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProducer.java @@ -54,13 +54,10 @@ public abstract class AmqpProducer extends AmqpAbstractResource<JmsProducerInfo, * @param request * The AsyncRequest that will be notified on send success or failure. * - * @return true if the producer had credit to send or false if there was no available - * credit and the send needed to be deferred. - * * @throws IOException if an error occurs sending the message * @throws JMSException if an error occurs while preparing the message for send. */ - public abstract boolean send(JmsOutboundMessageDispatch envelope, AsyncResult request) throws IOException, JMSException; + public abstract void send(JmsOutboundMessageDispatch envelope, AsyncResult request) throws IOException, JMSException; /** * @return true if this is an anonymous producer or false if fixed to a given destination. @@ -92,4 +89,14 @@ public abstract class AmqpProducer extends AmqpAbstractResource<JmsProducerInfo, public void setPresettle(boolean presettle) { this.presettle = presettle; } + + /** + * Allows a completion request to be added to this producer that will be notified + * once all outstanding sends have completed. + * + * @param watcher + * The AsyncResult that will be signaled once this producer has no pending sends. + */ + public abstract void addSendCompletionWatcher(AsyncResult watcher); + } http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/3a03663b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java index 3bfe099..ca8a0e1 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java @@ -59,6 +59,7 @@ import org.apache.qpid.jms.transports.TransportListener; import org.apache.qpid.jms.util.IOExceptionSupport; import org.apache.qpid.proton.engine.Collector; import org.apache.qpid.proton.engine.Connection; +import org.apache.qpid.proton.engine.Delivery; import org.apache.qpid.proton.engine.EndpointState; import org.apache.qpid.proton.engine.Event; import org.apache.qpid.proton.engine.Event.Type; @@ -477,11 +478,7 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP producer = session.getProducer(producerId); } - boolean couldSend = producer.send(envelope, request); - pumpToProtonTransport(request); - if (couldSend && envelope.isSendAsync()) { - request.onSuccess(); - } + producer.send(envelope, request); } catch (Throwable t) { request.onFailure(t); } @@ -816,7 +813,7 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP case DELIVERY: amqpEventSink = (AmqpEventSink) protonEvent.getLink().getContext(); if (amqpEventSink != null) { - amqpEventSink.processDeliveryUpdates(this); + amqpEventSink.processDeliveryUpdates(this, (Delivery) protonEvent.getContext()); } break; default: @@ -1175,6 +1172,36 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP return null; } + /** + * Allows a resource to request that its parent resource schedule a future + * cancellation of a request and return it a {@link Future} instance that + * can be used to cancel the scheduled automatic failure of the request. + * + * @param request + * The request that should be marked as failed based on configuration. + * @param timeout + * The time to wait before marking the request as failed. + * @param builder + * An AmqpExceptionBuilder to use when creating a timed out exception. + * + * @return a {@link ScheduledFuture} that can be stored by the caller. + */ + public ScheduledFuture<?> scheduleRequestTimeout(final AsyncResult request, long timeout, final AmqpExceptionBuilder builder) { + if (timeout != JmsConnectionInfo.INFINITE) { + return serializer.schedule(new Runnable() { + + @Override + public void run() { + request.onFailure(builder.createException()); + pumpToProtonTransport(); + } + + }, timeout, TimeUnit.MILLISECONDS); + } + + return null; + } + //----- Internal implementation ------------------------------------------// private void checkClosed() throws ProviderClosedException { http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/3a03663b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionContext.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionContext.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionContext.java index 577c128..2d08bc7 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionContext.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionContext.java @@ -44,6 +44,7 @@ public class AmqpTransactionContext implements AmqpResourceParent { private final AmqpSession session; private final Set<AmqpConsumer> txConsumers = new LinkedHashSet<AmqpConsumer>(); + private final Set<AmqpProducer> txProducers = new LinkedHashSet<AmqpProducer>(); private JmsTransactionId current; private AmqpTransactionCoordinator coordinator; @@ -85,7 +86,6 @@ public class AmqpTransactionContext implements AmqpResourceParent { } }; - if (coordinator == null || coordinator.isClosed()) { AmqpTransactionCoordinatorBuilder builder = new AmqpTransactionCoordinatorBuilder(this, session.getResourceInfo()); @@ -115,7 +115,7 @@ public class AmqpTransactionContext implements AmqpResourceParent { } } - public void commit(JmsTransactionInfo transactionInfo, final AsyncResult request) throws Exception { + public void commit(final JmsTransactionInfo transactionInfo, final AsyncResult request) throws Exception { if (!transactionInfo.getId().equals(current)) { if (!transactionInfo.isInDoubt() && current == null) { throw new IllegalStateException("Commit called with no active Transaction."); @@ -128,29 +128,10 @@ public class AmqpTransactionContext implements AmqpResourceParent { preCommit(); - LOG.trace("TX Context[{}] committing current TX[[]]", this, current); - coordinator.discharge(current, new AsyncResult() { - - @Override - public void onSuccess() { - current = null; - postCommit(); - request.onSuccess(); - } - - @Override - public void onFailure(Throwable result) { - current = null; - postCommit(); - request.onFailure(result); - } - - @Override - public boolean isComplete() { - return current == null; - } + DischargeCompletion dischargeResult = new DischargeCompletion(request, true); - }, true); + LOG.trace("TX Context[{}] committing current TX[[]]", this, current); + coordinator.discharge(current, dischargeResult, true); } public void rollback(JmsTransactionInfo transactionInfo, final AsyncResult request) throws Exception { @@ -167,29 +148,17 @@ public class AmqpTransactionContext implements AmqpResourceParent { preRollback(); - LOG.trace("TX Context[{}] rolling back current TX[[]]", this, current); - coordinator.discharge(current, new AsyncResult() { - - @Override - public void onSuccess() { - current = null; - postRollback(); - request.onSuccess(); - } - - @Override - public void onFailure(Throwable result) { - current = null; - postRollback(); - request.onFailure(result); - } + DischargeCompletion dischargeResult = new DischargeCompletion(request, false); - @Override - public boolean isComplete() { - return current == null; + if (txProducers.isEmpty()) { + LOG.trace("TX Context[{}] rolling back current TX[[]]", this, current); + coordinator.discharge(current, dischargeResult, false); + } else { + SendCompletion producersSendCompletion = new SendCompletion(transactionInfo, dischargeResult, txProducers.size(), false); + for (AmqpProducer producer : txProducers) { + producer.addSendCompletionWatcher(producersSendCompletion); } - - }, false); + } } //----- Context utility methods ------------------------------------------// @@ -198,6 +167,10 @@ public class AmqpTransactionContext implements AmqpResourceParent { txConsumers.add(consumer); } + public void registerTxProducer(AmqpProducer producer) { + txProducers.add(producer); + } + public AmqpSession getSession() { return session; } @@ -243,6 +216,7 @@ public class AmqpTransactionContext implements AmqpResourceParent { } txConsumers.clear(); + txProducers.clear(); } private void postRollback() { @@ -251,6 +225,7 @@ public class AmqpTransactionContext implements AmqpResourceParent { } txConsumers.clear(); + txProducers.clear(); } //----- Resource Parent implementation -----------------------------------// @@ -273,4 +248,94 @@ public class AmqpTransactionContext implements AmqpResourceParent { public AmqpProvider getProvider() { return session.getProvider(); } + + //----- Completion for Commit or Rollback operation ----------------------// + + private class DischargeCompletion implements AsyncResult { + + private final AsyncResult request; + private final boolean commit; + + public DischargeCompletion(AsyncResult request, boolean commit) { + this.request = request; + this.commit = commit; + } + + @Override + public void onFailure(Throwable result) { + cleanup(); + request.onFailure(result); + } + + @Override + public void onSuccess() { + cleanup(); + request.onSuccess(); + } + + @Override + public boolean isComplete() { + return request.isComplete(); + } + + private void cleanup() { + current = null; + if (commit) { + postCommit(); + } else { + postRollback(); + } + } + } + + //----- Completion result for Producers ----------------------------------// + + @SuppressWarnings("unused") + private class SendCompletion implements AsyncResult { + + private int pendingCompletions; + + private final JmsTransactionInfo info; + private final DischargeCompletion request; + + private boolean commit; + + public SendCompletion(JmsTransactionInfo info, DischargeCompletion request, int pendingCompletions, boolean commit) { + this.info = info; + this.request = request; + this.pendingCompletions = pendingCompletions; + this.commit = commit; + } + + @Override + public void onFailure(Throwable result) { + if (--pendingCompletions == 0) { + try { + LOG.trace("TX Context[{}] rolling back current TX[[]]", this, current); + coordinator.discharge(current, request, false); + } catch (Throwable error) { + request.onFailure(error); + } + } else { + commit = false; + } + } + + @Override + public void onSuccess() { + if (--pendingCompletions == 0) { + try { + LOG.trace("TX Context[{}] {} current TX[[]]", this, commit ? "committing" : "rolling back" ,current); + coordinator.discharge(current, request, commit); + } catch (Throwable error) { + request.onFailure(error); + } + } + } + + @Override + public boolean isComplete() { + return request.isComplete(); + } + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org