Pass the provider to all the state update methods to make it simpler to
fire events from them. Also ensure that all overrides call the base class
implementation.

Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/5f4acf8e
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/5f4acf8e
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/5f4acf8e

Branch: refs/heads/master
Commit: 5f4acf8e93c671f6fc31314d2308de6b696636f1
Parents: bcae5e0
Author: Timothy Bish <tabish...@gmail.com>
Authored: Mon Feb 23 16:52:12 2015 -0500
Committer: Timothy Bish <tabish...@gmail.com>
Committed: Mon Feb 23 16:52:12 2015 -0500

----------------------------------------------------------------------
 .../qpid/jms/provider/amqp/AmqpAbstractResource.java |  6 +++---
 .../qpid/jms/provider/amqp/AmqpConnection.java       |  4 ++--
 .../apache/qpid/jms/provider/amqp/AmqpConsumer.java  |  8 +++++---
 .../qpid/jms/provider/amqp/AmqpFixedProducer.java    |  8 ++++++--
 .../apache/qpid/jms/provider/amqp/AmqpProvider.java  | 14 +++++++-------
 .../qpid/jms/provider/amqp/AmqpQueueBrowser.java     |  8 ++++----
 .../apache/qpid/jms/provider/amqp/AmqpResource.java  | 15 ++++++++++++---
 .../jms/provider/amqp/AmqpTemporaryDestination.java  |  2 +-
 .../jms/provider/amqp/AmqpTransactionContext.java    |  4 +++-
 9 files changed, 43 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/5f4acf8e/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAbstractResource.java
----------------------------------------------------------------------
diff --git 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAbstractResource.java
 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAbstractResource.java
index 075986f..6612786 100644
--- 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAbstractResource.java
+++ 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAbstractResource.java
@@ -246,7 +246,7 @@ public abstract class AmqpAbstractResource<R extends 
JmsResource, E extends Endp
     }
 
     @Override
