Author: shuston
Date: Thu Sep 24 01:24:28 2009
New Revision: 818338

URL: http://svn.apache.org/viewvc?rev=818338&view=rev
Log:
Apply patches from QPID-2110

Modified:
    qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/AmqpBinding.cs
    
qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/AmqpBindingConfigurationElement.cs
    qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelFactory.cs
    qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelHelpers.cs
    qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelListener.cs
    qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/AmqpTransportBindingElement.cs
    qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/AmqpTransportChannel.cs
    qpid/trunk/qpid/wcf/src/Apache/Qpid/Interop/AmqpSession.cpp
    qpid/trunk/qpid/wcf/src/Apache/Qpid/Interop/AmqpSession.h
    qpid/trunk/qpid/wcf/src/Apache/Qpid/Interop/CompletionWaiter.h
    qpid/trunk/qpid/wcf/src/Apache/Qpid/Interop/InputLink.cpp
    qpid/trunk/qpid/wcf/src/Apache/Qpid/Interop/InputLink.h
    qpid/trunk/qpid/wcf/src/Apache/Qpid/Interop/MessageWaiter.h

Modified: qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/AmqpBinding.cs
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/AmqpBinding.cs?rev=818338&r1=818337&r2=818338&view=diff
==============================================================================
--- qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/AmqpBinding.cs (original)
+++ qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/AmqpBinding.cs Thu Sep 24 
01:24:28 2009
@@ -64,6 +64,12 @@
             set { transport.BrokerPort = value; }
         }
 
+        public int PrefetchLimit
+        {
+            get { return transport.PrefetchLimit; }
+            set { transport.PrefetchLimit = value; }
+        }
+
         public bool Shared
         {
             get { return transport.Shared; }

Modified: 
qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/AmqpBindingConfigurationElement.cs
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/AmqpBindingConfigurationElement.cs?rev=818338&r1=818337&r2=818338&view=diff
==============================================================================
--- 
qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/AmqpBindingConfigurationElement.cs 
(original)
+++ 
qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/AmqpBindingConfigurationElement.cs 
Thu Sep 24 01:24:28 2009
@@ -63,6 +63,13 @@
             set { brokerPort = value; }
         }
 
+        [ConfigurationProperty(AmqpConfigurationStrings.PrefetchLimit, 
DefaultValue = false)]
+        public int PrefetchLimit
+        {
+            get { return (int)base[AmqpConfigurationStrings.PrefetchLimit]; }
+            set { base[AmqpConfigurationStrings.PrefetchLimit] = value; }
+        }
+
         [ConfigurationProperty(AmqpConfigurationStrings.Shared, DefaultValue = 
false)]
         public bool Shared
         {
@@ -95,6 +102,8 @@
             get
             {
                 ConfigurationPropertyCollection properties = base.Properties;
+                properties.Add(new 
ConfigurationProperty(AmqpConfigurationStrings.PrefetchLimit,
+                    typeof(int), 0, null, null, 
ConfigurationPropertyOptions.None));
                 properties.Add(new 
ConfigurationProperty(AmqpConfigurationStrings.Shared,
                     typeof(bool), false, null, null, 
ConfigurationPropertyOptions.None));
                 properties.Add(new 
ConfigurationProperty(AmqpConfigurationStrings.TransferMode,
@@ -112,6 +121,7 @@
             this.BrokerPort = amqpBinding.BrokerPort;
             this.TransferMode = amqpBinding.TransferMode;
             this.Shared = amqpBinding.Shared;
+            this.PrefetchLimit = amqpBinding.PrefetchLimit;
 
             AmqpProperties props = amqpBinding.DefaultMessageProperties;
         }
@@ -133,6 +143,7 @@
             amqpBinding.BrokerPort = this.BrokerPort;
             amqpBinding.TransferMode = this.TransferMode;
             amqpBinding.Shared = this.Shared;
+            amqpBinding.PrefetchLimit = this.PrefetchLimit;
         }
 
         protected override void PostDeserialize()

Modified: qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelFactory.cs
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelFactory.cs?rev=818338&r1=818337&r2=818338&view=diff
==============================================================================
--- qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelFactory.cs (original)
+++ qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelFactory.cs Thu Sep 
24 01:24:28 2009
@@ -32,6 +32,7 @@
         AmqpChannelProperties channelProperties;
         long maxBufferPoolSize;
         bool shared;
+       int prefetchLimit;
 
         internal AmqpChannelFactory(AmqpTransportBindingElement 
bindingElement, BindingContext context)
             : base(context.Binding)
@@ -39,6 +40,7 @@
             this.bindingElement = bindingElement;
             this.channelProperties = bindingElement.ChannelProperties.Clone();
             this.shared = bindingElement.Shared;
+            this.prefetchLimit = bindingElement.PrefetchLimit;
             this.maxBufferPoolSize = bindingElement.MaxBufferPoolSize;
             Collection<MessageEncodingBindingElement> 
messageEncoderBindingElements
                 = 
context.BindingParameters.FindAll<MessageEncodingBindingElement>();
@@ -91,7 +93,7 @@
 
         protected override TChannel OnCreateChannel(EndpointAddress 
remoteAddress, Uri via)
         {
-            return (TChannel)(object) new AmqpTransportChannel(this, 
this.channelProperties, remoteAddress, this.messageEncoderFactory.Encoder, 
this.maxBufferPoolSize, this.shared);
+            return (TChannel)(object) new AmqpTransportChannel(this, 
this.channelProperties, remoteAddress, this.messageEncoderFactory.Encoder, 
this.maxBufferPoolSize, this.shared, this.prefetchLimit);
         }
 
     }

Modified: qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelHelpers.cs
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelHelpers.cs?rev=818338&r1=818337&r2=818338&view=diff
==============================================================================
--- qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelHelpers.cs (original)
+++ qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelHelpers.cs Thu Sep 
24 01:24:28 2009
@@ -46,6 +46,7 @@
         public const string TransferMode = "transferMode";
         public const string Brokers = "brokers";
         public const string Shared = "shared";
+        public const string PrefetchLimit = "prefetchLimit";
         public const string MaxBufferPoolSize = "maxBufferPoolSize";
         public const string MaxReceivedMessageSize = "maxReceivedMessageSize";
     }
