Author: robbie
Date: Thu Aug 28 16:34:09 2014
New Revision: 1621161
URL: http://svn.apache.org/r1621161
Log:
QPIDJMS-26: update message receiver to use delivery events to provoke initial
processing
Modified:
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpReceiver.java
Modified:
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpReceiver.java
URL:
http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpReceiver.java?rev=1621161&r1=1621160&r2=1621161&view=diff
==============================================================================
--- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpReceiver.java
(original)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpReceiver.java
Thu Aug 28 16:34:09 2014
@@ -20,6 +20,9 @@
*/
package org.apache.qpid.jms.engine;
+import java.util.Deque;
+import java.util.concurrent.ConcurrentLinkedDeque;
+
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.Receiver;
import org.apache.qpid.proton.message.Message;
@@ -29,6 +32,9 @@ public class AmqpReceiver extends AmqpLi
private Receiver _protonReceiver;
private byte[] _buffer = new byte[1024];
+ //TODO: custom queue with timeout based retrieval
+ Deque<AmqpMessage> _messages = new ConcurrentLinkedDeque<AmqpMessage>();
+
public AmqpReceiver(AmqpSession amqpSession, Receiver protonReceiver,
AmqpConnection amqpConnection)
{
super(amqpSession, protonReceiver, amqpConnection);
@@ -45,55 +51,58 @@ public class AmqpReceiver extends AmqpLi
public AmqpMessage receiveNoWait()
{
- synchronized (getAmqpConnection())
+ return _messages.pollFirst();
+ }
+
+ @Override
+ void processDeliveryUpdate(Delivery delivery)
+ {
+ //TODO: this is currently processing all messages for the link, should
really just do the one given.
+ // We can't call recv if the passed delivery is not the 'current', but
cant throw the event away either (could be a before-complete disposition
change?)
+ // Doesnt handle settlement yet.
+
+ Delivery currentDelivery = _protonReceiver.current();
+ if(currentDelivery != null)
{
- Delivery currentDelivery = _protonReceiver.current();
- if(currentDelivery != null)
+ if(currentDelivery.getContext() == null)
{
- if(currentDelivery.getContext() == null)
+ if (currentDelivery.isReadable() &&
!currentDelivery.isPartial())
{
- if (currentDelivery.isReadable() &&
!currentDelivery.isPartial())
+ int total = 0;
+ int start = 0;
+ while (true)
{
-
- int total = 0;
- int start = 0;
- while (true)
+ int read = _protonReceiver.recv(_buffer, start,
_buffer.length - start);
+ total += read;
+ if (read == (_buffer.length - start))
{
- int read = _protonReceiver.recv(_buffer, start,
_buffer.length - start);
- total += read;
- if (read == (_buffer.length - start))
- {
- //may need to expand the buffer (is there a
better test?)
- byte[] old = _buffer;
- _buffer = new byte[_buffer.length*2];
- System.arraycopy(old, 0, _buffer, 0,
old.length);
- start += read;
- }
- else
- {
- break;
- }
+ //may need to expand the buffer (is there a better
test?)
+ byte[] old = _buffer;
+ _buffer = new byte[_buffer.length*2];
+ System.arraycopy(old, 0, _buffer, 0, old.length);
+ start += read;
}
+ else
+ {
+ break;
+ }
+ }
- Message message = Message.Factory.create();
- message.decode(_buffer, 0, total);
+ Message message = Message.Factory.create();
+ message.decode(_buffer, 0, total);
- //TODO: dont create a new factory for every message
- AmqpMessage amqpMessage = new
AmqpMessageFactory().createAmqpMessage(currentDelivery, message,
getAmqpConnection());
- currentDelivery.setContext(amqpMessage);
- _protonReceiver.advance();
- return amqpMessage;
- }
+ //TODO: dont create a new factory for every message
+ AmqpMessage amqpMessage = new
AmqpMessageFactory().createAmqpMessage(currentDelivery, message,
getAmqpConnection());
+ currentDelivery.setContext(amqpMessage);
+ _protonReceiver.advance();
+ _messages.add(amqpMessage);
}
}
+ else
+ {
+ //TODO: previously processed this message. Updated disposition
info?
+ }
}
- return null;
- }
-
- @Override
- void processDeliveryUpdate(Delivery delivery)
- {
- //TODO: implement receiver delivery update event processing
}
@Override
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]