Author: rgodfrey
Date: Wed May 30 11:30:59 2012
New Revision: 1344202

URL: http://svn.apache.org/viewvc?rev=1344202&view=rev
Log:
Proton-j : allow for multiple transfer frames

Modified:
    
qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/engine/impl/DeliveryImpl.java
    
qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/engine/impl/TransportImpl.java
    
qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/engine/impl/TransportSession.java

Modified: 
qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/engine/impl/DeliveryImpl.java
URL: 
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/engine/impl/DeliveryImpl.java?rev=1344202&r1=1344201&r2=1344202&view=diff
==============================================================================
--- 
qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/engine/impl/DeliveryImpl.java
 (original)
+++ 
qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/engine/impl/DeliveryImpl.java
 Wed May 30 11:30:59 2012
@@ -52,6 +52,7 @@ public class DeliveryImpl implements Del
     private TransportDelivery _transportDelivery;
     private byte[] _data;
     private int _dataSize;
+    private boolean _complete;
 
     public DeliveryImpl(final byte[] tag, final LinkImpl link, DeliveryImpl 
previous)
     {
@@ -169,7 +170,7 @@ public class DeliveryImpl implements Del
         {
             clearFlag(IO_WORK);
         }
-        return consumed;  //TODO - Implement
+        return (_complete && consumed == 0) ? TransportImpl.END_OF_STREAM : 
consumed;  //TODO - Implement
     }
 
     private void clearFlag(int ioWork)
@@ -236,8 +237,9 @@ public class DeliveryImpl implements Del
         }
     }
 
-    void clearTransportWork()
+    DeliveryImpl clearTransportWork()
     {
+        DeliveryImpl next = _transportWorkNext;
         getLink().getConnectionImpl().removeTransportWork(this);
         if(_transportWorkPrev != null)
         {
@@ -250,11 +252,17 @@ public class DeliveryImpl implements Del
         }
         _transportWorkNext = null;
         _transportWorkPrev = null;
+        return next;
     }
 
     void addToTransportWorkList()
     {
-        getLink().getConnectionImpl().addTransportWork(this);
+        if(_transportWorkNext == null
+           && _transportWorkPrev == null
+           && getLink().getConnectionImpl().getTransportWorkHead() != this)
+        {
+            getLink().getConnectionImpl().addTransportWork(this);
+        }
     }
 
 
@@ -281,6 +289,10 @@ public class DeliveryImpl implements Del
 
     void setTransportWorkNext(DeliveryImpl transportWorkNext)
     {
+        if(transportWorkNext == this)
+        {
+            (new Exception("Aaaargh")).printStackTrace();
+        }
         _transportWorkNext = transportWorkNext;
     }
 
@@ -323,7 +335,8 @@ public class DeliveryImpl implements Del
         }
         System.arraycopy(bytes,offset,_data,_dataSize,length);
         _dataSize+=length;
-        addToWorkList();
+//        addToWorkList();
+        addToTransportWorkList();
         return length;  //TODO - Implement.
     }
 
@@ -342,7 +355,7 @@ public class DeliveryImpl implements Del
         return _dataSize;  //TODO - Implement.
     }
 
-    public void setData(byte[] data)
+    void setData(byte[] data)
     {
         _data = data;
     }
@@ -370,4 +383,9 @@ public class DeliveryImpl implements Del
                 && getLink().current() == this
                 && _dataSize > 0;
     }
+
+    void setComplete()
+    {
+        _complete = true;
+    }
 }

Modified: 
qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/engine/impl/TransportImpl.java
URL: 
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/engine/impl/TransportImpl.java?rev=1344202&r1=1344201&r2=1344202&view=diff
==============================================================================
--- 
qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/engine/impl/TransportImpl.java
 (original)
+++ 
qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/engine/impl/TransportImpl.java
 Wed May 30 11:30:59 2012
@@ -205,8 +205,8 @@ public class TransportImpl extends Endpo
 
 
                 TransportLink transportLink = sender.getTransportLink();
+
                 UnsignedInteger deliveryId = transportLink.getDeliveryCount();
-                
transportLink.setDeliveryCount(deliveryId.add(UnsignedInteger.ONE));
                 TransportDelivery transportDelivery = new 
TransportDelivery(deliveryId, delivery, transportLink);
 
 
@@ -214,6 +214,10 @@ public class TransportImpl extends Endpo
                 transfer.setDeliveryId(deliveryId);
                 transfer.setDeliveryTag(new Binary(delivery.getTag()));
                 transfer.setHandle(transportLink.getLocalHandle());