@@ -55,7 +56,6 @@
         internal const string BrokerHost = "localhost";
         internal const int BrokerPort = 5672;
         internal const TransferMode TransferMode = 
System.ServiceModel.TransferMode.Buffered;
-        internal const byte Priority = 4;
         internal const long MaxBufferPoolSize = 64 * 1024;
         internal const int MaxReceivedMessageSize = 5 * 1024 * 1024; //64 * 
1024;
     }

Modified: qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelListener.cs
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelListener.cs?rev=818338&r1=818337&r2=818338&view=diff
==============================================================================
--- qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelListener.cs 
(original)
+++ qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelListener.cs Thu Sep 
24 01:24:28 2009
@@ -32,6 +32,7 @@
         AmqpTransportBindingElement bindingElement;
         AmqpChannelProperties channelProperties;
         bool shared;
+        int prefetchLimit;
         long maxBufferPoolSize;
         Uri uri;
         AmqpTransportChannel amqpTransportChannel;
@@ -45,6 +46,7 @@
             this.bindingElement = bindingElement;
             this.channelProperties = bindingElement.ChannelProperties.Clone();
             this.shared = bindingElement.Shared;
+            this.prefetchLimit = bindingElement.PrefetchLimit;
 
             this.maxBufferPoolSize = bindingElement.MaxBufferPoolSize;
 
@@ -132,7 +134,7 @@
             {
                 amqpTransportChannel = new AmqpTransportChannel(this, 
this.channelProperties,
                         new EndpointAddress(uri), 
messageEncoderFactory.Encoder,
-                        maxBufferPoolSize, this.shared);
+                        maxBufferPoolSize, this.shared, this.prefetchLimit);
                 return (IInputChannel)(object) amqpTransportChannel;
             }
 