-    public void processStateChange() throws IOException {
+    public void processStateChange(AmqpProvider provider) throws IOException {
         EndpointState remoteState = getEndpoint().getRemoteState();
 
         if (remoteState == EndpointState.ACTIVE) {
@@ -276,11 +276,11 @@ public abstract class AmqpAbstractResource<R extends 
JmsResource, E extends Endp
     }
 
     @Override
-    public void processDeliveryUpdates() throws IOException {
+    public void processDeliveryUpdates(AmqpProvider provider) throws 
IOException {
     }
 
     @Override
-    public void processFlowUpdates() throws IOException {
+    public void processFlowUpdates(AmqpProvider provider) throws IOException {
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/5f4acf8e/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java
----------------------------------------------------------------------
diff --git 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java
 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java
index 38d40c6..a17f1d4 100644
--- 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java
+++ 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java
@@ -119,7 +119,7 @@ public class AmqpConnection extends 
AmqpAbstractResource<JmsConnectionInfo, Conn
      * side of the Connection.
      */
     @Override
-    public void processStateChange() throws IOException {
+    public void processStateChange(AmqpProvider provider) throws IOException {
 
         if (!connected && isOpen()) {
             connected = true;
@@ -147,7 +147,7 @@ public class AmqpConnection extends 
AmqpAbstractResource<JmsConnectionInfo, Conn
             });
         }
 
-        super.processStateChange();
+        super.processStateChange(provider);
     }
 
     public void processSaslAuthentication() {

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/5f4acf8e/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
----------------------------------------------------------------------
diff --git 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
index a519e42..1f4fbd1 100644
--- 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
+++ 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
@@ -129,7 +129,7 @@ public class AmqpConsumer extends 
AmqpAbstractResource<JmsConsumerInfo, Receiver
     }
 
     @Override
-    public void processFlowUpdates() throws IOException {
+    public void processFlowUpdates(AmqpProvider provider) throws IOException {
         // Check if we tried to stop and have now run out of credit, and
         // processed all locally queued messages
         if (stopRequest != null) {
@@ -140,7 +140,7 @@ public class AmqpConsumer extends 
AmqpAbstractResource<JmsConsumerInfo, Receiver
             }
         }
 
-        super.processFlowUpdates();
+        super.processFlowUpdates(provider);
     }
 
     @Override
@@ -365,7 +365,7 @@ public class AmqpConsumer extends 
AmqpAbstractResource<JmsConsumerInfo, Receiver
     }
 
     @Override
-    public void processDeliveryUpdates() throws IOException {
+    public void processDeliveryUpdates(AmqpProvider provider) throws 
IOException {
         Delivery incoming = null;
         do {
             incoming = getEndpoint().current();
@@ -394,6 +394,8 @@ public class AmqpConsumer extends 
AmqpAbstractResource<JmsConsumerInfo, Receiver
                 }
             }
         } while (incoming != null);
+
+        super.processDeliveryUpdates(provider);
     }
 
     private void processDelivery(Delivery incoming) throws Exception {

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/5f4acf8e/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
----------------------------------------------------------------------
diff --git 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
index aefc222..3fd6035 100644
--- 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
+++ 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
@@ -169,7 +169,7 @@ public class AmqpFixedProducer extends AmqpProducer {
     }
 
     @Override
-    public void processFlowUpdates() throws IOException {
+    public void processFlowUpdates(AmqpProvider provider) throws IOException {
         if (!pendingSends.isEmpty() && getEndpoint().getCredit() > 0) {
             while (getEndpoint().getCredit() > 0 && !pendingSends.isEmpty()) {
                 LOG.trace("Dispatching previously held send");
@@ -186,10 +186,12 @@ public class AmqpFixedProducer extends AmqpProducer {
         if (pendingSends.isEmpty() && isAwaitingClose()) {
             super.close(closeRequest);
         }
+
+        super.processFlowUpdates(provider);
     }
 
     @Override
-    public void processDeliveryUpdates() {
+    public void processDeliveryUpdates(AmqpProvider provider) throws 
IOException {
         List<Delivery> toRemove = new ArrayList<Delivery>();
 
         for (Delivery delivery : pending) {
@@ -234,6 +236,8 @@ public class AmqpFixedProducer extends AmqpProducer {
         }
 
         pending.removeAll(toRemove);
+
+        super.processDeliveryUpdates(provider);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/5f4acf8e/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
----------------------------------------------------------------------
diff --git 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
index d826a4e..3c4cae6 100644
--- 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
+++ 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
@@ -718,34 +718,34 @@ public class AmqpProvider implements Provider, 
TransportListener {
                     case CONNECTION_REMOTE_CLOSE:
                     case CONNECTION_REMOTE_OPEN:
                         AmqpConnection connection = (AmqpConnection) 
protonEvent.getConnection().getContext();
-                        connection.processStateChange();
+                        connection.processStateChange(this);
                         break;
                     case SESSION_REMOTE_CLOSE:
                     case SESSION_REMOTE_OPEN:
                         AmqpSession session = (AmqpSession) 
protonEvent.getSession().getContext();
-                        session.processStateChange();
+                        session.processStateChange(this);
                         break;
                     case LINK_REMOTE_CLOSE:
                         LOG.info("Link closed: {}", 
protonEvent.getLink().getContext());
                         AmqpResource cloedResource = (AmqpResource) 
protonEvent.getLink().getContext();
-                        cloedResource.processStateChange();
+                        cloedResource.processStateChange(this);
                         break;
                     case LINK_REMOTE_DETACH:
                         LOG.info("Link detach: {}", 
protonEvent.getLink().getContext());
                         AmqpResource detachedResource = (AmqpResource) 
protonEvent.getLink().getContext();
-                        detachedResource.processStateChange();
+                        detachedResource.processStateChange(this);
                         break;
                     case LINK_REMOTE_OPEN:
                         AmqpResource resource = (AmqpResource) 
protonEvent.getLink().getContext();
-                        resource.processStateChange();
+                        resource.processStateChange(this);
                         break;
                     case LINK_FLOW:
                         amqpResource = (AmqpResource) 
protonEvent.getLink().getContext();
-                        amqpResource.processFlowUpdates();
+                        amqpResource.processFlowUpdates(this);
                         break;
                     case DELIVERY:
                         amqpResource = (AmqpResource) 
protonEvent.getLink().getContext();
-                        amqpResource.processDeliveryUpdates();
+                        amqpResource.processDeliveryUpdates(this);
                         break;
                     default:
                         break;

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/5f4acf8e/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpQueueBrowser.java
----------------------------------------------------------------------
diff --git 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpQueueBrowser.java
 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpQueueBrowser.java
index df52c7e..245f938 100644
--- 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpQueueBrowser.java
+++ 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpQueueBrowser.java
@@ -63,7 +63,7 @@ public class AmqpQueueBrowser extends AmqpConsumer {
     }
 
     @Override
-    public void processFlowUpdates() throws IOException {
+    public void processFlowUpdates(AmqpProvider provider) throws IOException {
         if (getEndpoint().getDrain() && getEndpoint().getCredit() == 
getEndpoint().getRemoteCredit()) {
             JmsInboundMessageDispatch browseDone = new 
JmsInboundMessageDispatch(getNextIncomingSequenceNumber());
             browseDone.setConsumerId(getConsumerId());
@@ -76,17 +76,17 @@ public class AmqpQueueBrowser extends AmqpConsumer {
             getEndpoint().setDrain(false);
         }
 
-        super.processFlowUpdates();
+        super.processFlowUpdates(provider);
     }
 
     @Override
-    public void processDeliveryUpdates() throws IOException {
+    public void processDeliveryUpdates(AmqpProvider provider) throws 
IOException {
         if (getEndpoint().getDrain() && getEndpoint().current() != null) {
             LOG.trace("{} incoming delivery, cancel drain.", getConsumerId());
             getEndpoint().setDrain(false);
         }
 
-        super.processDeliveryUpdates();
+        super.processDeliveryUpdates(provider);
 
         if (getEndpoint().getDrain() && getEndpoint().getCredit() == 
getEndpoint().getRemoteCredit()) {
             JmsInboundMessageDispatch browseDone = new 
JmsInboundMessageDispatch(getNextIncomingSequenceNumber());

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/5f4acf8e/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpResource.java
----------------------------------------------------------------------
diff --git 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpResource.java
 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpResource.java
index c4d33fe..f577b21 100644
--- 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpResource.java
+++ 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpResource.java
@@ -104,25 +104,34 @@ public interface AmqpResource {
      * Called when the Proton Engine signals that the state of the given 
resource has
      * changed on the remote side.
      *
+     * @param provider
+     *        the AmqpProvider instance for easier access to fire events.
+     *
      * @throws IOException if an error occurs while processing the update.
      */
-    void processStateChange() throws IOException;
+    void processStateChange(AmqpProvider provider) throws IOException;
 
     /**
      * Called when the Proton Engine signals an Delivery related event has 
been triggered
      * for the given endpoint.
      *
+     * @param provider
+     *        the AmqpProvider instance for easier access to fire events.
+     *
      * @throws IOException if an error occurs while processing the update.
      */
-    void processDeliveryUpdates() throws IOException;
+    void processDeliveryUpdates(AmqpProvider provider) throws IOException;
 
     /**
      * Called when the Proton Engine signals an Flow related event has been 
triggered
      * for the given endpoint.
      *
+     * @param provider
+     *        the AmqpProvider instance for easier access to fire events.
+     *
      * @throws IOException if an error occurs while processing the update.
      */
-    void processFlowUpdates() throws IOException;
+    void processFlowUpdates(AmqpProvider provider) throws IOException;
 
     /**
      * @returns true if the remote end has sent an error

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/5f4acf8e/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTemporaryDestination.java
----------------------------------------------------------------------
diff --git 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTemporaryDestination.java
 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTemporaryDestination.java
index e41b038..ea105e3 100644
--- 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTemporaryDestination.java
+++ 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTemporaryDestination.java
@@ -65,7 +65,7 @@ public class AmqpTemporaryDestination extends 
AmqpAbstractResource<JmsTemporaryD
     }
 
     @Override
-    public void processStateChange() {
+    public void processStateChange(AmqpProvider provider) {
         // TODO - We might want to check on our producer to see if it becomes 
closed
         //        which might indicate that the broker purged the temporary 
destination.
 

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/5f4acf8e/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionContext.java
----------------------------------------------------------------------
diff --git 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionContext.java
 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionContext.java
index 3326bd4..f145f0a 100644
--- 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionContext.java
+++ 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionContext.java
@@ -79,7 +79,7 @@ public class AmqpTransactionContext extends 
AmqpAbstractResource<JmsSessionInfo,
     }
 
     @Override
-    public void processDeliveryUpdates() throws IOException {
+    public void processDeliveryUpdates(AmqpProvider provider) throws 
IOException {
         try {
             if (pendingDelivery != null && pendingDelivery.remotelySettled()) {
                 DeliveryState state = pendingDelivery.getRemoteState();
@@ -121,6 +121,8 @@ public class AmqpTransactionContext extends 
AmqpAbstractResource<JmsSessionInfo,
                     request.onSuccess();
                 }
             }
+
+            super.processDeliveryUpdates(provider);
         } catch (Exception e) {
             throw IOExceptionSupport.create(e);
         }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org

Reply via email to