+                if(delivery.getLink().current() == delivery)
+                {
+                    transfer.setMore(true);
+                }
                 transfer.setMessageFormat(UnsignedInteger.ZERO);
 
                 // TODO - large frames
@@ -228,8 +232,24 @@ public class TransportImpl extends Endpo
                 offset += frameBytes;
                 length -= frameBytes;
 
+                // TODO partial consumption
+                delivery.setData(null);
+                delivery.setDataLength(0);
+
+                if(delivery.getLink().current() != delivery)
+                {
+                    
transportLink.setDeliveryCount(transportLink.getDeliveryCount().add(UnsignedInteger.ONE));
+                }
+
+                delivery = delivery.clearTransportWork();
+
+
+
+            }
+            else
+            {
+                delivery = delivery.getTransportWorkNext();
             }
-            delivery = delivery.getTransportWorkNext();
         }
         return written;
     }
@@ -685,7 +705,7 @@ public class TransportImpl extends Endpo
 
     public void handleTransfer(Transfer transfer, Binary payload, Integer 
channel)
     {
-        System.out.println("CH["+channel+"] : " + transfer);
+        System.out.println("CH["+channel+"] : " + transfer + " ["+payload+"]");
         // TODO - check channel < max_channel
         TransportSession transportSession = _remoteSessions[channel];
         if(transportSession != null)

Modified: 
qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/engine/impl/TransportSession.java
URL: 
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/engine/impl/TransportSession.java?rev=1344202&r1=1344201&r2=1344202&view=diff
==============================================================================
--- 
qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/engine/impl/TransportSession.java
 (original)
+++ 
qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/engine/impl/TransportSession.java
 Wed May 30 11:30:59 2012
@@ -49,6 +49,7 @@ class TransportSession
     private UnsignedInteger _remoteOutgoingWindow;
     private UnsignedInteger _remoteNextIncomingId;
     private UnsignedInteger _remoteNextOutgoingId;
+    private Map<UnsignedInteger, DeliveryImpl> _unsettledDeliveriesById = new 
HashMap<UnsignedInteger, DeliveryImpl>();
 
     public TransportSession(SessionImpl session)
     {
@@ -181,11 +182,15 @@ class TransportSession
 
     public void handleTransfer(Transfer transfer, Binary payload)
     {
-
+        DeliveryImpl delivery;
         if(transfer.getDeliveryId() == null || 
transfer.getDeliveryId().equals(_currentDeliveryId))
         {
+            TransportReceiver transportReceiver = (TransportReceiver) 
getLinkFromRemoteHandle(transfer.getHandle());
+            ReceiverImpl receiver = transportReceiver.getReceiver();
+            Binary deliveryTag = transfer.getDeliveryTag();
+            delivery = _unsettledDeliveriesById.get(_currentDeliveryId);
+
 
-            // TODO - handle large messages
         }
         else
         {
@@ -195,24 +200,27 @@ class TransportSession
             TransportReceiver transportReceiver = (TransportReceiver) 
getLinkFromRemoteHandle(transfer.getHandle());
             ReceiverImpl receiver = transportReceiver.getReceiver();
             Binary deliveryTag = transfer.getDeliveryTag();
-            DeliveryImpl delivery = receiver.delivery(deliveryTag.getArray(), 
deliveryTag.getArrayOffset(),
+            delivery = receiver.delivery(deliveryTag.getArray(), 
deliveryTag.getArrayOffset(),
                                                       deliveryTag.getLength());
             TransportDelivery transportDelivery = new 
TransportDelivery(_currentDeliveryId, delivery, transportReceiver);
             delivery.setTransportDelivery(transportDelivery);
-            // TODO - should this be a copy?
-            if(payload != null)
-            {
-                delivery.setData(payload.getArray());
-                delivery.setDataLength(payload.getLength());
-                delivery.setDataOffset(payload.getArrayOffset());
-            }
-            delivery.addIOWork();
+            _unsettledDeliveriesById.put(_currentDeliveryId, delivery);
 
 
         }
+        // TODO - should this be a copy?
+        if(payload != null)
+        {
+            delivery.setData(payload.getArray());
+            delivery.setDataLength(payload.getLength());
+            delivery.setDataOffset(payload.getArrayOffset());
+        }
+        delivery.addIOWork();
+
 
         if(!(transfer.getMore() || transfer.getAborted()))
         {
+            delivery.setComplete();
             _incomingWindowSize = 
_incomingWindowSize.subtract(UnsignedInteger.ONE);
         }
 



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org

Reply via email to