Author: rgodfrey
Date: Wed May 30 11:30:59 2012
New Revision: 1344202

URL: http://svn.apache.org/viewvc?rev=1344202&view=rev
Log:
Proton-j : allow for multiple transfer frames

Modified: qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/engine/impl/DeliveryImpl.java qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/engine/impl/TransportImpl.java qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/engine/impl/TransportSession.java Modified: qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/engine/impl/DeliveryImpl.java URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/engine/impl/DeliveryImpl.java?rev=1344202&r1=1344201&r2=1344202&view=diff ============================================================================== --- qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/engine/impl/DeliveryImpl.java (original) +++ qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/engine/impl/DeliveryImpl.java Wed May 30 11:30:59 2012 @@ -52,6 +52,7 @@ public class DeliveryImpl implements Del private TransportDelivery _transportDelivery; private byte[] _data; private int _dataSize; + private boolean _complete; public DeliveryImpl(final byte[] tag, final LinkImpl link, DeliveryImpl previous) { @@ -169,7 +170,7 @@ public class DeliveryImpl implements Del { clearFlag(IO_WORK); } - return consumed; //TODO - Implement + return (_complete && consumed == 0) ? 
TransportImpl.END_OF_STREAM : consumed; //TODO - Implement } private void clearFlag(int ioWork) @@ -236,8 +237,9 @@ public class DeliveryImpl implements Del } } - void clearTransportWork() + DeliveryImpl clearTransportWork() { + DeliveryImpl next = _transportWorkNext; getLink().getConnectionImpl().removeTransportWork(this); if(_transportWorkPrev != null) { @@ -250,11 +252,17 @@ public class DeliveryImpl implements Del } _transportWorkNext = null; _transportWorkPrev = null; + return next; } void addToTransportWorkList() { - getLink().getConnectionImpl().addTransportWork(this); + if(_transportWorkNext == null + && _transportWorkPrev == null + && getLink().getConnectionImpl().getTransportWorkHead() != this) + { + getLink().getConnectionImpl().addTransportWork(this); + } } @@ -281,6 +289,10 @@ public class DeliveryImpl implements Del void setTransportWorkNext(DeliveryImpl transportWorkNext) { + if(transportWorkNext == this) + { + (new Exception("Aaaargh")).printStackTrace(); + } _transportWorkNext = transportWorkNext; } @@ -323,7 +335,8 @@ public class DeliveryImpl implements Del } System.arraycopy(bytes,offset,_data,_dataSize,length); _dataSize+=length; - addToWorkList(); +// addToWorkList(); + addToTransportWorkList(); return length; //TODO - Implement. } @@ -342,7 +355,7 @@ public class DeliveryImpl implements Del return _dataSize; //TODO - Implement. 
} - public void setData(byte[] data) + void setData(byte[] data) { _data = data; } @@ -370,4 +383,9 @@ public class DeliveryImpl implements Del && getLink().current() == this && _dataSize > 0; } + + void setComplete() + { + _complete = true; + } } Modified: qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/engine/impl/TransportImpl.java URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/engine/impl/TransportImpl.java?rev=1344202&r1=1344201&r2=1344202&view=diff ============================================================================== --- qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/engine/impl/TransportImpl.java (original) +++ qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/engine/impl/TransportImpl.java Wed May 30 11:30:59 2012 @@ -205,8 +205,8 @@ public class TransportImpl extends Endpo TransportLink transportLink = sender.getTransportLink(); + UnsignedInteger deliveryId = transportLink.getDeliveryCount(); - transportLink.setDeliveryCount(deliveryId.add(UnsignedInteger.ONE)); TransportDelivery transportDelivery = new TransportDelivery(deliveryId, delivery, transportLink); @@ -214,6 +214,10 @@ public class TransportImpl extends Endpo transfer.setDeliveryId(deliveryId); transfer.setDeliveryTag(new Binary(delivery.getTag())); transfer.setHandle(transportLink.getLocalHandle()); + if(delivery.getLink().current() == delivery) + { + transfer.setMore(true); + } transfer.setMessageFormat(UnsignedInteger.ZERO); // TODO - large frames @@ -228,8 +232,24 @@ public class TransportImpl extends Endpo offset += frameBytes; length -= frameBytes; + // TODO partial consumption + delivery.setData(null); + delivery.setDataLength(0); + + if(delivery.getLink().current() != delivery) + { + transportLink.setDeliveryCount(transportLink.getDeliveryCount().add(UnsignedInteger.ONE)); + } + + delivery = delivery.clearTransportWork(); + + + + } + else + { + delivery = delivery.getTransportWorkNext(); } - delivery = 
delivery.getTransportWorkNext(); } return written; } @@ -685,7 +705,7 @@ public class TransportImpl extends Endpo public void handleTransfer(Transfer transfer, Binary payload, Integer channel) { - System.out.println("CH["+channel+"] : " + transfer); + System.out.println("CH["+channel+"] : " + transfer + " ["+payload+"]"); // TODO - check channel < max_channel TransportSession transportSession = _remoteSessions[channel]; if(transportSession != null) Modified: qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/engine/impl/TransportSession.java URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/engine/impl/TransportSession.java?rev=1344202&r1=1344201&r2=1344202&view=diff ============================================================================== --- qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/engine/impl/TransportSession.java (original) +++ qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/engine/impl/TransportSession.java Wed May 30 11:30:59 2012 @@ -49,6 +49,7 @@ class TransportSession private UnsignedInteger _remoteOutgoingWindow; private UnsignedInteger _remoteNextIncomingId; private UnsignedInteger _remoteNextOutgoingId; + private Map<UnsignedInteger, DeliveryImpl> _unsettledDeliveriesById = new HashMap<UnsignedInteger, DeliveryImpl>(); public TransportSession(SessionImpl session) { @@ -181,11 +182,15 @@ class TransportSession public void handleTransfer(Transfer transfer, Binary payload) { - + DeliveryImpl delivery; if(transfer.getDeliveryId() == null || transfer.getDeliveryId().equals(_currentDeliveryId)) { + TransportReceiver transportReceiver = (TransportReceiver) getLinkFromRemoteHandle(transfer.getHandle()); + ReceiverImpl receiver = transportReceiver.getReceiver(); + Binary deliveryTag = transfer.getDeliveryTag(); + delivery = _unsettledDeliveriesById.get(_currentDeliveryId); + - // TODO - handle large messages } else { @@ -195,24 +200,27 @@ class TransportSession TransportReceiver transportReceiver = 
(TransportReceiver) getLinkFromRemoteHandle(transfer.getHandle()); ReceiverImpl receiver = transportReceiver.getReceiver(); Binary deliveryTag = transfer.getDeliveryTag(); - DeliveryImpl delivery = receiver.delivery(deliveryTag.getArray(), deliveryTag.getArrayOffset(), + delivery = receiver.delivery(deliveryTag.getArray(), deliveryTag.getArrayOffset(), deliveryTag.getLength()); TransportDelivery transportDelivery = new TransportDelivery(_currentDeliveryId, delivery, transportReceiver); delivery.setTransportDelivery(transportDelivery); - // TODO - should this be a copy? - if(payload != null) - { - delivery.setData(payload.getArray()); - delivery.setDataLength(payload.getLength()); - delivery.setDataOffset(payload.getArrayOffset()); - } - delivery.addIOWork(); + _unsettledDeliveriesById.put(_currentDeliveryId, delivery); } + // TODO - should this be a copy? + if(payload != null) + { + delivery.setData(payload.getArray()); + delivery.setDataLength(payload.getLength()); + delivery.setDataOffset(payload.getArrayOffset()); + } + delivery.addIOWork(); + if(!(transfer.getMore() || transfer.getAborted())) { + delivery.setComplete(); _incomingWindowSize = _incomingWindowSize.subtract(UnsignedInteger.ONE); } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org