Author: rhs
Date: Thu Feb  5 06:41:54 2009
New Revision: 741024

URL: http://svn.apache.org/viewvc?rev=741024&view=rev
Log:
QPID-1646: implemented handlers for producer flow control signals

Modified:
    
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java
    
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java
    
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java

Modified: 
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java?rev=741024&r1=741023&r2=741024&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java
 (original)
+++ 
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java
 Thu Feb  5 06:41:54 2009
@@ -22,8 +22,9 @@
 
 import static org.apache.qpid.transport.Connection.State.OPEN;
 
-import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 import javax.security.sasl.Sasl;
 import javax.security.sasl.SaslClient;
@@ -63,11 +64,14 @@
 
     @Override public void connectionStart(Connection conn, ConnectionStart 
start)
     {
+        Map clientProperties = new HashMap();
+        clientProperties.put("qpid.session_flow", 1);
+
         List<Object> mechanisms = start.getMechanisms();
         if (mechanisms == null || mechanisms.isEmpty())
         {
             conn.connectionStartOk
-                (Collections.EMPTY_MAP, null, null, conn.getLocale());
+                (clientProperties, null, null, conn.getLocale());
             return;
         }
 
@@ -86,7 +90,7 @@
             byte[] response = sc.hasInitialResponse() ?
                 sc.evaluateChallenge(new byte[0]) : null;
             conn.connectionStartOk
-                (Collections.EMPTY_MAP, sc.getMechanismName(), response,
+                (clientProperties, sc.getMechanismName(), response,
                  conn.getLocale());
         }
         catch (SaslException e)
@@ -132,7 +136,7 @@
     {
         throw new UnsupportedOperationException();
     }
-    
+
     /**
      * Currently the spec specified the min and max for heartbeat using secs
      */

Modified: 
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java?rev=741024&r1=741023&r2=741024&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java
 (original)
+++ 
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java
 Thu Feb  5 06:41:54 2009
@@ -33,6 +33,8 @@
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
 
 import static org.apache.qpid.transport.Option.*;
 import static org.apache.qpid.transport.Session.State.*;
@@ -99,6 +101,10 @@
 
     private State state = NEW;
 
+    // transfer flow control
+    private volatile boolean flowControl = false;
+    private Semaphore credit = new Semaphore(0);
+
     Session(Connection connection, Binary name, long expiry)
     {
         this.connection = connection;
@@ -166,6 +172,41 @@
         }
     }
 
+    /**
+     * Switches transfer flow control on or off for this session.
+     *
+     * @param value the new state of the flow control flag; credit is only
+     *              waited for while the flag is {@code true}
+     */
+    void setFlowControl(final boolean value)
+    {
+        this.flowControl = value;
+    }
+
+    /**
+     * Makes additional message credit available to senders.
+     *
+     * @param value the number of permits to release on the credit semaphore
+     */
+    void addCredit(final int value)
+    {
+        this.credit.release(value);
+    }
+
+    /**
+     * Discards all message credit that is currently available by draining
+     * every permit from the credit semaphore.
+     */
+    void drainCredit()
+    {
+        this.credit.drainPermits();
+    }
+
+    /**
+     * Takes one permit of message credit.
+     *
+     * Returns immediately when flow control is disabled. Otherwise waits up
+     * to {@code timeout} milliseconds for a permit to become available.
+     *
+     * @throws SessionException if no credit becomes available within the
+     *         timeout, or if the calling thread is interrupted while waiting
+     */
+    void acquireCredit()
+    {
+        if (!flowControl)
+        {
+            return;
+        }
+
+        try
+        {
+            if (!credit.tryAcquire(timeout, TimeUnit.MILLISECONDS))
+            {
+                throw new SessionException
+                    ("timed out waiting for message credit");
+            }
+        }
+        catch (InterruptedException e)
+        {
+            // The interrupt status is cleared when InterruptedException is
+            // thrown; restore it so code further up the stack can still
+            // observe that the thread was interrupted.
+            Thread.currentThread().interrupt();
+            throw new SessionException
+                ("interrupted while waiting for credit", null, e);
+        }
+    }
+
     private void initReceiver()
     {
         synchronized (processedLock)
@@ -428,6 +469,11 @@
     {
         if (m.getEncodedTrack() == Frame.L4)
         {
+            if (m.hasPayload())
+            {
+                acquireCredit();
+            }
+
             synchronized (commands)
             {
                 if (state != OPEN && state != CLOSED)

Modified: 
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java?rev=741024&r1=741023&r2=741024&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java
 (original)
+++ 
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java
 Thu Feb  5 06:41:54 2009
@@ -146,4 +146,42 @@
         ssn.getSessionListener().message(ssn, xfr);
     }
 
+    @Override public void messageSetFlowMode(Session ssn, MessageSetFlowMode 
sfm)
+    {
+        if ("".equals(sfm.getDestination()) &&
+            MessageFlowMode.CREDIT.equals(sfm.getFlowMode()))
+        {
+            ssn.setFlowControl(true);
+        }
+        else
+        {
+            super.messageSetFlowMode(ssn, sfm);
+        }
+    }
+
+    /**
+     * Handles a flow command. Message-unit credit on the empty destination
+     * is added to the session's credit; every other combination is passed
+     * to the superclass handler.
+     *
+     * @param ssn  the session the command arrived on
+     * @param flow the flow command carrying the amount of credit granted
+     */
+    @Override
+    public void messageFlow(Session ssn, MessageFlow flow)
+    {
+        if ("".equals(flow.getDestination()) &&
+            MessageCreditUnit.MESSAGE.equals(flow.getUnit()))
+        {
+            // A plain (int) cast turns values above Integer.MAX_VALUE into
+            // negative numbers, which Semaphore.release rejects with an
+            // IllegalArgumentException. Clamp to the largest permit count
+            // instead.
+            // NOTE(review): assumes getValue() is wider than int (the
+            // original cast implies this) -- confirm against MessageFlow.
+            long granted = flow.getValue();
+            ssn.addCredit((int) Math.min(granted, (long) Integer.MAX_VALUE));
+        }
+        else
+        {
+            super.messageFlow(ssn, flow);
+        }
+    }
+
+    @Override public void messageStop(Session ssn, MessageStop stop)
+    {
+        if ("".equals(stop.getDestination()))
+        {
+            ssn.drainCredit();
+        }
+        else
+        {
+            super.messageStop(ssn, stop);
+        }
+    }
+
 }



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscr...@qpid.apache.org

Reply via email to