Modified: 
qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/AmqpTransportBindingElement.cs
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/AmqpTransportBindingElement.cs?rev=818338&r1=818337&r2=818338&view=diff
==============================================================================
--- qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/AmqpTransportBindingElement.cs 
(original)
+++ qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/AmqpTransportBindingElement.cs 
Thu Sep 24 01:24:28 2009
@@ -29,6 +29,7 @@
     {
         AmqpChannelProperties channelProperties;
         bool shared;
+       int prefetchLimit;
 
         public AmqpTransportBindingElement()
         {
@@ -41,6 +42,7 @@
         {
             this.channelProperties = other.channelProperties.Clone();
             this.shared = other.shared;
+            this.prefetchLimit = other.prefetchLimit;
         }
 
         public override IChannelFactory<TChannel> 
BuildChannelFactory<TChannel>(BindingContext context)
@@ -98,6 +100,12 @@
             set { this.channelProperties.BrokerPort = value; }
         }
 
+        public int PrefetchLimit
+        {
+            get { return this.prefetchLimit; }
+            set { this.prefetchLimit = value; }
+        }
+
         public bool Shared
         {
             get { return this.shared; }

Modified: qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/AmqpTransportChannel.cs
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/AmqpTransportChannel.cs?rev=818338&r1=818337&r2=818338&view=diff
==============================================================================
--- qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/AmqpTransportChannel.cs 
(original)
+++ qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/AmqpTransportChannel.cs Thu Sep 
24 01:24:28 2009
@@ -50,6 +50,7 @@
         private MessageEncoder encoder;
         private AmqpChannelProperties factoryChannelProperties;
         private bool shared;
+        private int prefetchLimit;
         private string encoderContentType;
 
         // input = 0-10 queue, output = 0-10 exchange
@@ -68,7 +69,7 @@
         private AsyncTimeSpanCaller asyncOpenCaller;
         private AsyncTimeSpanCaller asyncCloseCaller;
 
-        internal AmqpTransportChannel(ChannelManagerBase factory, 
AmqpChannelProperties channelProperties, EndpointAddress remoteAddress, 
MessageEncoder msgEncoder, long maxBufferPoolSize, bool sharedConnection)
+        internal AmqpTransportChannel(ChannelManagerBase factory, 
AmqpChannelProperties channelProperties, EndpointAddress remoteAddress, 
MessageEncoder msgEncoder, long maxBufferPoolSize, bool sharedConnection, int 
prefetchLimit)
             : base(factory)
         {
             this.isInputChannel = (factory is ChannelListenerBase) || (factory 
is AmqpChannelFactory<IInputChannel>);
@@ -80,6 +81,7 @@
 
             this.factoryChannelProperties = channelProperties;
             this.shared = sharedConnection;
+            this.prefetchLimit = prefetchLimit;
             this.remoteAddress = remoteAddress;
 
             // pull out host, port, queue, and connection arguments
@@ -128,6 +130,7 @@
             if (this.isInputChannel)
             {
                 this.inputLink = 
ConnectionManager.GetInputLink(this.factoryChannelProperties, shared, false, 
this.queueName);
+                this.inputLink.PrefetchLimit = this.prefetchLimit;
             }
             else
             {
@@ -287,7 +290,7 @@
 
             return false;
         }
- 
+
         public IAsyncResult BeginTryReceive(TimeSpan timeout, AsyncCallback 
callback, object state)
         {
             return this.inputLink.BeginTryReceive(timeout, callback, state);
@@ -464,7 +467,7 @@
             }
             return amqpMessage;
         }
-     
+
 
         private Message QpidToWcf(AmqpMessage amqpMessage)
         {
@@ -531,7 +534,7 @@
                 {
                     this.bufferManager.ReturnBuffer(managedBuffer);
                 }
-            }  
+            }
 
             return wcfMessage;
         }

Modified: qpid/trunk/qpid/wcf/src/Apache/Qpid/Interop/AmqpSession.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/wcf/src/Apache/Qpid/Interop/AmqpSession.cpp?rev=818338&r1=818337&r2=818338&view=diff
==============================================================================
--- qpid/trunk/qpid/wcf/src/Apache/Qpid/Interop/AmqpSession.cpp (original)
+++ qpid/trunk/qpid/wcf/src/Apache/Qpid/Interop/AmqpSession.cpp Thu Sep 24 
01:24:28 2009
@@ -63,11 +63,12 @@
        sessionp = new qpid::client::AsyncSession;
        *sessionp = qpidConnectionp->newSession();
        subs_mgrp = new SubscriptionManager (*sessionp);
-       success = true;
        waiters = gcnew Collections::Generic::List<CompletionWaiter^>();
+       success = true;
     } finally {
         if (!success) {
            Cleanup();
+           // TODO: include inner exception information
            throw gcnew QpidException ("session creation failure");
        }
     }
@@ -76,12 +77,6 @@
 
 void AmqpSession::Cleanup()
 {
-    if (subscriptionp != NULL) {
-        subscriptionp->cancel();
-       delete subscriptionp;
-       subscriptionp=NULL;
-    }
-
     if (subs_mgrp != NULL) {
        subs_mgrp->stop();
        delete subs_mgrp;
@@ -112,6 +107,7 @@
 
 void AmqpSession::ConnectionClosed()
 {
+    lock l(waiters);
     Cleanup();
 }
 
@@ -283,5 +279,27 @@
     }
 }
 
