Author: shuston Date: Thu Sep 24 01:24:28 2009 New Revision: 818338 URL: http://svn.apache.org/viewvc?rev=818338&view=rev Log: Apply patches from QPID-2110
Modified: qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/AmqpBinding.cs qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/AmqpBindingConfigurationElement.cs qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelFactory.cs qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelHelpers.cs qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelListener.cs qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/AmqpTransportBindingElement.cs qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/AmqpTransportChannel.cs qpid/trunk/qpid/wcf/src/Apache/Qpid/Interop/AmqpSession.cpp qpid/trunk/qpid/wcf/src/Apache/Qpid/Interop/AmqpSession.h qpid/trunk/qpid/wcf/src/Apache/Qpid/Interop/CompletionWaiter.h qpid/trunk/qpid/wcf/src/Apache/Qpid/Interop/InputLink.cpp qpid/trunk/qpid/wcf/src/Apache/Qpid/Interop/InputLink.h qpid/trunk/qpid/wcf/src/Apache/Qpid/Interop/MessageWaiter.h Modified: qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/AmqpBinding.cs URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/AmqpBinding.cs?rev=818338&r1=818337&r2=818338&view=diff ============================================================================== --- qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/AmqpBinding.cs (original) +++ qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/AmqpBinding.cs Thu Sep 24 01:24:28 2009 @@ -64,6 +64,12 @@ set { transport.BrokerPort = value; } } + public int PrefetchLimit + { + get { return transport.PrefetchLimit; } + set { transport.PrefetchLimit = value; } + } + public bool Shared { get { return transport.Shared; } Modified: qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/AmqpBindingConfigurationElement.cs URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/AmqpBindingConfigurationElement.cs?rev=818338&r1=818337&r2=818338&view=diff ============================================================================== --- qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/AmqpBindingConfigurationElement.cs (original) +++ 
qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/AmqpBindingConfigurationElement.cs Thu Sep 24 01:24:28 2009 @@ -63,6 +63,13 @@ set { brokerPort = value; } } + [ConfigurationProperty(AmqpConfigurationStrings.PrefetchLimit, DefaultValue = false)] + public int PrefetchLimit + { + get { return (int)base[AmqpConfigurationStrings.PrefetchLimit]; } + set { base[AmqpConfigurationStrings.PrefetchLimit] = value; } + } + [ConfigurationProperty(AmqpConfigurationStrings.Shared, DefaultValue = false)] public bool Shared { @@ -95,6 +102,8 @@ get { ConfigurationPropertyCollection properties = base.Properties; + properties.Add(new ConfigurationProperty(AmqpConfigurationStrings.PrefetchLimit, + typeof(int), 0, null, null, ConfigurationPropertyOptions.None)); properties.Add(new ConfigurationProperty(AmqpConfigurationStrings.Shared, typeof(bool), false, null, null, ConfigurationPropertyOptions.None)); properties.Add(new ConfigurationProperty(AmqpConfigurationStrings.TransferMode, @@ -112,6 +121,7 @@ this.BrokerPort = amqpBinding.BrokerPort; this.TransferMode = amqpBinding.TransferMode; this.Shared = amqpBinding.Shared; + this.PrefetchLimit = amqpBinding.PrefetchLimit; AmqpProperties props = amqpBinding.DefaultMessageProperties; } @@ -133,6 +143,7 @@ amqpBinding.BrokerPort = this.BrokerPort; amqpBinding.TransferMode = this.TransferMode; amqpBinding.Shared = this.Shared; + amqpBinding.PrefetchLimit = this.PrefetchLimit; } protected override void PostDeserialize() Modified: qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelFactory.cs URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelFactory.cs?rev=818338&r1=818337&r2=818338&view=diff ============================================================================== --- qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelFactory.cs (original) +++ qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelFactory.cs Thu Sep 24 01:24:28 2009 @@ -32,6 +32,7 @@ AmqpChannelProperties channelProperties; long 
maxBufferPoolSize; bool shared; + int prefetchLimit; internal AmqpChannelFactory(AmqpTransportBindingElement bindingElement, BindingContext context) : base(context.Binding) @@ -39,6 +40,7 @@ this.bindingElement = bindingElement; this.channelProperties = bindingElement.ChannelProperties.Clone(); this.shared = bindingElement.Shared; + this.prefetchLimit = bindingElement.PrefetchLimit; this.maxBufferPoolSize = bindingElement.MaxBufferPoolSize; Collection<MessageEncodingBindingElement> messageEncoderBindingElements = context.BindingParameters.FindAll<MessageEncodingBindingElement>(); @@ -91,7 +93,7 @@ protected override TChannel OnCreateChannel(EndpointAddress remoteAddress, Uri via) { - return (TChannel)(object) new AmqpTransportChannel(this, this.channelProperties, remoteAddress, this.messageEncoderFactory.Encoder, this.maxBufferPoolSize, this.shared); + return (TChannel)(object) new AmqpTransportChannel(this, this.channelProperties, remoteAddress, this.messageEncoderFactory.Encoder, this.maxBufferPoolSize, this.shared, this.prefetchLimit); } } Modified: qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelHelpers.cs URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelHelpers.cs?rev=818338&r1=818337&r2=818338&view=diff ============================================================================== --- qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelHelpers.cs (original) +++ qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelHelpers.cs Thu Sep 24 01:24:28 2009 @@ -46,6 +46,7 @@ public const string TransferMode = "transferMode"; public const string Brokers = "brokers"; public const string Shared = "shared"; + public const string PrefetchLimit = "prefetchLimit"; public const string MaxBufferPoolSize = "maxBufferPoolSize"; public const string MaxReceivedMessageSize = "maxReceivedMessageSize"; } @@ -55,7 +56,6 @@ internal const string BrokerHost = "localhost"; internal const int BrokerPort = 5672; internal const 
TransferMode TransferMode = System.ServiceModel.TransferMode.Buffered; - internal const byte Priority = 4; internal const long MaxBufferPoolSize = 64 * 1024; internal const int MaxReceivedMessageSize = 5 * 1024 * 1024; //64 * 1024; } Modified: qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelListener.cs URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelListener.cs?rev=818338&r1=818337&r2=818338&view=diff ============================================================================== --- qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelListener.cs (original) +++ qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelListener.cs Thu Sep 24 01:24:28 2009 @@ -32,6 +32,7 @@ AmqpTransportBindingElement bindingElement; AmqpChannelProperties channelProperties; bool shared; + int prefetchLimit; long maxBufferPoolSize; Uri uri; AmqpTransportChannel amqpTransportChannel; @@ -45,6 +46,7 @@ this.bindingElement = bindingElement; this.channelProperties = bindingElement.ChannelProperties.Clone(); this.shared = bindingElement.Shared; + this.prefetchLimit = bindingElement.PrefetchLimit; this.maxBufferPoolSize = bindingElement.MaxBufferPoolSize; @@ -132,7 +134,7 @@ { amqpTransportChannel = new AmqpTransportChannel(this, this.channelProperties, new EndpointAddress(uri), messageEncoderFactory.Encoder, - maxBufferPoolSize, this.shared); + maxBufferPoolSize, this.shared, this.prefetchLimit); return (IInputChannel)(object) amqpTransportChannel; } Modified: qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/AmqpTransportBindingElement.cs URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/AmqpTransportBindingElement.cs?rev=818338&r1=818337&r2=818338&view=diff ============================================================================== --- qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/AmqpTransportBindingElement.cs (original) +++ qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/AmqpTransportBindingElement.cs Thu Sep 24 01:24:28 
2009 @@ -29,6 +29,7 @@ { AmqpChannelProperties channelProperties; bool shared; + int prefetchLimit; public AmqpTransportBindingElement() { @@ -41,6 +42,7 @@ { this.channelProperties = other.channelProperties.Clone(); this.shared = other.shared; + this.prefetchLimit = other.prefetchLimit; } public override IChannelFactory<TChannel> BuildChannelFactory<TChannel>(BindingContext context) @@ -98,6 +100,12 @@ set { this.channelProperties.BrokerPort = value; } } + public int PrefetchLimit + { + get { return this.prefetchLimit; } + set { this.prefetchLimit = value; } + } + public bool Shared { get { return this.shared; } Modified: qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/AmqpTransportChannel.cs URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/AmqpTransportChannel.cs?rev=818338&r1=818337&r2=818338&view=diff ============================================================================== --- qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/AmqpTransportChannel.cs (original) +++ qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/AmqpTransportChannel.cs Thu Sep 24 01:24:28 2009 @@ -50,6 +50,7 @@ private MessageEncoder encoder; private AmqpChannelProperties factoryChannelProperties; private bool shared; + private int prefetchLimit; private string encoderContentType; // input = 0-10 queue, output = 0-10 exchange @@ -68,7 +69,7 @@ private AsyncTimeSpanCaller asyncOpenCaller; private AsyncTimeSpanCaller asyncCloseCaller; - internal AmqpTransportChannel(ChannelManagerBase factory, AmqpChannelProperties channelProperties, EndpointAddress remoteAddress, MessageEncoder msgEncoder, long maxBufferPoolSize, bool sharedConnection) + internal AmqpTransportChannel(ChannelManagerBase factory, AmqpChannelProperties channelProperties, EndpointAddress remoteAddress, MessageEncoder msgEncoder, long maxBufferPoolSize, bool sharedConnection, int prefetchLimit) : base(factory) { this.isInputChannel = (factory is ChannelListenerBase) || (factory is 
AmqpChannelFactory<IInputChannel>); @@ -80,6 +81,7 @@ this.factoryChannelProperties = channelProperties; this.shared = sharedConnection; + this.prefetchLimit = prefetchLimit; this.remoteAddress = remoteAddress; // pull out host, port, queue, and connection arguments @@ -128,6 +130,7 @@ if (this.isInputChannel) { this.inputLink = ConnectionManager.GetInputLink(this.factoryChannelProperties, shared, false, this.queueName); + this.inputLink.PrefetchLimit = this.prefetchLimit; } else { @@ -287,7 +290,7 @@ return false; } - + public IAsyncResult BeginTryReceive(TimeSpan timeout, AsyncCallback callback, object state) { return this.inputLink.BeginTryReceive(timeout, callback, state); @@ -464,7 +467,7 @@ } return amqpMessage; } - + private Message QpidToWcf(AmqpMessage amqpMessage) { @@ -531,7 +534,7 @@ { this.bufferManager.ReturnBuffer(managedBuffer); } - } + } return wcfMessage; } Modified: qpid/trunk/qpid/wcf/src/Apache/Qpid/Interop/AmqpSession.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/wcf/src/Apache/Qpid/Interop/AmqpSession.cpp?rev=818338&r1=818337&r2=818338&view=diff ============================================================================== --- qpid/trunk/qpid/wcf/src/Apache/Qpid/Interop/AmqpSession.cpp (original) +++ qpid/trunk/qpid/wcf/src/Apache/Qpid/Interop/AmqpSession.cpp Thu Sep 24 01:24:28 2009 @@ -63,11 +63,12 @@ sessionp = new qpid::client::AsyncSession; *sessionp = qpidConnectionp->newSession(); subs_mgrp = new SubscriptionManager (*sessionp); - success = true; waiters = gcnew Collections::Generic::List<CompletionWaiter^>(); + success = true; } finally { if (!success) { Cleanup(); + // TODO: include inner exception information throw gcnew QpidException ("session creation failure"); } } @@ -76,12 +77,6 @@ void AmqpSession::Cleanup() { - if (subscriptionp != NULL) { - subscriptionp->cancel(); - delete subscriptionp; - subscriptionp=NULL; - } - if (subs_mgrp != NULL) { subs_mgrp->stop(); delete subs_mgrp; @@ -112,6 +107,7 @@ void 
AmqpSession::ConnectionClosed() { + lock l(waiters); Cleanup(); } @@ -283,5 +279,27 @@ } } +bool AmqpSession::MessageStop(Completion &comp, std::string &name) +{ + lock l(waiters); + + if (sessionp == NULL) + return false; + + comp = sessionp->messageStop(name, true); + return true; +} + +void AmqpSession::AcceptAndComplete(SequenceSet& transfers) +{ + lock l(waiters); + + if (sessionp == NULL) + throw gcnew ObjectDisposedException("Accept"); + + sessionp->markCompleted(transfers, false); + sessionp->messageAccept(transfers, false); +} + }}} // namespace Apache::Qpid::Cli Modified: qpid/trunk/qpid/wcf/src/Apache/Qpid/Interop/AmqpSession.h URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/wcf/src/Apache/Qpid/Interop/AmqpSession.h?rev=818338&r1=818337&r2=818338&view=diff ============================================================================== --- qpid/trunk/qpid/wcf/src/Apache/Qpid/Interop/AmqpSession.h (original) +++ qpid/trunk/qpid/wcf/src/Apache/Qpid/Interop/AmqpSession.h Thu Sep 24 01:24:28 2009 @@ -44,7 +44,6 @@ AsyncSession* sessionp; SessionImpl* sessionImplp; SubscriptionManager* subs_mgrp; - Subscription* subscriptionp; LocalQueue* localQueuep; Collections::Generic::List<CompletionWaiter^>^ waiters; bool helperRunning; @@ -69,6 +68,8 @@ void ConnectionClosed(); void internalWaitForCompletion(IntPtr Future); void removeWaiter(CompletionWaiter^ waiter); + bool MessageStop(Completion &comp, std::string &name); + void AcceptAndComplete(SequenceSet& transfers); property AmqpConnection^ Connection { AmqpConnection^ get () { return connection; } Modified: qpid/trunk/qpid/wcf/src/Apache/Qpid/Interop/CompletionWaiter.h URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/wcf/src/Apache/Qpid/Interop/CompletionWaiter.h?rev=818338&r1=818337&r2=818338&view=diff ============================================================================== --- qpid/trunk/qpid/wcf/src/Apache/Qpid/Interop/CompletionWaiter.h (original) +++ 
qpid/trunk/qpid/wcf/src/Apache/Qpid/Interop/CompletionWaiter.h Thu Sep 24 01:24:28 2009 @@ -32,7 +32,6 @@ bool timedOut; // has an owner thread bool assigned; - // can Run (i.e. earlier CompletionWaiters in the queue have completed) System::Exception^ runException; AsyncCallback^ asyncCallback; Threading::Timer ^timer; Modified: qpid/trunk/qpid/wcf/src/Apache/Qpid/Interop/InputLink.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/wcf/src/Apache/Qpid/Interop/InputLink.cpp?rev=818338&r1=818337&r2=818338&view=diff ============================================================================== --- qpid/trunk/qpid/wcf/src/Apache/Qpid/Interop/InputLink.cpp (original) +++ qpid/trunk/qpid/wcf/src/Apache/Qpid/Interop/InputLink.cpp Thu Sep 24 01:24:28 2009 @@ -59,6 +59,12 @@ // with proposed changes to the native library to reduce the number of servicing // threads for large numbers of subscriptions. +// synchronization is accomplished with locks, but also by ensuring that only one +// MessageWaiter (the one at the front of the line) is ever active. +// async threads to watch for: Close/finalizer, Timers, SyncCredit and the native Dispatch +// thread (who deposits FrameSets into the local queue and is oblivious to the +// managed space locks). + // The following def must match the "Frames" private typedef. // TODO, make Qpid-cpp "Frames" definition visible. @@ -94,6 +100,8 @@ localQueuep = new LocalQueue; SubscriptionSettings settings; settings.flowControl = FlowControl::messageCredit(0); + settings.completionMode = CompletionMode::MANUAL_COMPLETION; + Subscription sub = qpidSubsMgrp->subscribe(*localQueuep, qname, settings); subscriptionp = new Subscription (sub); // copy smart pointer for later IDisposable cleanup @@ -197,6 +205,7 @@ IntPtr InputLink::nextLocalMessage() { lock l(waiters); + if (disposed) return (IntPtr) NULL; @@ -279,8 +288,7 @@ if (haveMessage()) return true; - // TODO: prefetch window of messages, compatible with both 0-10 and 1.0.
- subscriptionp->grantMessageCredit(1); + AdjustCredit(); // get a scoped smart ptr ref to guard against async close or hangup demuxQueuePtr = *queuePtrp; @@ -350,7 +358,12 @@ } return; } + waiters->RemoveAt(idx); + if (waiter->TimedOut) { + // may have to give back message if it arrives momentarily + AdjustCredit(); + } // let the next waiter know it's his turn. if (waiters->Count > 0) { @@ -411,6 +424,129 @@ } +void InputLink::PrefetchLimit::set(int value) +{ + lock l(waiters); + prefetchLimit = value; + + int delta = 0; + + // rough rule of thumb to keep the flow, but reduce chatter. + // for small messages, the credit request is almost as expensive as the transfer itself. + // experience may suggest a better heuristic or require a property for the low water mark + if (prefetchLimit >= 3) { + delta = prefetchLimit / 3; + } + minWorkingCredit = prefetchLimit - delta; + AdjustCredit(); +} + + +// call with lock held +void InputLink::AdjustCredit() +{ + if (creditSyncPending || disposed) + return; + + // low watermark check + if ((prefetchLimit != 0) && + (workingCredit >= minWorkingCredit) && + (workingCredit >= waiters->Count)) + return; + + // should have enough for all waiters or to satisfy the prefetch window + int targetCredit = waiters->Count; + if (targetCredit < prefetchLimit) + targetCredit = prefetchLimit; + + if (targetCredit > workingCredit) { + subscriptionp->grantMessageCredit(targetCredit - workingCredit); + workingCredit = targetCredit; + return; + } + if (targetCredit < workingCredit) { + if ((targetCredit == 0) && (prefetchLimit == 0)) { + creditSyncPending = true; + ThreadPool::QueueUserWorkItem(gcnew WaitCallback(this, &InputLink::SyncCredit)); + } + // TODO: also shrink credit when prefetchLimit != 0 + } +} + +void InputLink::SyncCredit(Object ^unused) +{ + lock l(waiters); + + try { + if (disposed) + return; + + Completion comp; + if (!amqpSession->MessageStop(comp, subscriptionp->getName())) { + // connection closed + return; + } + + // get 
a private scoped copy to use outside the lock + Subscription s(*subscriptionp); + + l.release(); + // use setFlowControl to re-enable credit flow on the broker. + // previously used comp.wait() here, but setFlowControl is a sync operation + s.setFlowControl(s.getSettings().flowControl); + l.acquire(); + + if (disposed) + return; + + // let existing waiters use up any + // local queue size can only decrease until more credit is issued + while (true) { + if ((waiters->Count > 0) && ((*queuePtrp)->size() > 0)) { + l.release(); + // a rare use case and not used in performance oriented code. + // optimization can wait until the qpid/messaging api is used + Thread::Sleep(10); + l.acquire(); + if (disposed) + return; + } + else { + break; + } + } + + // At this point, the lock is held and we are fully synced with the broker + // so we have a valid snapshot + + if ((prefetchLimit == 0) && ((*queuePtrp)->size() > 0)) { + // can't be sure application will request a message again any time soon + QpidFrameSetPtr frameSetp; + while (!(*queuePtrp)->empty()) { + (*queuePtrp)->pop(frameSetp); + SequenceSet frameSetID(frameSetp->getId()); + subscriptionp->release(frameSetID); + } + + // don't touch dequeuedFrameSetpp. It is spoken for: explicitly from a + // MessageWaiter about to get the nextLocalMessage(), or implicitly + // from a WaitForMessage(). + } + // TODO: if prefetchLimit != 0, release messages from back of the queue that exceed targetCredit + + workingCredit = (*queuePtrp)->size(); + if (dequeuedFrameSetpp != NULL) { + workingCredit++; + } + } + finally { + creditSyncPending = false; + } + + AdjustCredit(); +} + + AmqpMessage^ InputLink::createAmqpMessage(IntPtr msgp) { QpidFrameSetPtr* fspp = (QpidFrameSetPtr*) msgp.ToPointer(); @@ -539,7 +675,15 @@ // We have a message we can return to the caller. // Tell the broker we got it.
- subscriptionp->accept(frameSetID); + + // subscriptionp->accept(frameSetID) is a slow sync operation in the native API + // so do it within the AsyncSession directly + amqpSession->AcceptAndComplete(frameSetID); + + workingCredit--; + // check if more messages need to be requested from broker + AdjustCredit(); + return amqpMessage; } finally { Modified: qpid/trunk/qpid/wcf/src/Apache/Qpid/Interop/InputLink.h URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/wcf/src/Apache/Qpid/Interop/InputLink.h?rev=818338&r1=818337&r2=818338&view=diff ============================================================================== --- qpid/trunk/qpid/wcf/src/Apache/Qpid/Interop/InputLink.h (original) +++ qpid/trunk/qpid/wcf/src/Apache/Qpid/Interop/InputLink.h Thu Sep 24 01:24:28 2009 @@ -47,6 +47,14 @@ bool finalizing; QpidFrameSetPtr* dequeuedFrameSetpp; ManualResetEvent^ asyncHelperWaitHandle; + // number of messages to buffer locally for future consumption + int prefetchLimit; + // the number of messages requested and not yet processed + int workingCredit; + // stopping and restarting the message flow + bool creditSyncPending; + // working credit low water mark + int minWorkingCredit; void Cleanup(); void ReleaseNative(); @@ -54,6 +62,8 @@ void addWaiter(MessageWaiter^ waiter); void asyncHelper(); AmqpMessage^ createAmqpMessage(IntPtr msgp); + void AdjustCredit(); + void SyncCredit(Object ^); internal: InputLink(AmqpSession^ session, System::String^ sourceQueue, qpid::client::AsyncSession *qpidSessionp, @@ -80,6 +90,11 @@ IAsyncResult^ BeginWaitForMessage(TimeSpan timeout, AsyncCallback^ callback, Object^ state); bool EndWaitForMessage(IAsyncResult^ result); + property int PrefetchLimit { + int get () { return prefetchLimit; } + void set (int value); + } + }; }}} // namespace Apache::Qpid::Interop Modified: qpid/trunk/qpid/wcf/src/Apache/Qpid/Interop/MessageWaiter.h URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/wcf/src/Apache/Qpid/Interop/MessageWaiter.h?rev=818338&r1=818337&r2=818338&view=diff ============================================================================== --- qpid/trunk/qpid/wcf/src/Apache/Qpid/Interop/MessageWaiter.h (original) +++ qpid/trunk/qpid/wcf/src/Apache/Qpid/Interop/MessageWaiter.h Thu Sep 24 01:24:28 2009 @@ -66,7 +66,6 @@ void Activate(); void WaitForCompletion(); -// inline void SetCompletedSynchronously (bool v) { completedSynchronously = v; } property IntPtr Message { IntPtr get () { @@ -78,7 +77,6 @@ GC::SuppressFinalize(this); return v; } - // void set (IntPtr v) { message = v; } } property bool Assigned { @@ -89,11 +87,11 @@ bool get () { return timedOut; } } - property System::Exception^ RunException { System::Exception^ get() { return runException; } } + public: virtual property bool IsCompleted { --------------------------------------------------------------------- Apache Qpid - AMQP Messaging Implementation Project: http://qpid.apache.org Use/Interact: mailto:commits-subscr...@qpid.apache.org