Author: jgomes
Date: Tue Sep 9 10:08:09 2008
New Revision: 693516
URL: http://svn.apache.org/viewvc?rev=693516&view=rev
Log:
Apply patch from Chris Fraire. Thanks, Chris!
Fixes [AMQNET-112]. (See https://issues.apache.org/activemq/browse/AMQNET-112)
Modified:
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/DispatchingThread.cs
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageProducer.cs
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/MutexTransport.cs
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Tcp/TcpTransport.cs
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/TransportFilter.cs
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/WireFormatNegotiator.cs
Modified:
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs
URL:
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs?rev=693516&r1=693515&r2=693516&view=diff
==============================================================================
---
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs
(original)
+++
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs
Tue Sep 9 10:08:09 2008
@@ -20,6 +20,7 @@
using Apache.NMS.Util;
using System;
using System.Collections;
+using System.Threading;
namespace Apache.NMS.ActiveMQ
{
@@ -35,18 +36,21 @@
private TimeSpan requestTimeout;
private BrokerInfo brokerInfo; // from broker
private WireFormatInfo brokerWireFormatInfo; // from broker
- private readonly IList sessions = ArrayList.Synchronized(new
ArrayList());
+ private readonly IList sessions = new ArrayList();
+ /// <summary>
+ /// Private object used for synchronization, instead of public
"this"
+ /// </summary>
+ private readonly object myLock = new object();
private bool asyncSend = false;
private bool asyncClose = true;
- private bool connected = false;
- private bool closed = false;
- private bool closing = false;
+ private volatile bool closed;
+ private volatile bool triedConnect;
private long sessionCounter = 0;
private long temporaryDestinationCounter = 0;
private long localTransactionCounter;
- private readonly AtomicBoolean started = new
AtomicBoolean(false);
+ private bool started = false;
private bool disposed = false;
-
+
public Connection(Uri connectionUri, ITransport transport,
ConnectionInfo info)
{
this.brokerUri = connectionUri;
@@ -107,11 +111,15 @@
public void Start()
{
CheckConnected();
- if(started.CompareAndSet(false, true))
+ lock(myLock)
{
- foreach(Session session in sessions)
+ if(!started)
{
- session.StartAsyncDelivery(null);
+ started = true;
+ foreach(Session session in sessions)
+ {
+
session.StartAsyncDelivery(null);
+ }
}
}
}
@@ -122,7 +130,7 @@
/// </summary>
public bool IsStarted
{
- get { return started.Value; }
+ get { return started; }
}
/// <summary>
@@ -132,15 +140,19 @@
public void Stop()
{
CheckConnected();
- if(started.CompareAndSet(true, false))
+ lock(myLock)
{
- foreach(Session session in sessions)
+ if(started)
{
- session.StopAsyncDelivery();
+ started = false;
+ foreach(Session session in sessions)
+ {
+ session.StopAsyncDelivery();
+ }
}
}
}
-
+
/// <summary>
/// Creates a new session to work on this connection
/// </summary>
@@ -162,20 +174,22 @@
System.Collections.Specialized.StringDictionary map =
URISupport.ParseQuery(this.brokerUri.Query);
URISupport.SetProperties(session, map, "session.");
- if(IsStarted)
+ lock(myLock)
{
- session.StartAsyncDelivery(null);
- }
+ if(IsStarted)
+ {
+ session.StartAsyncDelivery(null);
+ }
- sessions.Add(session);
+ sessions.Add(session);
+ }
return session;
}
public void RemoveSession(Session session)
{
DisposeOf(session.SessionId);
-
- if(!closing)
+ lock(myLock)
{
sessions.Remove(session);
}
@@ -183,7 +197,26 @@
public void Close()
{
- lock(this)
+ if(closed)
+ {
+ return;
+ }
+
+ //
+ // Do a first-run close of sessions after brief
synchronization
+ //
+ IList sessionsCopy;
+ lock(myLock)
+ {
+ sessionsCopy = new ArrayList(sessions);
+ }
+
+ foreach(Session session in sessionsCopy)
+ {
+ session.Close();
+ }
+
+ lock(myLock)
{
if(closed)
{
@@ -192,8 +225,11 @@
try
{
- closing = true;
- foreach(Session session in sessions)
+ //
+ // Copy again for safe enumeration.
Always assume
+ // that closing a session modifies the
sessions list.
+ //
+ foreach(Session session in new
ArrayList(sessions))
{
session.Close();
}
@@ -207,10 +243,11 @@
{
Tracer.ErrorFormat("Error during
connection close: {0}", ex);
}
-
- transport = null;
- closed = true;
- closing = false;
+ finally
+ {
+ closed = true;
+ transport = null;
+ }
}
}
@@ -246,14 +283,14 @@
disposed = true;
}
-
+
// Properties
public Uri BrokerUri
{
get { return brokerUri; }
}
-
+
public ITransport ITransport
{
get { return transport; }
@@ -271,41 +308,41 @@
get { return acknowledgementMode; }
set { this.acknowledgementMode = value; }
}
-
+
public string ClientId
{
get { return info.ClientId; }
set
{
- if(connected)
+ if(triedConnect)
{
throw new NMSException("You cannot
change the ClientId once the Connection is connected");
}
info.ClientId = value;
}
}
-
+
public ConnectionId ConnectionId
{
get { return info.ConnectionId; }
}
-
+
public BrokerInfo BrokerInfo
{
get { return brokerInfo; }
}
-
+
public WireFormatInfo BrokerWireFormat
{
get { return brokerWireFormatInfo; }
}
-
+
// Implementation methods
/// <summary>
/// Performs a synchronous request-response with the broker
/// </summary>
- ///
+ ///
public Response SyncRequest(Command command)
{
@@ -360,12 +397,11 @@
/// </summary>
public String CreateTemporaryDestinationName()
{
- lock(this)
- {
- return info.ConnectionId.Value + ":" +
(++temporaryDestinationCounter);
- }
+ return info.ConnectionId.Value
+ + ":"
+ + Interlocked.Increment(ref
temporaryDestinationCounter);
}
-
+
/// <summary>
/// Creates a new local transaction ID
/// </summary>
@@ -373,13 +409,10 @@
{
LocalTransactionId id= new LocalTransactionId();
id.ConnectionId = ConnectionId;
- lock(this)
- {
- id.Value = (++localTransactionCounter);
- }
+ id.Value = Interlocked.Increment(ref
localTransactionCounter);
return id;
}
-
+
protected void CheckConnected()
{
if(closed)
@@ -387,19 +420,35 @@
throw new ConnectionClosedException();
}
- if(!connected)
+ if(triedConnect)
{
- connected = true;
+ return;
+ }
+
+ lock(myLock)
+ {
+ if(closed)
+ {
+ throw new ConnectionClosedException();
+ }
+
+ if(triedConnect)
+ {
+ return;
+ }
+
+ // Set this in advance, to short-circuit
SyncRequest's call to this method
+ triedConnect = true;
+
// now lets send the connection and see if we
get an ack/nak
if(null == SyncRequest(info))
{
closed = true;
- connected = false;
throw new ConnectionClosedException();
}
}
}
-
+
/// <summary>
/// Handle incoming commands
/// </summary>
@@ -422,9 +471,12 @@
else if(command is ShutdownInfo)
{
//ShutdownInfo info = (ShutdownInfo)command;
- if(!closing && !closed)
+ lock(myLock)
{
- OnException(commandTransport, new
NMSException("Broker closed this connection."));
+ if(!closed)
+ {
+ OnException(commandTransport,
new NMSException("Broker closed this connection."));
+ }
}
}
else
@@ -437,12 +489,15 @@
{
bool dispatched = false;
- foreach(Session session in sessions)
+ lock(myLock)
{
- if(session.DispatchMessage(dispatch.ConsumerId,
dispatch.Message))
+ foreach(Session session in sessions)
{
- dispatched = true;
- break;
+
if(session.DispatchMessage(dispatch.ConsumerId, dispatch.Message))
+ {
+ dispatched = true;
+ break;
+ }
}
}
@@ -483,19 +538,16 @@
}
}
}
-
+
protected SessionInfo CreateSessionInfo(AcknowledgementMode
sessionAcknowledgementMode)
{
SessionInfo answer = new SessionInfo();
SessionId sessionId = new SessionId();
sessionId.ConnectionId = info.ConnectionId.Value;
- lock(this)
- {
- sessionId.Value = ++sessionCounter;
- }
+ sessionId.Value = Interlocked.Increment(ref
sessionCounter);
answer.SessionId = sessionId;
return answer;
}
-
+
}
}
Modified:
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/DispatchingThread.cs
URL:
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/DispatchingThread.cs?rev=693516&r1=693515&r2=693516&view=diff
==============================================================================
---
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/DispatchingThread.cs
(original)
+++
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/DispatchingThread.cs
Tue Sep 9 10:08:09 2008
@@ -27,7 +27,7 @@
public delegate void ExceptionHandler(Exception exception);
private readonly AutoResetEvent m_event = new
AutoResetEvent(false);
- private bool m_bStopFlag = false;
+ private readonly ManualResetEvent m_stopEvent = new
ManualResetEvent(false);
private Thread m_thread = null;
private readonly DispatchFunction m_dispatchFunc;
private event ExceptionHandler m_exceptionListener;
@@ -37,7 +37,18 @@
m_dispatchFunc = dispatchFunc;
}
- // TODO can't use EventWaitHandle on MONO 1.0
+ public bool IsStarted
+ {
+ get
+ {
+ lock(this)
+ {
+ return (null != m_thread);
+ }
+ }
+ }
+
+ // TODO can't use EventWaitHandle on MONO 1.0
public AutoResetEvent EventHandle
{
get { return m_event; }
@@ -49,7 +60,7 @@
{
m_exceptionListener += value;
}
- remove
+ remove
{
m_exceptionListener -= value;
}
@@ -61,10 +72,9 @@
{
if (m_thread == null)
{
- m_bStopFlag = false;
+ m_stopEvent.Reset();
m_thread = new Thread(new
ThreadStart(MyThreadFunc));
m_thread.IsBackground = true;
- m_event.Set();
Tracer.Info("Starting dispatcher thread
for session");
m_thread.Start();
}
@@ -76,20 +86,16 @@
Stop(System.Threading.Timeout.Infinite);
}
-
+
internal void Stop(int timeoutMilliseconds)
{
Tracer.Info("Stopping dispatcher thread for session");
- Thread localThread = null;
+ Thread localThread;
lock (this)
{
localThread = m_thread;
m_thread = null;
- if (!m_bStopFlag)
- {
- m_bStopFlag = true;
- m_event.Set();
- }
+ m_stopEvent.Set();
}
if(localThread!=null)
{
@@ -101,30 +107,54 @@
}
Tracer.Info("Dispatcher thread joined");
}
-
+
private void MyThreadFunc()
{
Tracer.Info("Dispatcher thread started");
- while (true) // loop forever (well, at least until
we've been asked to stop)
- {
- lock (this)
- {
- if (m_bStopFlag)
- break;
- }
- try
- {
- m_dispatchFunc();
- }
- catch (Exception ex)
- {
- if (m_exceptionListener != null)
- m_exceptionListener(ex);
+ //
+ // Put m_stopEvent first so it is preferred if both are
signaled
+ //
+ WaitHandle[] signals = new WaitHandle[] {
+ m_stopEvent,
+ m_event
+ };
+ const int kStopEventOffset = 0;
+
+ try
+ {
+ while (true) // loop forever (well, at least
until we've been asked to stop)
+ {
+ try
+ {
+ m_dispatchFunc();
+ }
+ catch(ThreadAbortException)
+ {
+ // Throw for handling down below
+ throw;
+ }
+ catch(Exception ex)
+ {
+ if(m_exceptionListener != null)
+ {
+ m_exceptionListener(ex);
+ }
+ }
+
+ int sigOffset =
WaitHandle.WaitAny(signals);
+ if(kStopEventOffset == sigOffset)
+ {
+ break;
+ }
+ // otherwise, continue the loop
}
- m_event.WaitOne();
+ Tracer.Info("Dispatcher thread stopped");
+ }
+ catch(ThreadAbortException)
+ {
+ Tracer.Info("Dispatcher thread aborted");
}
- Tracer.Info("Dispatcher thread stopped");
}
}
-}
\ No newline at end of file
+}
Modified:
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageProducer.cs
URL:
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageProducer.cs?rev=693516&r1=693515&r2=693516&view=diff
==============================================================================
---
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageProducer.cs
(original)
+++
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageProducer.cs
Tue Sep 9 10:08:09 2008
@@ -144,11 +144,7 @@
{
MessageId id = new MessageId();
id.ProducerId = info.ProducerId;
- lock (this)
- {
- id.ProducerSequenceId =
++messageCounter;
- }
-
+ id.ProducerSequenceId =
Interlocked.Increment(ref messageCounter);
activeMessage.MessageId = id;
}
Modified:
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs
URL:
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs?rev=693516&r1=693515&r2=693516&view=diff
==============================================================================
---
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs
(original)
+++
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs
Tue Sep 9 10:08:09 2008
@@ -16,8 +16,10 @@
*/
using System;
using System.Collections;
+using System.Threading;
using Apache.NMS.ActiveMQ.Commands;
using Apache.NMS;
+using Apache.NMS.Util;
namespace Apache.NMS.ActiveMQ
{
@@ -27,15 +29,20 @@
public class Session : ISession
{
private long consumerCounter;
- private readonly IDictionary consumers =
Hashtable.Synchronized(new Hashtable());
- private readonly IDictionary producers =
Hashtable.Synchronized(new Hashtable());
+ private readonly IDictionary consumers = new Hashtable();
+ private readonly IDictionary producers = new Hashtable();
private readonly DispatchingThread dispatchingThread;
+ /// <summary>
+ /// Private object used for synchronization, instead of public
"this"
+ /// </summary>
+ private readonly object myLock = new object();
private DispatchingThread.ExceptionHandler
dispatchingThread_ExceptionHandler;
private readonly SessionInfo info;
private long producerCounter;
- internal bool startedAsyncDelivery = false;
private bool disposed = false;
- private bool closed = false;
+ private volatile bool closed = false;
+
+ private const int kMAX_STOP_ASYNC_MS = 30000;
public Session(Connection connection, SessionInfo info,
AcknowledgementMode acknowledgementMode)
{
@@ -146,9 +153,37 @@
public void Close()
{
- lock(this)
+ if(closed)
+ {
+ return;
+ }
+
+ //
+ // Do a first-run close of consumer and producers after
+ // brief synchronization
+ //
+ IList consumersCopy;
+ IList producersCopy;
+
+ lock(myLock)
{
- if(this.closed)
+ consumersCopy = new ArrayList(consumers.Values);
+ producersCopy = new ArrayList(producers.Values);
+ }
+
+ foreach(MessageConsumer consumer in consumersCopy)
+ {
+ consumer.Close();
+ }
+
+ foreach(MessageProducer producer in producersCopy)
+ {
+ producer.Close();
+ }
+
+ lock(myLock)
+ {
+ if(closed)
{
return;
}
@@ -156,26 +191,30 @@
try
{
StopAsyncDelivery();
- Connection.RemoveSession(this);
- foreach(MessageConsumer consumer in
GetConsumers())
+ // Copy again for safe enumeration
+ foreach(MessageConsumer consumer in new
ArrayList(consumers.Values))
{
consumer.Close();
}
consumers.Clear();
- foreach(MessageProducer producer in
GetProducers())
+ // Copy again for safe enumeration
+ foreach(MessageProducer producer in new
ArrayList(producers.Values))
{
producer.Close();
}
producers.Clear();
+ Connection.RemoveSession(this);
}
- catch(Exception ex)
+ catch (Exception ex)
{
Tracer.ErrorFormat("Error during
session close: {0}", ex);
}
-
- this.connection = null;
- this.closed = true;
+ finally
+ {
+ this.connection = null;
+ this.closed = true;
+ }
}
}
@@ -188,19 +227,24 @@
{
ProducerInfo command = CreateProducerInfo(destination);
ProducerId producerId = command.ProducerId;
- MessageProducer producer = null;
+ MessageProducer producer = new MessageProducer(this,
command);
+ lock(myLock)
+ {
+ producers[producerId] = producer;
+ }
try
{
- producer = new MessageProducer(this, command);
- producers[producerId] = producer;
this.DoSend(command);
}
catch(Exception)
{
- if(producer != null)
+ //
+ // DoSend failed. No need to call
MessageProducer.Close
+ //
+ lock(myLock)
{
- producer.Close();
+ producers.Remove(producerId);
}
throw;
@@ -228,23 +272,31 @@
ConsumerId consumerId = command.ConsumerId;
MessageConsumer consumer = null;
- try
+ consumer = new MessageConsumer(this, command,
this.AcknowledgementMode);
+ // let's register the consumer first in case we start
dispatching messages immediately
+ lock(myLock)
{
- consumer = new MessageConsumer(this, command,
this.AcknowledgementMode);
- // lets register the consumer first in case we
start dispatching messages immediately
consumers[consumerId] = consumer;
+ }
+
+ try
+ {
this.DoSend(command);
- return consumer;
}
catch(Exception)
{
- if(consumer != null)
+ //
+ // DoSend failed. No need to call
MessageConsumer.Close
+ //
+ lock(myLock)
{
- consumer.Close();
+ consumers.Remove(consumerId);
}
throw;
}
+
+ return consumer;
}
public IMessageConsumer CreateDurableConsumer(ITopic
destination, string name, string selector, bool noLocal)
@@ -255,18 +307,25 @@
command.NoLocal = noLocal;
MessageConsumer consumer = null;
- try
+ consumer = new MessageConsumer(this, command,
this.AcknowledgementMode);
+ // let's register the consumer first in case we start
dispatching messages immediately
+ lock(myLock)
{
- consumer = new MessageConsumer(this, command,
this.AcknowledgementMode);
- // lets register the consumer first in case we
start dispatching messages immediately
consumers[consumerId] = consumer;
+ }
+
+ try
+ {
this.DoSend(command);
}
catch(Exception)
{
- if(consumer != null)
+ //
+ // DoSend failed; no need to call
MessageConsumer.Close
+ //
+ lock(myLock)
{
- consumer.Close();
+ consumers.Remove(consumerId);
}
throw;
@@ -277,11 +336,15 @@
public void DeleteDurableConsumer(string name)
{
+ IConnection conn = this.Connection;
RemoveSubscriptionInfo command = new
RemoveSubscriptionInfo();
- command.ConnectionId = Connection.ConnectionId;
- command.ClientId = Connection.ClientId;
- command.SubcriptionName = name;
+ if(null != conn)
+ {
+ command.ConnectionId = Connection.ConnectionId;
+ command.ClientId = Connection.ClientId;
+ }
+ command.SubcriptionName = name;
this.DoSend(command);
}
@@ -378,9 +441,12 @@
this.TransactionContext.Rollback();
// lets ensure all the consumers redeliver any rolled
back messages
- foreach(MessageConsumer consumer in GetConsumers())
+ lock(myLock)
{
- consumer.RedeliverRolledBackMessages();
+ foreach(MessageConsumer consumer in
consumers.Values)
+ {
+ consumer.RedeliverRolledBackMessages();
+ }
}
}
@@ -472,19 +538,29 @@
public void DisposeOf(ConsumerId objectId)
{
Connection.DisposeOf(objectId);
- consumers.Remove(objectId);
+ lock(myLock)
+ {
+ consumers.Remove(objectId);
+ }
}
public void DisposeOf(ProducerId objectId)
{
Connection.DisposeOf(objectId);
- producers.Remove(objectId);
+ lock(myLock)
+ {
+ producers.Remove(objectId);
+ }
}
public bool DispatchMessage(ConsumerId consumerId, Message
message)
{
bool dispatched = false;
- MessageConsumer consumer = (MessageConsumer)
consumers[consumerId];
+ MessageConsumer consumer;
+ lock(myLock)
+ {
+ consumer = (MessageConsumer)
consumers[consumerId];
+ }
if(consumer != null)
{
@@ -503,9 +579,12 @@
{
// lets iterate through each consumer created by this
session
// ensuring that they have all pending messages
dispatched
- foreach(MessageConsumer consumer in GetConsumers())
+ lock(myLock)
{
- consumer.DispatchAsyncMessages();
+ foreach(MessageConsumer consumer in
consumers.Values)
+ {
+ consumer.DispatchAsyncMessages();
+ }
}
}
@@ -515,7 +594,7 @@
/// </summary>
protected ICollection GetConsumers()
{
- lock(consumers.SyncRoot)
+ lock(myLock)
{
return new ArrayList(consumers.Values);
}
@@ -527,7 +606,7 @@
/// </summary>
protected ICollection GetProducers()
{
- lock(producers.SyncRoot)
+ lock(myLock)
{
return new ArrayList(producers.Values);
}
@@ -539,10 +618,7 @@
ConsumerId id = new ConsumerId();
id.ConnectionId = info.SessionId.ConnectionId;
id.SessionId = info.SessionId.Value;
- lock(this)
- {
- id.Value = ++consumerCounter;
- }
+ id.Value = Interlocked.Increment(ref consumerCounter);
answer.ConsumerId = id;
answer.Destination =
ActiveMQDestination.Transform(destination);
answer.Selector = selector;
@@ -557,7 +633,7 @@
ActiveMQDestination amqDestination = destination as
ActiveMQDestination;
if(amqDestination != null && amqDestination.Options !=
null)
{
- Util.URISupport.SetProperties(answer,
amqDestination.Options, "consumer.");
+ URISupport.SetProperties(answer,
amqDestination.Options, "consumer.");
}
return answer;
@@ -569,10 +645,7 @@
ProducerId id = new ProducerId();
id.ConnectionId = info.SessionId.ConnectionId;
id.SessionId = info.SessionId.Value;
- lock(this)
- {
- id.Value = ++producerCounter;
- }
+ id.Value = Interlocked.Increment(ref producerCounter);
answer.ProducerId = id;
answer.Destination =
ActiveMQDestination.Transform(destination);
@@ -581,7 +654,7 @@
ActiveMQDestination amqDestination = destination as
ActiveMQDestination;
if(amqDestination != null && amqDestination.Options !=
null)
{
- Util.URISupport.SetProperties(answer,
amqDestination.Options, "producer.");
+ URISupport.SetProperties(answer,
amqDestination.Options, "producer.");
}
return answer;
@@ -596,11 +669,10 @@
internal void StopAsyncDelivery()
{
- if(startedAsyncDelivery)
+ if(dispatchingThread.IsStarted)
{
this.dispatchingThread.ExceptionListener -=
this.dispatchingThread_ExceptionHandler;
- dispatchingThread.Stop(5000);
- startedAsyncDelivery = false;
+ dispatchingThread.Stop(kMAX_STOP_ASYNC_MS);
}
}
@@ -611,11 +683,10 @@
dispatcher.SetAsyncDelivery(dispatchingThread.EventHandle);
}
- if(!startedAsyncDelivery)
+ if(!dispatchingThread.IsStarted)
{
this.dispatchingThread.ExceptionListener +=
this.dispatchingThread_ExceptionHandler;
dispatchingThread.Start();
- startedAsyncDelivery = true;
}
}
}
Modified:
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/MutexTransport.cs
URL:
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/MutexTransport.cs?rev=693516&r1=693515&r2=693516&view=diff
==============================================================================
---
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/MutexTransport.cs
(original)
+++
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/MutexTransport.cs
Tue Sep 9 10:08:09 2008
@@ -20,32 +20,32 @@
namespace Apache.NMS.ActiveMQ.Transport
{
- /// <summary>
- /// A Transport which guards access to the next transport using a mutex.
- /// </summary>
- public class MutexTransport : TransportFilter
- {
- private readonly object transmissionLock = new object();
+ /// <summary>
+ /// A Transport which guards access to the next transport using a mutex.
+ /// </summary>
+ public class MutexTransport : TransportFilter
+ {
+ private readonly object transmissionLock = new object();
- public MutexTransport(ITransport next) : base(next)
+ public MutexTransport(ITransport next) : base(next)
{
- }
-
- public override void Oneway(Command command)
- {
- lock(transmissionLock)
- {
- this.next.Oneway(command);
- }
- }
+ }
- public override FutureResponse AsyncRequest(Command command)
- {
- lock(transmissionLock)
- {
- return base.AsyncRequest(command);
- }
- }
+ public override void Oneway(Command command)
+ {
+ lock(transmissionLock)
+ {
+ this.next.Oneway(command);
+ }
+ }
+
+ public override FutureResponse AsyncRequest(Command command)
+ {
+ lock(transmissionLock)
+ {
+ return base.AsyncRequest(command);
+ }
+ }
public override Response Request(Command command, TimeSpan
timeout)
{
@@ -54,13 +54,5 @@
return base.Request(command, timeout);
}
}
-
- public override void Dispose()
- {
- lock(transmissionLock)
- {
- base.Dispose();
- }
- }
- }
+ }
}
Modified:
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Tcp/TcpTransport.cs
URL:
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Tcp/TcpTransport.cs?rev=693516&r1=693515&r2=693516&view=diff
==============================================================================
---
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Tcp/TcpTransport.cs
(original)
+++
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Tcp/TcpTransport.cs
Tue Sep 9 10:08:09 2008
@@ -17,6 +17,7 @@
using Apache.NMS.ActiveMQ.Commands;
using Apache.NMS.ActiveMQ.OpenWire;
using Apache.NMS.ActiveMQ.Transport;
+using Apache.NMS.Util;
using System;
using System.IO;
using System.Net.Sockets;
@@ -24,23 +25,23 @@
namespace Apache.NMS.ActiveMQ.Transport.Tcp
{
-
+
/// <summary>
/// An implementation of ITransport that uses sockets to communicate
with the broker
/// </summary>
public class TcpTransport : ITransport
{
- private readonly object initLock = new object();
+ private readonly object myLock = new object();
private readonly Socket socket;
private IWireFormat wireformat;
private BinaryReader socketReader;
private BinaryWriter socketWriter;
- private readonly object socketWriterLock = new object();
private Thread readThread;
private bool started;
- private Util.AtomicBoolean closed = new
Util.AtomicBoolean(false);
+ private volatile bool closed;
+ private volatile bool seenShutdown;
private TimeSpan maxWait =
TimeSpan.FromMilliseconds(Timeout.Infinite);
-
+
private CommandHandler commandHandler;
private ExceptionHandler exceptionHandler;
private TimeSpan MAX_THREAD_WAIT =
TimeSpan.FromMilliseconds(30000);
@@ -51,13 +52,18 @@
this.socket = socket;
this.wireformat = wireformat;
}
-
+
+ ~TcpTransport()
+ {
+ Dispose(false);
+ }
+
/// <summary>
/// Method Start
/// </summary>
public void Start()
{
- lock (initLock)
+ lock(myLock)
{
if (!started)
{
@@ -95,22 +101,27 @@
{
get
{
- lock(initLock)
+ lock(myLock)
{
return started;
}
}
}
-
+
public void Oneway(Command command)
{
- lock (socketWriterLock)
+ lock(myLock)
{
try
{
- if(closed.Value)
+ if(closed)
+ {
+ throw new
InvalidOperationException("Error writing to broker. Transport connection is
closed.");
+ }
+
+ if(command is ShutdownInfo)
{
- throw new Exception("Error
writing to broker. Transport connection is closed.");
+ seenShutdown = true;
}
Wireformat.Marshal(command,
socketWriter);
@@ -134,7 +145,7 @@
}
}
}
-
+
public FutureResponse AsyncRequest(Command command)
{
throw new NotImplementedException("Use a
ResponseCorrelator if you want to issue AsyncRequest calls");
@@ -169,13 +180,20 @@
{
throw new NotImplementedException("Use a
ResponseCorrelator if you want to issue Request calls");
}
-
+
public void Close()
{
- if(closed.CompareAndSet(false, true))
+ if(!closed)
{
- lock(initLock)
+ lock(myLock)
{
+ if(closed)
+ {
+ return;
+ }
+
+ closed = true;
+
try
{
socket.Shutdown(SocketShutdown.Both);
@@ -186,12 +204,9 @@
try
{
- lock(socketWriterLock)
+ if(null != socketWriter)
{
- if(null != socketWriter)
- {
-
socketWriter.Close();
- }
+ socketWriter.Close();
}
}
catch
@@ -260,9 +275,15 @@
public void Dispose()
{
+ Dispose(true);
+ GC.SuppressFinalize(this);
+ }
+
+ protected void Dispose(bool disposing)
+ {
Close();
}
-
+
public void ReadLoop()
{
// This is the thread function for the reader thread.
This runs continuously
@@ -279,7 +300,7 @@
// An exception in the command handler may not be fatal
to the transport, so
// these are simply reported to the exceptionHandler.
//
- while(!closed.Value)
+ while(!closed)
{
Command command = null;
@@ -290,7 +311,7 @@
catch(Exception ex)
{
command = null;
- if(!closed.Value)
+ if(!closed && !seenShutdown)
{
// Close the socket as there's
little that can be done with this transport now.
Close();
@@ -313,9 +334,9 @@
}
}
}
-
+
// Implementation methods
-
+
public CommandHandler Command
{
get { return commandHandler; }
Modified:
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/TransportFilter.cs
URL:
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/TransportFilter.cs?rev=693516&r1=693515&r2=693516&view=diff
==============================================================================
---
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/TransportFilter.cs
(original)
+++
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/TransportFilter.cs
Tue Sep 9 10:08:09 2008
@@ -24,28 +24,33 @@
/// Used to implement a filter on the transport layer.
/// </summary>
public class TransportFilter : ITransport
- {
+ {
protected readonly ITransport next;
protected CommandHandler commandHandler;
protected ExceptionHandler exceptionHandler;
-
+
public TransportFilter(ITransport next)
{
this.next = next;
this.next.Command = new CommandHandler(OnCommand);
this.next.Exception = new ExceptionHandler(OnException);
}
-
+
+ ~TransportFilter()
+ {
+ Dispose(false);
+ }
+
protected virtual void OnCommand(ITransport sender, Command
command)
{
this.commandHandler(sender, command);
}
-
+
protected virtual void OnException(ITransport sender, Exception
command)
{
this.exceptionHandler(sender, command);
}
-
+
/// <summary>
/// Method Oneway
/// </summary>
@@ -54,7 +59,7 @@
{
this.next.Oneway(command);
}
-
+
/// <summary>
/// Method AsyncRequest
/// </summary>
@@ -124,23 +129,32 @@
/// <summary>
/// Method Dispose
/// </summary>
- public virtual void Dispose()
+ public void Dispose()
{
- this.next.Dispose();
+ Dispose(true);
+ GC.SuppressFinalize(this);
}
-
+
+ protected virtual void Dispose(bool disposing)
+ {
+ if(disposing)
+ {
+ this.next.Dispose();
+ }
+ }
+
public CommandHandler Command
{
- get { return commandHandler; }
- set { this.commandHandler = value; }
- }
-
- public ExceptionHandler Exception
- {
- get { return exceptionHandler; }
- set { this.exceptionHandler = value; }
- }
-
- }
+ get { return commandHandler; }
+ set { this.commandHandler = value; }
+ }
+
+ public ExceptionHandler Exception
+ {
+ get { return exceptionHandler; }
+ set { this.exceptionHandler = value; }
+ }
+
+ }
}
Modified:
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/WireFormatNegotiator.cs
URL:
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/WireFormatNegotiator.cs?rev=693516&r1=693515&r2=693516&view=diff
==============================================================================
---
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/WireFormatNegotiator.cs
(original)
+++
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/WireFormatNegotiator.cs
Tue Sep 9 10:08:09 2008
@@ -24,85 +24,84 @@
namespace Apache.NMS.ActiveMQ.Transport
{
-
- /// <summary>
- /// A Transport which negotiates the wire format
- /// </summary>
- public class WireFormatNegotiator : TransportFilter
- {
- private OpenWireFormat wireFormat;
- private TimeSpan negotiateTimeout = TimeSpan.FromSeconds(15);
-
- private AtomicBoolean firstStart=new AtomicBoolean(true);
- private CountDownLatch readyCountDownLatch = new CountDownLatch(1);
- private CountDownLatch wireInfoSentDownLatch = new CountDownLatch(1);
-
- public WireFormatNegotiator(ITransport next, OpenWireFormat wireFormat)
- : base(next)
- {
- this.wireFormat = wireFormat;
- }
-
- public override void Start()
- {
- base.Start();
- if (firstStart.CompareAndSet(true, false))
- {
- try
- {
- next.Oneway(wireFormat.PreferedWireFormatInfo);
- }
- finally
- {
- wireInfoSentDownLatch.countDown();
- }
- }
- }
-
- public override void Dispose()
- {
- base.Dispose();
- readyCountDownLatch.countDown();
- }
-
- public override void Oneway(Command command)
- {
- if (!readyCountDownLatch.await(negotiateTimeout))
- throw new IOException("Wire format negotiation timeout: peer
did not send his wire format.");
- next.Oneway(command);
- }
-
- protected override void OnCommand(ITransport sender, Command command)
- {
- if ( command.GetDataStructureType() ==
WireFormatInfo.ID_WireFormatInfo )
- {
- WireFormatInfo info = (WireFormatInfo)command;
- try
- {
- if (!info.Valid)
- {
- throw new IOException("Remote wire format magic is
invalid");
- }
- wireInfoSentDownLatch.await(negotiateTimeout);
- wireFormat.renegotiateWireFormat(info);
- }
- catch (Exception e)
- {
- OnException(this, e);
- }
- finally
- {
- readyCountDownLatch.countDown();
- }
- }
- this.commandHandler(sender, command);
- }
-
- protected override void OnException(ITransport sender, Exception
command)
- {
- readyCountDownLatch.countDown();
- this.exceptionHandler(sender, command);
- }
- }
+ /// <summary>
+ /// A Transport which negotiates the wire format
+ /// </summary>
+ public class WireFormatNegotiator : TransportFilter
+ {
+ private OpenWireFormat wireFormat;
+ private TimeSpan negotiateTimeout = TimeSpan.FromSeconds(15);
+
+ private AtomicBoolean firstStart=new AtomicBoolean(true);
+ private CountDownLatch readyCountDownLatch = new
CountDownLatch(1);
+ private CountDownLatch wireInfoSentDownLatch = new
CountDownLatch(1);
+
+ public WireFormatNegotiator(ITransport next, OpenWireFormat
wireFormat)
+ : base(next)
+ {
+ this.wireFormat = wireFormat;
+ }
+
+ public override void Start()
+ {
+ base.Start();
+ if (firstStart.CompareAndSet(true, false))
+ {
+ try
+ {
+
next.Oneway(wireFormat.PreferedWireFormatInfo);
+ }
+ finally
+ {
+ wireInfoSentDownLatch.countDown();
+ }
+ }
+ }
+
+ protected override void Dispose(bool disposing)
+ {
+ base.Dispose(disposing);
+ readyCountDownLatch.countDown();
+ }
+
+ public override void Oneway(Command command)
+ {
+ if (!readyCountDownLatch.await(negotiateTimeout))
+ throw new IOException("Wire format negotiation
timeout: peer did not send his wire format.");
+ next.Oneway(command);
+ }
+
+ protected override void OnCommand(ITransport sender, Command
command)
+ {
+ if ( command.GetDataStructureType() ==
WireFormatInfo.ID_WireFormatInfo )
+ {
+ WireFormatInfo info = (WireFormatInfo)command;
+ try
+ {
+ if (!info.Valid)
+ {
+ throw new IOException("Remote
wire format magic is invalid");
+ }
+
wireInfoSentDownLatch.await(negotiateTimeout);
+ wireFormat.renegotiateWireFormat(info);
+ }
+ catch (Exception e)
+ {
+ OnException(this, e);
+ }
+ finally
+ {
+ readyCountDownLatch.countDown();
+ }
+ }
+ this.commandHandler(sender, command);
+ }
+
+ protected override void OnException(ITransport sender,
Exception command)
+ {
+ readyCountDownLatch.countDown();
+ this.exceptionHandler(sender, command);
+ }
+ }
}