+bool AmqpSession::MessageStop(Completion &comp, std::string &name)
+{
+    lock l(waiters);
+
+    if (sessionp == NULL)
+       return false;
+
+    comp = sessionp->messageStop(name, true);
+    return true;
+}
+
+void AmqpSession::AcceptAndComplete(SequenceSet& transfers)
+{
+    lock l(waiters);
+
+    if (sessionp == NULL)
+       throw gcnew ObjectDisposedException("Accept");
+
+    sessionp->markCompleted(transfers, false);
+    sessionp->messageAccept(transfers, false);
+}
+
 
 }}} // namespace Apache::Qpid::Cli

Modified: qpid/trunk/qpid/wcf/src/Apache/Qpid/Interop/AmqpSession.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/wcf/src/Apache/Qpid/Interop/AmqpSession.h?rev=818338&r1=818337&r2=818338&view=diff
==============================================================================
--- qpid/trunk/qpid/wcf/src/Apache/Qpid/Interop/AmqpSession.h (original)
+++ qpid/trunk/qpid/wcf/src/Apache/Qpid/Interop/AmqpSession.h Thu Sep 24 
01:24:28 2009
@@ -44,7 +44,6 @@
     AsyncSession* sessionp;
     SessionImpl* sessionImplp;
     SubscriptionManager* subs_mgrp;
-    Subscription* subscriptionp;
     LocalQueue* localQueuep;
     Collections::Generic::List<CompletionWaiter^>^ waiters;
     bool helperRunning;
@@ -69,6 +68,8 @@
     void ConnectionClosed();
     void internalWaitForCompletion(IntPtr Future);
     void removeWaiter(CompletionWaiter^ waiter);
+    bool MessageStop(Completion &comp, std::string &name);
+    void AcceptAndComplete(SequenceSet& transfers);
 
     property AmqpConnection^ Connection {
        AmqpConnection^ get () { return connection; }

Modified: qpid/trunk/qpid/wcf/src/Apache/Qpid/Interop/CompletionWaiter.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/wcf/src/Apache/Qpid/Interop/CompletionWaiter.h?rev=818338&r1=818337&r2=818338&view=diff
==============================================================================
--- qpid/trunk/qpid/wcf/src/Apache/Qpid/Interop/CompletionWaiter.h (original)
+++ qpid/trunk/qpid/wcf/src/Apache/Qpid/Interop/CompletionWaiter.h Thu Sep 24 
01:24:28 2009
@@ -32,7 +32,6 @@
     bool timedOut;
     // has an owner thread
     bool assigned;
-    // can Run (i.e. earlier CompletionWaiters in the queue have completed)
     System::Exception^ runException;
     AsyncCallback^ asyncCallback;
     Threading::Timer ^timer;

Modified: qpid/trunk/qpid/wcf/src/Apache/Qpid/Interop/InputLink.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/wcf/src/Apache/Qpid/Interop/InputLink.cpp?rev=818338&r1=818337&r2=818338&view=diff
==============================================================================
--- qpid/trunk/qpid/wcf/src/Apache/Qpid/Interop/InputLink.cpp (original)
+++ qpid/trunk/qpid/wcf/src/Apache/Qpid/Interop/InputLink.cpp Thu Sep 24 
01:24:28 2009
@@ -59,6 +59,12 @@
 // with proposed changes to the native library to reduce the number of 
servicing
 // threads for large numbers of subscriptions.
 
+// synchronization is accomplished with locks, but also by ensuring that only 
one
+// MessageWaiter (the one at the front of the line) is ever active.  
+// async threads to watch for: Close/finalizer, Timers, SyncCredit and the 
native Dispatch
+// thread (who deposits FrameSets into the local queue and is oblivious to the
+// managed space locks).
+
 
 // The folowing def must match the "Frames" private typedef.
 // TODO, make Qpid-cpp "Frames" definition visible.
@@ -94,6 +100,8 @@
        localQueuep = new LocalQueue;
        SubscriptionSettings settings;
        settings.flowControl = FlowControl::messageCredit(0);
+       settings.completionMode = CompletionMode::MANUAL_COMPLETION;
+
        Subscription sub = qpidSubsMgrp->subscribe(*localQueuep, qname, 
settings);
        subscriptionp = new Subscription (sub); // copy smart pointer for later 
IDisposable cleanup
 
@@ -197,6 +205,7 @@
 IntPtr InputLink::nextLocalMessage()
 {
     lock l(waiters);
+
     if (disposed)
        return (IntPtr) NULL;
 
@@ -279,8 +288,7 @@
        if (haveMessage())
            return true;
 
-       // TODO: prefetch window of messages, compatible with both 0-10 and 1.0.
-       subscriptionp->grantMessageCredit(1);
+       AdjustCredit();
 
        // get a scoped smart ptr ref to guard against async close or hangup
        demuxQueuePtr = *queuePtrp;
@@ -350,7 +358,12 @@
        }
        return;
     }
