Pass the provider to all the state update methods to make it simpler to fire events from them; also ensure all overrides call the base class implementation.
Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/5f4acf8e Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/5f4acf8e Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/5f4acf8e Branch: refs/heads/master Commit: 5f4acf8e93c671f6fc31314d2308de6b696636f1 Parents: bcae5e0 Author: Timothy Bish <tabish...@gmail.com> Authored: Mon Feb 23 16:52:12 2015 -0500 Committer: Timothy Bish <tabish...@gmail.com> Committed: Mon Feb 23 16:52:12 2015 -0500 ---------------------------------------------------------------------- .../qpid/jms/provider/amqp/AmqpAbstractResource.java | 6 +++--- .../qpid/jms/provider/amqp/AmqpConnection.java | 4 ++-- .../apache/qpid/jms/provider/amqp/AmqpConsumer.java | 8 +++++--- .../qpid/jms/provider/amqp/AmqpFixedProducer.java | 8 ++++++-- .../apache/qpid/jms/provider/amqp/AmqpProvider.java | 14 +++++++------- .../qpid/jms/provider/amqp/AmqpQueueBrowser.java | 8 ++++---- .../apache/qpid/jms/provider/amqp/AmqpResource.java | 15 ++++++++++++--- .../jms/provider/amqp/AmqpTemporaryDestination.java | 2 +- .../jms/provider/amqp/AmqpTransactionContext.java | 4 +++- 9 files changed, 43 insertions(+), 26 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/5f4acf8e/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAbstractResource.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAbstractResource.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAbstractResource.java index 075986f..6612786 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAbstractResource.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAbstractResource.java @@ -246,7 +246,7 @@ public abstract class 
AmqpAbstractResource<R extends JmsResource, E extends Endp } @Override - public void processStateChange() throws IOException { + public void processStateChange(AmqpProvider provider) throws IOException { EndpointState remoteState = getEndpoint().getRemoteState(); if (remoteState == EndpointState.ACTIVE) { @@ -276,11 +276,11 @@ public abstract class AmqpAbstractResource<R extends JmsResource, E extends Endp } @Override - public void processDeliveryUpdates() throws IOException { + public void processDeliveryUpdates(AmqpProvider provider) throws IOException { } @Override - public void processFlowUpdates() throws IOException { + public void processFlowUpdates(AmqpProvider provider) throws IOException { } /** http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/5f4acf8e/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java index 38d40c6..a17f1d4 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java @@ -119,7 +119,7 @@ public class AmqpConnection extends AmqpAbstractResource<JmsConnectionInfo, Conn * side of the Connection. 
*/ @Override - public void processStateChange() throws IOException { + public void processStateChange(AmqpProvider provider) throws IOException { if (!connected && isOpen()) { connected = true; @@ -147,7 +147,7 @@ public class AmqpConnection extends AmqpAbstractResource<JmsConnectionInfo, Conn }); } - super.processStateChange(); + super.processStateChange(provider); } public void processSaslAuthentication() { http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/5f4acf8e/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java index a519e42..1f4fbd1 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java @@ -129,7 +129,7 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver } @Override - public void processFlowUpdates() throws IOException { + public void processFlowUpdates(AmqpProvider provider) throws IOException { // Check if we tried to stop and have now run out of credit, and // processed all locally queued messages if (stopRequest != null) { @@ -140,7 +140,7 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver } } - super.processFlowUpdates(); + super.processFlowUpdates(provider); } @Override @@ -365,7 +365,7 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver } @Override - public void processDeliveryUpdates() throws IOException { + public void processDeliveryUpdates(AmqpProvider provider) throws IOException { Delivery incoming = null; do { incoming = getEndpoint().current(); @@ -394,6 +394,8 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver 
} } } while (incoming != null); + + super.processDeliveryUpdates(provider); } private void processDelivery(Delivery incoming) throws Exception { http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/5f4acf8e/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java index aefc222..3fd6035 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java @@ -169,7 +169,7 @@ public class AmqpFixedProducer extends AmqpProducer { } @Override - public void processFlowUpdates() throws IOException { + public void processFlowUpdates(AmqpProvider provider) throws IOException { if (!pendingSends.isEmpty() && getEndpoint().getCredit() > 0) { while (getEndpoint().getCredit() > 0 && !pendingSends.isEmpty()) { LOG.trace("Dispatching previously held send"); @@ -186,10 +186,12 @@ public class AmqpFixedProducer extends AmqpProducer { if (pendingSends.isEmpty() && isAwaitingClose()) { super.close(closeRequest); } + + super.processFlowUpdates(provider); } @Override - public void processDeliveryUpdates() { + public void processDeliveryUpdates(AmqpProvider provider) throws IOException { List<Delivery> toRemove = new ArrayList<Delivery>(); for (Delivery delivery : pending) { @@ -234,6 +236,8 @@ public class AmqpFixedProducer extends AmqpProducer { } pending.removeAll(toRemove); + + super.processDeliveryUpdates(provider); } @Override http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/5f4acf8e/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java ---------------------------------------------------------------------- diff --git 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java index d826a4e..3c4cae6 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java @@ -718,34 +718,34 @@ public class AmqpProvider implements Provider, TransportListener { case CONNECTION_REMOTE_CLOSE: case CONNECTION_REMOTE_OPEN: AmqpConnection connection = (AmqpConnection) protonEvent.getConnection().getContext(); - connection.processStateChange(); + connection.processStateChange(this); break; case SESSION_REMOTE_CLOSE: case SESSION_REMOTE_OPEN: AmqpSession session = (AmqpSession) protonEvent.getSession().getContext(); - session.processStateChange(); + session.processStateChange(this); break; case LINK_REMOTE_CLOSE: LOG.info("Link closed: {}", protonEvent.getLink().getContext()); AmqpResource cloedResource = (AmqpResource) protonEvent.getLink().getContext(); - cloedResource.processStateChange(); + cloedResource.processStateChange(this); break; case LINK_REMOTE_DETACH: LOG.info("Link detach: {}", protonEvent.getLink().getContext()); AmqpResource detachedResource = (AmqpResource) protonEvent.getLink().getContext(); - detachedResource.processStateChange(); + detachedResource.processStateChange(this); break; case LINK_REMOTE_OPEN: AmqpResource resource = (AmqpResource) protonEvent.getLink().getContext(); - resource.processStateChange(); + resource.processStateChange(this); break; case LINK_FLOW: amqpResource = (AmqpResource) protonEvent.getLink().getContext(); - amqpResource.processFlowUpdates(); + amqpResource.processFlowUpdates(this); break; case DELIVERY: amqpResource = (AmqpResource) protonEvent.getLink().getContext(); - amqpResource.processDeliveryUpdates(); + amqpResource.processDeliveryUpdates(this); break; default: break; 
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/5f4acf8e/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpQueueBrowser.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpQueueBrowser.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpQueueBrowser.java index df52c7e..245f938 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpQueueBrowser.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpQueueBrowser.java @@ -63,7 +63,7 @@ public class AmqpQueueBrowser extends AmqpConsumer { } @Override - public void processFlowUpdates() throws IOException { + public void processFlowUpdates(AmqpProvider provider) throws IOException { if (getEndpoint().getDrain() && getEndpoint().getCredit() == getEndpoint().getRemoteCredit()) { JmsInboundMessageDispatch browseDone = new JmsInboundMessageDispatch(getNextIncomingSequenceNumber()); browseDone.setConsumerId(getConsumerId()); @@ -76,17 +76,17 @@ public class AmqpQueueBrowser extends AmqpConsumer { getEndpoint().setDrain(false); } - super.processFlowUpdates(); + super.processFlowUpdates(provider); } @Override - public void processDeliveryUpdates() throws IOException { + public void processDeliveryUpdates(AmqpProvider provider) throws IOException { if (getEndpoint().getDrain() && getEndpoint().current() != null) { LOG.trace("{} incoming delivery, cancel drain.", getConsumerId()); getEndpoint().setDrain(false); } - super.processDeliveryUpdates(); + super.processDeliveryUpdates(provider); if (getEndpoint().getDrain() && getEndpoint().getCredit() == getEndpoint().getRemoteCredit()) { JmsInboundMessageDispatch browseDone = new JmsInboundMessageDispatch(getNextIncomingSequenceNumber()); http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/5f4acf8e/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpResource.java 
---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpResource.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpResource.java index c4d33fe..f577b21 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpResource.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpResource.java @@ -104,25 +104,34 @@ public interface AmqpResource { * Called when the Proton Engine signals that the state of the given resource has * changed on the remote side. * + * @param provider + * the AmqpProvider instance for easier access to fire events. + * * @throws IOException if an error occurs while processing the update. */ - void processStateChange() throws IOException; + void processStateChange(AmqpProvider provider) throws IOException; /** * Called when the Proton Engine signals an Delivery related event has been triggered * for the given endpoint. * + * @param provider + * the AmqpProvider instance for easier access to fire events. + * * @throws IOException if an error occurs while processing the update. */ - void processDeliveryUpdates() throws IOException; + void processDeliveryUpdates(AmqpProvider provider) throws IOException; /** * Called when the Proton Engine signals an Flow related event has been triggered * for the given endpoint. * + * @param provider + * the AmqpProvider instance for easier access to fire events. + * * @throws IOException if an error occurs while processing the update. 
*/ - void processFlowUpdates() throws IOException; + void processFlowUpdates(AmqpProvider provider) throws IOException; /** * @returns true if the remote end has sent an error http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/5f4acf8e/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTemporaryDestination.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTemporaryDestination.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTemporaryDestination.java index e41b038..ea105e3 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTemporaryDestination.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTemporaryDestination.java @@ -65,7 +65,7 @@ public class AmqpTemporaryDestination extends AmqpAbstractResource<JmsTemporaryD } @Override - public void processStateChange() { + public void processStateChange(AmqpProvider provider) { // TODO - We might want to check on our producer to see if it becomes closed // which might indicate that the broker purged the temporary destination. 
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/5f4acf8e/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionContext.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionContext.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionContext.java index 3326bd4..f145f0a 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionContext.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionContext.java @@ -79,7 +79,7 @@ public class AmqpTransactionContext extends AmqpAbstractResource<JmsSessionInfo, } @Override - public void processDeliveryUpdates() throws IOException { + public void processDeliveryUpdates(AmqpProvider provider) throws IOException { try { if (pendingDelivery != null && pendingDelivery.remotelySettled()) { DeliveryState state = pendingDelivery.getRemoteState(); @@ -121,6 +121,8 @@ public class AmqpTransactionContext extends AmqpAbstractResource<JmsSessionInfo, request.onSuccess(); } } + + super.processDeliveryUpdates(provider); } catch (Exception e) { throw IOExceptionSupport.create(e); } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org