Author: jgomes
Date: Tue Sep  9 10:08:09 2008
New Revision: 693516

URL: http://svn.apache.org/viewvc?rev=693516&view=rev
Log:
Apply patch from Chris Fraire.  Thanks, Chris!
Fixes [AMQNET-112]. (See https://issues.apache.org/activemq/browse/AMQNET-112)

Modified:
    
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs
    
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/DispatchingThread.cs
    
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageProducer.cs
    
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs
    
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/MutexTransport.cs
    
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Tcp/TcpTransport.cs
    
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/TransportFilter.cs
    
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/WireFormatNegotiator.cs

Modified: 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs
URL: 
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs?rev=693516&r1=693515&r2=693516&view=diff
==============================================================================
--- 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs
 (original)
+++ 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs
 Tue Sep  9 10:08:09 2008
@@ -20,6 +20,7 @@
 using Apache.NMS.Util;
 using System;
 using System.Collections;
+using System.Threading;
 
 namespace Apache.NMS.ActiveMQ
 {
@@ -35,18 +36,21 @@
                private TimeSpan requestTimeout;
                private BrokerInfo brokerInfo; // from broker
                private WireFormatInfo brokerWireFormatInfo; // from broker
-               private readonly IList sessions = ArrayList.Synchronized(new 
ArrayList());
+               private readonly IList sessions = new ArrayList();
+               /// <summary>
+               /// Private object used for synchronization, instead of public 
"this"
+               /// </summary>
+               private readonly object myLock = new object();
                private bool asyncSend = false;
                private bool asyncClose = true;
-               private bool connected = false;
-               private bool closed = false;
-               private bool closing = false;
+               private volatile bool closed;
+               private volatile bool triedConnect;
                private long sessionCounter = 0;
                private long temporaryDestinationCounter = 0;
                private long localTransactionCounter;
-               private readonly AtomicBoolean started = new 
AtomicBoolean(false);
+               private bool started = false;
                private bool disposed = false;
-               
+
                public Connection(Uri connectionUri, ITransport transport, 
ConnectionInfo info)
                {
                        this.brokerUri = connectionUri;
@@ -107,11 +111,15 @@
                public void Start()
                {
                        CheckConnected();
-                       if(started.CompareAndSet(false, true))
+                       lock(myLock)
                        {
-                               foreach(Session session in sessions)
+                               if(!started)
                                {
-                                       session.StartAsyncDelivery(null);
+                                       started = true;
+                                       foreach(Session session in sessions)
+                                       {
+                                               
session.StartAsyncDelivery(null);
+                                       }
                                }
                        }
                }
@@ -122,7 +130,7 @@
                /// </summary>
                public bool IsStarted
                {
-                       get { return started.Value; }
+                       get { return started; }
                }
 
                /// <summary>
@@ -132,15 +140,19 @@
                public void Stop()
                {
                        CheckConnected();
-                       if(started.CompareAndSet(true, false))
+                       lock(myLock)
                        {
-                               foreach(Session session in sessions)
+                               if(started)
                                {
-                                       session.StopAsyncDelivery();
+                                       started = false;
+                                       foreach(Session session in sessions)
+                                       {
+                                               session.StopAsyncDelivery();
+                                       }
                                }
                        }
                }
-               
+
                /// <summary>
                /// Creates a new session to work on this connection
                /// </summary>
@@ -162,20 +174,22 @@
                        System.Collections.Specialized.StringDictionary map = 
URISupport.ParseQuery(this.brokerUri.Query);
                        URISupport.SetProperties(session, map, "session.");
 
-                       if(IsStarted)
+                       lock(myLock)
                        {
-                               session.StartAsyncDelivery(null);
-                       }
+                               if(IsStarted)
+                               {
+                                       session.StartAsyncDelivery(null);
+                               }
 
-                       sessions.Add(session);
+                               sessions.Add(session);
+                       }
                        return session;
                }
 
                public void RemoveSession(Session session)
                {
                        DisposeOf(session.SessionId);
-
-                       if(!closing)
+                       lock(myLock)
                        {
                                sessions.Remove(session);
                        }
@@ -183,7 +197,26 @@
 
                public void Close()
                {
-                       lock(this)
+                       if(closed)
+                       {
+                               return;
+                       }
+
+                       //
+                       // Do a first-run close of sessions after brief 
synchronization
+                       //
+                       IList sessionsCopy;
+                       lock(myLock)
+                       {
+                               sessionsCopy = new ArrayList(sessions);
+                       }
+
+                       foreach(Session session in sessionsCopy)
+                       {
+                               session.Close();
+                       }
+
+                       lock(myLock)
                        {
                                if(closed)
                                {
@@ -192,8 +225,11 @@
 
                                try
                                {
-                                       closing = true;
-                                       foreach(Session session in sessions)
+                                       //
+                                       // Copy again for safe enumeration.  
Always assume
+                                       // that closing a session modifies the 
sessions list.
+                                       //
+                                       foreach(Session session in new 
ArrayList(sessions))
                                        {
                                                session.Close();
                                        }
@@ -207,10 +243,11 @@
                                {
                                        Tracer.ErrorFormat("Error during 
connection close: {0}", ex);
                                }
-
-                               transport = null;
-                               closed = true;
-                               closing = false;
+                               finally
+                               {
+                                       closed = true;
+                                       transport = null;
+                               }
                        }
                }
 
@@ -246,14 +283,14 @@
 
                        disposed = true;
                }
-               
+
                // Properties
 
                public Uri BrokerUri
                {
                        get { return brokerUri; }
                }
-               
+
                public ITransport ITransport
                {
                        get { return transport; }
@@ -271,41 +308,41 @@
                        get { return acknowledgementMode; }
                        set { this.acknowledgementMode = value; }
                }
-               
+
                public string ClientId
                {
                        get { return info.ClientId; }
                        set
                        {
-                               if(connected)
+                               if(triedConnect)
                                {
                                        throw new NMSException("You cannot 
change the ClientId once the Connection is connected");
                                }
                                info.ClientId = value;
                        }
                }
-               
+
                public ConnectionId ConnectionId
                {
                        get { return info.ConnectionId; }
                }
-               
+
                public BrokerInfo BrokerInfo
                {
                        get { return brokerInfo; }
                }
-               
+
                public WireFormatInfo BrokerWireFormat
                {
                        get { return brokerWireFormatInfo; }
                }
-               
+
                // Implementation methods
 
                /// <summary>
                /// Performs a synchronous request-response with the broker
                /// </summary>
-               /// 
+               ///
 
                public Response SyncRequest(Command command)
                {
@@ -360,12 +397,11 @@
                /// </summary>
                public String CreateTemporaryDestinationName()
                {
-                       lock(this)
-                       {
-                               return info.ConnectionId.Value + ":" + 
(++temporaryDestinationCounter);
-                       }
+                       return info.ConnectionId.Value
+                               + ":"
+                               + Interlocked.Increment(ref 
temporaryDestinationCounter);
                }
-               
+
                /// <summary>
                /// Creates a new local transaction ID
                /// </summary>
@@ -373,13 +409,10 @@
                {
                        LocalTransactionId id= new LocalTransactionId();
                        id.ConnectionId = ConnectionId;
-                       lock(this)
-                       {
-                               id.Value = (++localTransactionCounter);
-                       }
+                       id.Value = Interlocked.Increment(ref 
localTransactionCounter);
                        return id;
                }
-               
+
                protected void CheckConnected()
                {
                        if(closed)
@@ -387,19 +420,35 @@
                                throw new ConnectionClosedException();
                        }
 
-                       if(!connected)
+                       if(triedConnect)
                        {
-                               connected = true;
+                               return;
+                       }
+
+                       lock(myLock)
+                       {
+                               if(closed)
+                               {
+                                       throw new ConnectionClosedException();
+                               }
+
+                               if(triedConnect)
+                               {
+                                       return;
+                               }
+
+                               // Set this in advance, to short-circuit 
SyncRequest's call to this method
+                               triedConnect = true;
+
                                // now lets send the connection and see if we 
get an ack/nak
                                if(null == SyncRequest(info))
                                {
                                        closed = true;
-                                       connected = false;
                                        throw new ConnectionClosedException();
                                }
                        }
                }
-               
+
                /// <summary>
                /// Handle incoming commands
                /// </summary>
@@ -422,9 +471,12 @@
                        else if(command is ShutdownInfo)
                        {
                                //ShutdownInfo info = (ShutdownInfo)command;
-                               if(!closing && !closed)
+                               lock(myLock)
                                {
-                                       OnException(commandTransport, new 
NMSException("Broker closed this connection."));
+                                       if(!closed)
+                                       {
+                                               OnException(commandTransport, 
new NMSException("Broker closed this connection."));
+                                       }
                                }
                        }
                        else
@@ -437,12 +489,15 @@
                {
                        bool dispatched = false;
 
-                       foreach(Session session in sessions)
+                       lock(myLock)
                        {
-                               if(session.DispatchMessage(dispatch.ConsumerId, 
dispatch.Message))
+                               foreach(Session session in sessions)
                                {
-                                       dispatched = true;
-                                       break;
+                                       
if(session.DispatchMessage(dispatch.ConsumerId, dispatch.Message))
+                                       {
+                                               dispatched = true;
+                                               break;
+                                       }
                                }
                        }
 
@@ -483,19 +538,16 @@
                                }
                        }
                }
-               
+
                protected SessionInfo CreateSessionInfo(AcknowledgementMode 
sessionAcknowledgementMode)
                {
                        SessionInfo answer = new SessionInfo();
                        SessionId sessionId = new SessionId();
                        sessionId.ConnectionId = info.ConnectionId.Value;
-                       lock(this)
-                       {
-                               sessionId.Value = ++sessionCounter;
-                       }
+                       sessionId.Value = Interlocked.Increment(ref 
sessionCounter);
                        answer.SessionId = sessionId;
                        return answer;
                }
-               
+
        }
 }

Modified: 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/DispatchingThread.cs
URL: 
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/DispatchingThread.cs?rev=693516&r1=693515&r2=693516&view=diff
==============================================================================
--- 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/DispatchingThread.cs
 (original)
+++ 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/DispatchingThread.cs
 Tue Sep  9 10:08:09 2008
@@ -27,7 +27,7 @@
                public delegate void ExceptionHandler(Exception exception);
 
                private readonly AutoResetEvent m_event = new 
AutoResetEvent(false);
-               private bool m_bStopFlag = false;
+               private readonly ManualResetEvent m_stopEvent = new 
ManualResetEvent(false);
                private Thread m_thread = null;
                private readonly DispatchFunction m_dispatchFunc;
                private event ExceptionHandler m_exceptionListener;
@@ -37,7 +37,18 @@
                        m_dispatchFunc = dispatchFunc;
                }
 