+
     waiters->RemoveAt(idx);
+    if (waiter->TimedOut) {
+       // may have to give back message if it arrives momentarily
+       AdjustCredit();
+    }
 
     // let the next waiter know it's his turn. 
     if (waiters->Count > 0) {
@@ -411,6 +424,129 @@
 }
 
 
+void InputLink::PrefetchLimit::set(int value)
+{
+    lock l(waiters);
+    prefetchLimit = value;
+
+    int delta = 0;
+
+    // rough rule of thumb to keep the flow, but reduce chatter.
+    // for small messages, the credit request is almost as expensive as the 
transfer itself.
+    // experience may suggest a better heuristic or require a property for the 
low water mark
+    if (prefetchLimit >= 3) {
+       delta = prefetchLimit / 3;
+    }
+    minWorkingCredit = prefetchLimit - delta;
+    AdjustCredit();
+}
+
+
+// call with lock held
+void InputLink::AdjustCredit()
+{
+    if (creditSyncPending || disposed)
+       return;
+
+    // low watermark check
+    if ((prefetchLimit != 0) &&
+       (workingCredit >= minWorkingCredit) &&
+       (workingCredit >= waiters->Count))
+       return;
+
+    // should have enough for all waiters or to satisfy the prefetch window
+    int targetCredit = waiters->Count;
+    if (targetCredit < prefetchLimit)
+       targetCredit = prefetchLimit;
+
+    if (targetCredit > workingCredit) {
+       subscriptionp->grantMessageCredit(targetCredit - workingCredit);
+       workingCredit = targetCredit;
+       return;
+    }
+    if (targetCredit < workingCredit) {
+       if ((targetCredit == 0) && (prefetchLimit == 0)) {
+           creditSyncPending = true;
+           ThreadPool::QueueUserWorkItem(gcnew WaitCallback(this, 
&InputLink::SyncCredit));
+       }
+       // TODO: also shrink credit when prefetchLimit != 0
+    }
+}
+
+void InputLink::SyncCredit(Object ^unused)
+{
+    lock l(waiters);
+
+    try {
+       if (disposed)
+           return;
+
+       Completion comp;
+       if (!amqpSession->MessageStop(comp, subscriptionp->getName())) {
+           // connection closed
+           return;
+       }
+
+       // get a private scoped copy to use outside the lock
+       Subscription s(*subscriptionp);
+
+       l.release();
+       // use setFlowControl to re-enable credit flow on the broker.
+       // previously used comp.wait() here, but setFlowControl is a sync 
operation
+       s.setFlowControl(s.getSettings().flowControl);
+       l.acquire();
+
+       if (disposed)
+           return;
+
+       // let existing waiters use up any
+       // local queue size can only decrease until more credit is issued
+       while (true) {
+           if ((waiters->Count > 0) && ((*queuePtrp)->size() > 0)) {
+               l.release();
+               // a rare use case and not used in performance oriented code.
+               // optimization can wait until the qpid/messaging api is used
+               Thread::Sleep(10);
+               l.acquire();
+               if (disposed)
+                   return;
+           }
+           else {
+               break;
+           }
+       }
+
+       // At this point, the lock is held and we are fully synced with the 
broker
+       // so we have a valid snapshot 
+
+       if ((prefetchLimit == 0) && ((*queuePtrp)->size() > 0)) {
+           // can't be sure application will request a message again any time 
soon
+           QpidFrameSetPtr frameSetp;
+           while (!(*queuePtrp)->empty()) {
+               (*queuePtrp)->pop(frameSetp);
+               SequenceSet frameSetID(frameSetp->getId());
+               subscriptionp->release(frameSetID);
+           }
+
+           // don't touch dequeuedFrameSetpp.  It is spoken for: explicitely 
from a
+           // MessageWaiter about to to get the nextLocalMessage(), or 
implicitely
+           // from a WaitForMessage().
+       }
+       // TODO: if prefetchLimit != 0, release messages from back of the queue 
that exceed targetCredit
+
+       workingCredit = (*queuePtrp)->size();
+       if (dequeuedFrameSetpp != NULL) {
+           workingCredit++;
+       }
+    }
+    finally {
+       creditSyncPending = false;
+    }
+
+    AdjustCredit();
+}
+
+
 AmqpMessage^ InputLink::createAmqpMessage(IntPtr msgp)
 {
     QpidFrameSetPtr* fspp = (QpidFrameSetPtr*) msgp.ToPointer();
@@ -539,7 +675,15 @@
 
        // We have a message we can return to the caller.
        // Tell the broker we got it.
-       subscriptionp->accept(frameSetID);
+       
+       // subscriptionp->accept(frameSetID) is a slow sync operation in the 
native API
+       // so do it within the AsyncSession directly
+       amqpSession->AcceptAndComplete(frameSetID);
+
+       workingCredit--;
+       // check if more messages need to be requested from broker
+       AdjustCredit();
+
        return amqpMessage;
     }
     finally {

Modified: qpid/trunk/qpid/wcf/src/Apache/Qpid/Interop/InputLink.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/wcf/src/Apache/Qpid/Interop/InputLink.h?rev=818338&r1=818337&r2=818338&view=diff
==============================================================================
--- qpid/trunk/qpid/wcf/src/Apache/Qpid/Interop/InputLink.h (original)
+++ qpid/trunk/qpid/wcf/src/Apache/Qpid/Interop/InputLink.h Thu Sep 24 01:24:28 
2009
@@ -47,6 +47,14 @@
     bool finalizing;
     QpidFrameSetPtr* dequeuedFrameSetpp;
     ManualResetEvent^ asyncHelperWaitHandle;
+    // number of messages to buffer locally for future consumption
+    int prefetchLimit;
+    // the number of messages requested and not yet processed
+    int workingCredit;
+    // stopping and restarting the message flow
+    bool creditSyncPending;
+    // working credit low water mark
+    int minWorkingCredit;
 
     void Cleanup();
     void ReleaseNative();
@@ -54,6 +62,8 @@
     void addWaiter(MessageWaiter^ waiter);
     void asyncHelper();
     AmqpMessage^ createAmqpMessage(IntPtr msgp);
+    void AdjustCredit();
+    void SyncCredit(Object ^);
 
 internal:
     InputLink(AmqpSession^ session, System::String^ sourceQueue, 
qpid::client::AsyncSession *qpidSessionp,
@@ -80,6 +90,11 @@
     IAsyncResult^ BeginWaitForMessage(TimeSpan timeout, AsyncCallback^ 
callback, Object^ state);
     bool EndWaitForMessage(IAsyncResult^ result);
 
+    property int PrefetchLimit {
+       int get () { return prefetchLimit; }
+       void set (int value);
+    }
+
 };
 
 }}} // namespace Apache::Qpid::Interop

Modified: qpid/trunk/qpid/wcf/src/Apache/Qpid/Interop/MessageWaiter.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/wcf/src/Apache/Qpid/Interop/MessageWaiter.h?rev=818338&r1=818337&r2=818338&view=diff
==============================================================================
--- qpid/trunk/qpid/wcf/src/Apache/Qpid/Interop/MessageWaiter.h (original)
+++ qpid/trunk/qpid/wcf/src/Apache/Qpid/Interop/MessageWaiter.h Thu Sep 24 
01:24:28 2009
@@ -66,7 +66,6 @@
     void Activate();
     void WaitForCompletion();
 
-//    inline void SetCompletedSynchronously (bool v) { completedSynchronously 
= v; }
 
     property IntPtr Message {
        IntPtr get () { 
@@ -78,7 +77,6 @@
            GC::SuppressFinalize(this);
            return v;
        }
-       // void set (IntPtr v) { message = v; }
     }
 
     property bool Assigned {
@@ -89,11 +87,11 @@
        bool get () { return timedOut; }
     }
 
-
     property System::Exception^ RunException {
        System::Exception^ get() { return runException; }
     }
 
+
  public:
 
     virtual property bool IsCompleted {



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscr...@qpid.apache.org

Reply via email to