-               // TODO can't use EventWaitHandle on MONO 1.0
+               public bool IsStarted
+               {
+                       get
+                       {
+                               lock(this)
+                               {
+                                       return (null != m_thread);
+                               }
+                       }
+               }
+
+                          // TODO can't use EventWaitHandle on MONO 1.0
                public AutoResetEvent EventHandle
                {
                        get { return m_event; }
@@ -49,7 +60,7 @@
                        {
                                m_exceptionListener += value;
                        }
-                       remove 
+                       remove
                        {
                                m_exceptionListener -= value;
                        }
@@ -61,10 +72,9 @@
                        {
                                if (m_thread == null)
                                {
-                                       m_bStopFlag = false;
+                                       m_stopEvent.Reset();
                                        m_thread = new Thread(new 
ThreadStart(MyThreadFunc));
                                        m_thread.IsBackground = true;
-                                       m_event.Set();
                                        Tracer.Info("Starting dispatcher thread 
for session");
                                        m_thread.Start();
                                }
@@ -76,20 +86,16 @@
                        Stop(System.Threading.Timeout.Infinite);
                }
 
-               
+
                internal void Stop(int timeoutMilliseconds)
                {
                        Tracer.Info("Stopping dispatcher thread for session");
-                       Thread localThread = null;
+                       Thread localThread;
                        lock (this)
                        {
                                localThread = m_thread;
                                m_thread = null;
-                               if (!m_bStopFlag)
-                               {
-                                       m_bStopFlag = true;
-                                       m_event.Set();
-                               }
+                               m_stopEvent.Set();
                        }
                        if(localThread!=null)
                        {
@@ -101,30 +107,54 @@
                        }
                        Tracer.Info("Dispatcher thread joined");
                }
-               
+
                private void MyThreadFunc()
                {
                        Tracer.Info("Dispatcher thread started");
-                       while (true) // loop forever (well, at least until 
we've been asked to stop)
-                       {
-                               lock (this)
-                               {
-                                       if (m_bStopFlag)
-                                               break;
-                               }
 
-                               try
-                               {
-                                       m_dispatchFunc();
-                               }
-                               catch (Exception ex)
-                               {
-                                       if (m_exceptionListener != null)
-                                               m_exceptionListener(ex);
+                       //
+                       // Put m_stopEvent first so it is preferred if both are 
signaled
+                       //
+                       WaitHandle[] signals = new WaitHandle[] {
+                               m_stopEvent,
+                               m_event
+                       };
+                       const int kStopEventOffset = 0;
+
+                       try
+                       {
+                               while (true) // loop forever (well, at least 
until we've been asked to stop)
+                               {
+                                       try
+                                       {
+                                               m_dispatchFunc();
+                                       }
+                                       catch(ThreadAbortException)
+                                       {
+                                               // Throw for handling down below
+                                               throw;
+                                       }
+                                       catch(Exception ex)
+                                       {
+                                               if(m_exceptionListener != null)
+                                               {
+                                                       m_exceptionListener(ex);
+                                               }
+                                       }
+
+                                       int sigOffset = 
WaitHandle.WaitAny(signals);
+                                       if(kStopEventOffset == sigOffset)
+                                       {
+                                               break;
+                                       }
+                                       // otherwise, continue the loop
                                }
-                               m_event.WaitOne();
+                               Tracer.Info("Dispatcher thread stopped");
+                       }
+                       catch(ThreadAbortException)
+                       {
+                               Tracer.Info("Dispatcher thread aborted");
                        }
-                       Tracer.Info("Dispatcher thread stopped");
                }
        }
-}
\ No newline at end of file
+}

Modified: 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageProducer.cs
URL: 
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageProducer.cs?rev=693516&r1=693515&r2=693516&view=diff
==============================================================================
--- 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageProducer.cs
 (original)
+++ 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageProducer.cs
 Tue Sep  9 10:08:09 2008
@@ -144,11 +144,7 @@
                        {
                                MessageId id = new MessageId();
                                id.ProducerId = info.ProducerId;
-                               lock (this)
-                               {
-                                       id.ProducerSequenceId = 
++messageCounter;
-                               }
-
+                               id.ProducerSequenceId = 
Interlocked.Increment(ref messageCounter);
                                activeMessage.MessageId = id;
                        }
 

Modified: 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs
URL: 
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs?rev=693516&r1=693515&r2=693516&view=diff
==============================================================================
--- 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs 
(original)
+++ 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs 
Tue Sep  9 10:08:09 2008
@@ -16,8 +16,10 @@
  */
 using System;
 using System.Collections;
+using System.Threading;
 using Apache.NMS.ActiveMQ.Commands;
 using Apache.NMS;
+using Apache.NMS.Util;
 
 namespace Apache.NMS.ActiveMQ
 {
@@ -27,15 +29,20 @@
        public class Session : ISession
        {
                private long consumerCounter;
-               private readonly IDictionary consumers = 
Hashtable.Synchronized(new Hashtable());
-               private readonly IDictionary producers = 
Hashtable.Synchronized(new Hashtable());
+               private readonly IDictionary consumers = new Hashtable();
+               private readonly IDictionary producers = new Hashtable();
                private readonly DispatchingThread dispatchingThread;
+               /// <summary>
+               /// Private object used for synchronization, instead of public 
"this"
+               /// </summary>
+               private readonly object myLock = new object();
                private DispatchingThread.ExceptionHandler 
dispatchingThread_ExceptionHandler;
                private readonly SessionInfo info;
                private long producerCounter;
-               internal bool startedAsyncDelivery = false;
                private bool disposed = false;
-        private bool closed = false;
+               private volatile bool closed = false;
+
+               private const int kMAX_STOP_ASYNC_MS = 30000;
 
                public Session(Connection connection, SessionInfo info, 
AcknowledgementMode acknowledgementMode)
                {
@@ -146,9 +153,37 @@
 
                public void Close()
                {
-                       lock(this)
+                       if(closed)
+                       {
+                               return;
+                       }
+
+                       //
+                       // Do a first-run close of consumer and producers after
+                       // brief synchronization
+                       //
+                       IList consumersCopy;
+                       IList producersCopy;
+
+                       lock(myLock)
                        {
-                               if(this.closed)
+                               consumersCopy = new ArrayList(consumers.Values);
+                               producersCopy = new ArrayList(producers.Values);
+                       }
+
+                       foreach(MessageConsumer consumer in consumersCopy)
+                       {
+                               consumer.Close();
+                       }
+
+                       foreach(MessageProducer producer in producersCopy)
+                       {
+                               producer.Close();
+                       }
+
+                       lock(myLock)
+                       {
+                               if(closed)
                                {
                                        return;
                                }
@@ -156,26 +191,30 @@
                                try
                                {
                                        StopAsyncDelivery();
-                                       Connection.RemoveSession(this);
-                                       foreach(MessageConsumer consumer in 
GetConsumers())
+                                       // Copy again for safe enumeration
+                                       foreach(MessageConsumer consumer in new 
ArrayList(consumers.Values))
                                        {
                                                consumer.Close();
                                        }
                                        consumers.Clear();
 
-                                       foreach(MessageProducer producer in 
GetProducers())
+                                       // Copy again for safe enumeration
+                                       foreach(MessageProducer producer in new 
ArrayList(producers.Values))
                                        {
                                                producer.Close();
                                        }
                                        producers.Clear();
+                                       Connection.RemoveSession(this);
                                }
-                               catch(Exception ex)
+                               catch (Exception ex)
                                {
                                        Tracer.ErrorFormat("Error during 
session close: {0}", ex);
                                }
-
-                               this.connection = null;
-                               this.closed = true;
+                               finally
+                               {
+                                       this.connection = null;
+                                       this.closed = true;
+                               }
                        }
                }
 
@@ -188,19 +227,24 @@
                {
                        ProducerInfo command = CreateProducerInfo(destination);
                        ProducerId producerId = command.ProducerId;
-                       MessageProducer producer = null;
+                       MessageProducer producer = new MessageProducer(this, 
command);
+                       lock(myLock)
+                       {
+                               producers[producerId] = producer;
+                       }
 
                        try
                        {
-                               producer = new MessageProducer(this, command);
-                               producers[producerId] = producer;
                                this.DoSend(command);
                        }
                        catch(Exception)
                        {
-                               if(producer != null)
+                               //
+                               // DoSend failed.  No need to call 
MessageProducer.Close
+                               //
+                               lock(myLock)
                                {
-                                       producer.Close();
+                                       producers.Remove(producerId);
                                }
 
                                throw;
@@ -228,23 +272,31 @@
                        ConsumerId consumerId = command.ConsumerId;
                        MessageConsumer consumer = null;
 
-                       try
+                       consumer = new MessageConsumer(this, command, 
this.AcknowledgementMode);
+                       // let's register the consumer first in case we start 
dispatching messages immediately
+                       lock(myLock)
                        {
-                               consumer = new MessageConsumer(this, command, 
this.AcknowledgementMode);
-                               // lets register the consumer first in case we 
start dispatching messages immediately
                                consumers[consumerId] = consumer;
+                       }
+
+                       try
+                       {
                                this.DoSend(command);
-                               return consumer;
                        }
                        catch(Exception)
                        {
-                               if(consumer != null)
+                               //
+                               // DoSend failed.  No need to call 
MessageConsumer.Close
+                               //
+                               lock(myLock)
                                {
-                                       consumer.Close();
+                                       consumers.Remove(consumerId);
                                }
 
                                throw;
                        }
+
+                       return consumer;
                }
 
                public IMessageConsumer CreateDurableConsumer(ITopic 
destination, string name, string selector, bool noLocal)
@@ -255,18 +307,25 @@
                        command.NoLocal = noLocal;
                        MessageConsumer consumer = null;
 
-                       try
+                       consumer = new MessageConsumer(this, command, 
this.AcknowledgementMode);
+                       // let's register the consumer first in case we start 
dispatching messages immediately
+                       lock(myLock)
                        {
-                               consumer = new MessageConsumer(this, command, 
this.AcknowledgementMode);
-                               // lets register the consumer first in case we 
start dispatching messages immediately
                                consumers[consumerId] = consumer;
+                       }
+
+                       try
+                       {
                                this.DoSend(command);
                        }
                        catch(Exception)
                        {
-                               if(consumer != null)
+                               //
+                               // DoSend failed; no need to call 
MessageConsumer.Close
+                               //
+                               lock(myLock)
                                {
-                                       consumer.Close();
+                                       consumers.Remove(consumerId);
                                }
 
                                throw;
@@ -277,11 +336,15 @@
 
                public void DeleteDurableConsumer(string name)
                {
+                       IConnection conn = this.Connection;
                        RemoveSubscriptionInfo command = new 
RemoveSubscriptionInfo();
-                       command.ConnectionId = Connection.ConnectionId;
-                       command.ClientId = Connection.ClientId;
-                       command.SubcriptionName = name;
+                       if(null != conn)
+                       {
+                               command.ConnectionId = Connection.ConnectionId;
+                               command.ClientId = Connection.ClientId;
+                       }
 
+                       command.SubcriptionName = name;
                        this.DoSend(command);
                }
 
@@ -378,9 +441,12 @@
                        this.TransactionContext.Rollback();
 
                        // lets ensure all the consumers redeliver any rolled 
back messages
-                       foreach(MessageConsumer consumer in GetConsumers())
+                       lock(myLock)
                        {
-                               consumer.RedeliverRolledBackMessages();
+                               foreach(MessageConsumer consumer in 
consumers.Values)
+                               {
+                                       consumer.RedeliverRolledBackMessages();
+                               }
                        }
                }
 
@@ -472,19 +538,29 @@
                public void DisposeOf(ConsumerId objectId)
                {
                        Connection.DisposeOf(objectId);
-                       consumers.Remove(objectId);
+                       lock(myLock)
+                       {
+                               consumers.Remove(objectId);
+                       }
                }
 
                public void DisposeOf(ProducerId objectId)
                {
                        Connection.DisposeOf(objectId);
-                       producers.Remove(objectId);
+                       lock(myLock)
+                       {
+                               producers.Remove(objectId);
+                       }
                }
 
                public bool DispatchMessage(ConsumerId consumerId, Message 
message)
                {
                        bool dispatched = false;
-                       MessageConsumer consumer = (MessageConsumer) 
consumers[consumerId];
+                       MessageConsumer consumer;
+                       lock(myLock)
+                       {
+                               consumer = (MessageConsumer) 
consumers[consumerId];
+                       }
 
                        if(consumer != null)
                        {
@@ -503,9 +579,12 @@
                {
                        // lets iterate through each consumer created by this 
session
                        // ensuring that they have all pending messages 
dispatched
-                       foreach(MessageConsumer consumer in GetConsumers())
+                       lock(myLock)
                        {
-                               consumer.DispatchAsyncMessages();
+                               foreach(MessageConsumer consumer in 
consumers.Values)
+                               {
+                                       consumer.DispatchAsyncMessages();
+                               }
                        }
                }
 
@@ -515,7 +594,7 @@
                /// </summary>
                protected ICollection GetConsumers()
                {
-                       lock(consumers.SyncRoot)
+                       lock(myLock)
                        {
                                return new ArrayList(consumers.Values);
                        }
@@ -527,7 +606,7 @@
                /// </summary>
                protected ICollection GetProducers()
                {
-                       lock(producers.SyncRoot)
+                       lock(myLock)
                        {
                                return new ArrayList(producers.Values);
                        }
@@ -539,10 +618,7 @@
                        ConsumerId id = new ConsumerId();
                        id.ConnectionId = info.SessionId.ConnectionId;
                        id.SessionId = info.SessionId.Value;
-                       lock(this)
-                       {
-                               id.Value = ++consumerCounter;
-                       }
+                       id.Value = Interlocked.Increment(ref consumerCounter);
                        answer.ConsumerId = id;
                        answer.Destination = 
ActiveMQDestination.Transform(destination);
                        answer.Selector = selector;
@@ -557,7 +633,7 @@
                        ActiveMQDestination amqDestination = destination as 
ActiveMQDestination;
                        if(amqDestination != null && amqDestination.Options != 
null)
                        {
-                               Util.URISupport.SetProperties(answer, 
amqDestination.Options, "consumer.");
+                               URISupport.SetProperties(answer, 
amqDestination.Options, "consumer.");
                        }
 
                        return answer;
@@ -569,10 +645,7 @@
                        ProducerId id = new ProducerId();
                        id.ConnectionId = info.SessionId.ConnectionId;
                        id.SessionId = info.SessionId.Value;
-                       lock(this)
-                       {
-                               id.Value = ++producerCounter;
-                       }
+                       id.Value = Interlocked.Increment(ref producerCounter);
                        answer.ProducerId = id;
                        answer.Destination = 
ActiveMQDestination.Transform(destination);
 
@@ -581,7 +654,7 @@
                        ActiveMQDestination amqDestination = destination as 
ActiveMQDestination;
                        if(amqDestination != null && amqDestination.Options != 
null)
                        {
-                               Util.URISupport.SetProperties(answer, 
amqDestination.Options, "producer.");
+                               URISupport.SetProperties(answer, 
amqDestination.Options, "producer.");
                        }
 
                        return answer;
@@ -596,11 +669,10 @@
 
                internal void StopAsyncDelivery()
                {
-                       if(startedAsyncDelivery)
+                       if(dispatchingThread.IsStarted)
                        {
                                this.dispatchingThread.ExceptionListener -= 
this.dispatchingThread_ExceptionHandler;
-                               dispatchingThread.Stop(5000);
-                               startedAsyncDelivery = false;
+                               dispatchingThread.Stop(kMAX_STOP_ASYNC_MS);
                        }
                }
 
@@ -611,11 +683,10 @@
                                
dispatcher.SetAsyncDelivery(dispatchingThread.EventHandle);
                        }
 
-                       if(!startedAsyncDelivery)
+                       if(!dispatchingThread.IsStarted)
                        {
                                this.dispatchingThread.ExceptionListener += 
this.dispatchingThread_ExceptionHandler;
                                dispatchingThread.Start();
-                               startedAsyncDelivery = true;
                        }
                }
        }

Modified: 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/MutexTransport.cs
URL: 
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/MutexTransport.cs?rev=693516&r1=693515&r2=693516&view=diff
==============================================================================
--- 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/MutexTransport.cs
 (original)
+++ 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/MutexTransport.cs
 Tue Sep  9 10:08:09 2008
@@ -20,32 +20,32 @@
 
 namespace Apache.NMS.ActiveMQ.Transport
 {
-    /// <summary>
-    /// A Transport which guards access to the next transport using a mutex.
-    /// </summary>
-    public class MutexTransport : TransportFilter
-    {
-        private readonly object transmissionLock = new object();
+       /// <summary>
+       /// A Transport which guards access to the next transport using a mutex.
+       /// </summary>
+       public class MutexTransport : TransportFilter
+       {
+               private readonly object transmissionLock = new object();
 
-        public MutexTransport(ITransport next) : base(next)
+               public MutexTransport(ITransport next) : base(next)
                {
-        }
-        
-        public override void Oneway(Command command)
-        {
-            lock(transmissionLock)
-            {
-                this.next.Oneway(command);
-            }
-        }
+               }
 
-        public override FutureResponse AsyncRequest(Command command)
-        {
-            lock(transmissionLock)
-            {
-                return base.AsyncRequest(command);
-            }
-        }
+               public override void Oneway(Command command)
+               {
+                       lock(transmissionLock)
+                       {
+                               this.next.Oneway(command);
+                       }
+               }
+
+               public override FutureResponse AsyncRequest(Command command)
+               {
+                       lock(transmissionLock)
+                       {
+                               return base.AsyncRequest(command);
+                       }
+               }
 
                public override Response Request(Command command, TimeSpan 
timeout)
                {
@@ -54,13 +54,5 @@
                                return base.Request(command, timeout);
                        }
                }
-
-               public override void Dispose()
-        {
-            lock(transmissionLock)
-            {
-                base.Dispose();
-            }
-        }
-    }
+       }
 }

Modified: 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Tcp/TcpTransport.cs
URL: 
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Tcp/TcpTransport.cs?rev=693516&r1=693515&r2=693516&view=diff
==============================================================================
--- 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Tcp/TcpTransport.cs
 (original)
+++ 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Tcp/TcpTransport.cs
 Tue Sep  9 10:08:09 2008
@@ -17,6 +17,7 @@
 using Apache.NMS.ActiveMQ.Commands;
 using Apache.NMS.ActiveMQ.OpenWire;
 using Apache.NMS.ActiveMQ.Transport;
+using Apache.NMS.Util;
 using System;
 using System.IO;
 using System.Net.Sockets;
@@ -24,23 +25,23 @@
 
 namespace Apache.NMS.ActiveMQ.Transport.Tcp
 {
-       
+
        /// <summary>
        /// An implementation of ITransport that uses sockets to communicate 
with the broker
        /// </summary>
        public class TcpTransport : ITransport
        {
-               private readonly object initLock = new object();
+               private readonly object myLock = new object();
                private readonly Socket socket;
                private IWireFormat wireformat;
                private BinaryReader socketReader;
                private BinaryWriter socketWriter;
-               private readonly object socketWriterLock = new object();
                private Thread readThread;
                private bool started;
-               private Util.AtomicBoolean closed = new 
Util.AtomicBoolean(false);
+               private volatile bool closed;
+               private volatile bool seenShutdown;
                private TimeSpan maxWait = 
TimeSpan.FromMilliseconds(Timeout.Infinite);
-               
+
                private CommandHandler commandHandler;
                private ExceptionHandler exceptionHandler;
                private TimeSpan MAX_THREAD_WAIT = 
TimeSpan.FromMilliseconds(30000);
@@ -51,13 +52,18 @@
                        this.socket = socket;
                        this.wireformat = wireformat;
                }
-               
+
+               ~TcpTransport()
+               {
+                       Dispose(false);
+               }
+
                /// <summary>
                /// Method Start
                /// </summary>
                public void Start()
                {
-                       lock (initLock)
+                       lock(myLock)
                        {
                                if (!started)
                                {
@@ -95,22 +101,27 @@
                {
                        get
                        {
-                               lock(initLock)
+                               lock(myLock)
                                {
                                        return started;
                                }
                        }
                }
-               
+
                public void Oneway(Command command)
                {
-                       lock (socketWriterLock)
+                       lock(myLock)
                        {
                                try
                                {
-                                       if(closed.Value)
+                                       if(closed)
+                                       {
+                                               throw new 
InvalidOperationException("Error writing to broker.  Transport connection is 
closed.");
+                                       }
+
+                                       if(command is ShutdownInfo)
                                        {
-                                               throw new Exception("Error 
writing to broker.  Transport connection is closed.");
+                                               seenShutdown = true;
                                        }
 
                                        Wireformat.Marshal(command, 
socketWriter);
@@ -134,7 +145,7 @@
                                }
                        }
                }
-               
+
                public FutureResponse AsyncRequest(Command command)
                {
                        throw new NotImplementedException("Use a 
ResponseCorrelator if you want to issue AsyncRequest calls");
@@ -169,13 +180,20 @@
                {
                        throw new NotImplementedException("Use a 
ResponseCorrelator if you want to issue Request calls");
                }
-               
+
                public void Close()
                {
-                       if(closed.CompareAndSet(false, true))
+                       if(!closed)
                        {
-                               lock(initLock)
+                               lock(myLock)
                                {
+                                       if(closed)
+                                       {
+                                               return;
+                                       }
+
+                                       closed = true;
+
                                        try
                                        {
                                                
socket.Shutdown(SocketShutdown.Both);
@@ -186,12 +204,9 @@
 
                                        try
                                        {
-                                               lock(socketWriterLock)
+                                               if(null != socketWriter)
                                                {
-                                                       if(null != socketWriter)
-                                                       {
-                                                               
socketWriter.Close();
-                                                       }
+                                                       socketWriter.Close();
                                                }
                                        }
                                        catch
@@ -260,9 +275,15 @@
 
                public void Dispose()
                {
+                       Dispose(true);
+                       GC.SuppressFinalize(this);
+               }
+
+               protected void Dispose(bool disposing)
+               {
                        Close();
                }
-               
+
                public void ReadLoop()
                {
                        // This is the thread function for the reader thread. 
This runs continuously
@@ -279,7 +300,7 @@
                        // An exception in the command handler may not be fatal 
to the transport, so
                        // these are simply reported to the exceptionHandler.
                        //
-                       while(!closed.Value)
+                       while(!closed)
                        {
                                Command command = null;
 
@@ -290,7 +311,7 @@
                                catch(Exception ex)
                                {
                                        command = null;
-                                       if(!closed.Value)
+                                       if(!closed && !seenShutdown)
                                        {
                                                // Close the socket as there's 
little that can be done with this transport now.
                                                Close();
@@ -313,9 +334,9 @@
                                }
                        }
                }
-                               
+
                // Implementation methods
-                               
+
                public CommandHandler Command
                {
                        get { return commandHandler; }

Modified: 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/TransportFilter.cs
URL: 
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/TransportFilter.cs?rev=693516&r1=693515&r2=693516&view=diff
==============================================================================
--- 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/TransportFilter.cs
 (original)
+++ 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/TransportFilter.cs
 Tue Sep  9 10:08:09 2008
@@ -24,28 +24,33 @@
        /// Used to implement a filter on the transport layer.
        /// </summary>
        public class TransportFilter : ITransport
-    {
+       {
                protected readonly ITransport next;
                protected CommandHandler commandHandler;
                protected ExceptionHandler exceptionHandler;
-               
+
                public TransportFilter(ITransport next)
                {
                        this.next = next;
                        this.next.Command = new CommandHandler(OnCommand);
                        this.next.Exception = new ExceptionHandler(OnException);
                }
-               
+
+               ~TransportFilter()
+               {
+                       Dispose(false);
+               }
+
                protected virtual void OnCommand(ITransport sender, Command 
command)
                {
                        this.commandHandler(sender, command);
                }
-               
+
                protected virtual void OnException(ITransport sender, Exception 
command)
                {
                        this.exceptionHandler(sender, command);
                }
-               
+
                /// <summary>
                /// Method Oneway
                /// </summary>
@@ -54,7 +59,7 @@
                {
                        this.next.Oneway(command);
                }
-               
+
                /// <summary>
                /// Method AsyncRequest
                /// </summary>
@@ -124,23 +129,32 @@
                /// <summary>
                /// Method Dispose
                /// </summary>
-               public virtual void Dispose()
+               public void Dispose()
                {
-                       this.next.Dispose();
+                       Dispose(true);
+                       GC.SuppressFinalize(this);
                }
-               
+
+               protected virtual void Dispose(bool disposing)
+               {
+                       if(disposing)
+                       {
+                               this.next.Dispose();
+                       }
+               }
+
                public CommandHandler Command
                {
-            get { return commandHandler; }
-            set { this.commandHandler = value; }
-        }
-               
-        public  ExceptionHandler Exception
-               {
-            get { return exceptionHandler; }
-            set { this.exceptionHandler = value; }
-        }
-               
-    }
+                       get { return commandHandler; }
+                       set { this.commandHandler = value; }
+               }
+
+               public  ExceptionHandler Exception
+               {
+                       get { return exceptionHandler; }
+                       set { this.exceptionHandler = value; }
+               }
+
+       }
 }
 

Modified: 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/WireFormatNegotiator.cs
URL: 
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/WireFormatNegotiator.cs?rev=693516&r1=693515&r2=693516&view=diff
==============================================================================
--- 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/WireFormatNegotiator.cs
 (original)
+++ 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/WireFormatNegotiator.cs
 Tue Sep  9 10:08:09 2008
@@ -24,85 +24,84 @@
 
 namespace Apache.NMS.ActiveMQ.Transport
 {
-       
-    /// <summary>
-    /// A Transport which negotiates the wire format
-    /// </summary>
-    public class WireFormatNegotiator : TransportFilter
-    {
-        private OpenWireFormat wireFormat;
-        private TimeSpan negotiateTimeout = TimeSpan.FromSeconds(15);
-    
-        private AtomicBoolean firstStart=new AtomicBoolean(true);
-        private CountDownLatch readyCountDownLatch = new CountDownLatch(1);
-        private CountDownLatch wireInfoSentDownLatch = new CountDownLatch(1);
-
-        public WireFormatNegotiator(ITransport next, OpenWireFormat wireFormat)
-            : base(next)
-        {
-            this.wireFormat = wireFormat;
-        }
-        
-        public override void Start()
-        {
-            base.Start();
-            if (firstStart.CompareAndSet(true, false))
-            {
-                try
-                {
-                    next.Oneway(wireFormat.PreferedWireFormatInfo);
-                }
-                finally
-                {
-                    wireInfoSentDownLatch.countDown();
-                }
-            }
-        }
-        
-        public override void Dispose()
-        {
-               base.Dispose();
-            readyCountDownLatch.countDown();
-        }
-
-        public override void Oneway(Command command)
-        {
-            if (!readyCountDownLatch.await(negotiateTimeout))
-                throw new IOException("Wire format negotiation timeout: peer 
did not send his wire format.");
-            next.Oneway(command);
-        }
-
-        protected override void OnCommand(ITransport sender, Command command)
-        {
-            if ( command.GetDataStructureType() == 
WireFormatInfo.ID_WireFormatInfo )
-            {
-                WireFormatInfo info = (WireFormatInfo)command;
-                try
-                {
-                    if (!info.Valid)
-                    {
-                        throw new IOException("Remote wire format magic is 
invalid");
-                    }
-                    wireInfoSentDownLatch.await(negotiateTimeout);
-                    wireFormat.renegotiateWireFormat(info);
-                }
-                catch (Exception e)
-                {
-                    OnException(this, e);
-                } 
-                finally
-                {
-                    readyCountDownLatch.countDown();
-                }
-            }
-            this.commandHandler(sender, command);
-        }
-
-        protected override void OnException(ITransport sender, Exception 
command)
-        {
-            readyCountDownLatch.countDown();
-            this.exceptionHandler(sender, command);
-        }
-    }
+       /// <summary>
+       /// A Transport which negotiates the wire format
+       /// </summary>
+       public class WireFormatNegotiator : TransportFilter
+       {
+               private OpenWireFormat wireFormat;
+               private TimeSpan negotiateTimeout = TimeSpan.FromSeconds(15);
+
+               private AtomicBoolean firstStart=new AtomicBoolean(true);
+               private CountDownLatch readyCountDownLatch = new 
CountDownLatch(1);
+               private CountDownLatch wireInfoSentDownLatch = new 
CountDownLatch(1);
+
+               public WireFormatNegotiator(ITransport next, OpenWireFormat 
wireFormat)
+                       : base(next)
+               {
+                       this.wireFormat = wireFormat;
+               }
+
+               public override void Start()
+               {
+                       base.Start();
+                       if (firstStart.CompareAndSet(true, false))
+                       {
+                               try
+                               {
+                                       
next.Oneway(wireFormat.PreferedWireFormatInfo);
+                               }
+                               finally
+                               {
+                                       wireInfoSentDownLatch.countDown();
+                               }
+                       }
+               }
+
+               protected override void Dispose(bool disposing)
+               {
+                       base.Dispose(disposing);
+                       readyCountDownLatch.countDown();
+               }
+
+               public override void Oneway(Command command)
+               {
+                       if (!readyCountDownLatch.await(negotiateTimeout))
+                               throw new IOException("Wire format negotiation 
timeout: peer did not send his wire format.");
+                       next.Oneway(command);
+               }
+
+               protected override void OnCommand(ITransport sender, Command 
command)
+               {
+                       if ( command.GetDataStructureType() == 
WireFormatInfo.ID_WireFormatInfo )
+                       {
+                               WireFormatInfo info = (WireFormatInfo)command;
+                               try
+                               {
+                                       if (!info.Valid)
+                                       {
+                                               throw new IOException("Remote 
wire format magic is invalid");
+                                       }
+                                       
wireInfoSentDownLatch.await(negotiateTimeout);
+                                       wireFormat.renegotiateWireFormat(info);
+                               }
+                               catch (Exception e)
+                               {
+                                       OnException(this, e);
+                               }
+                               finally
+                               {
+                                       readyCountDownLatch.countDown();
+                               }
+                       }
+                       this.commandHandler(sender, command);
+               }
+
+               protected override void OnException(ITransport sender, 
Exception command)
+               {
+                       readyCountDownLatch.countDown();
+                       this.exceptionHandler(sender, command);
+               }
+       }
 }
 


Reply via email to