Author: jgomes
Date: Sat Sep 10 23:09:20 2016
New Revision: 1760211
URL: http://svn.apache.org/viewvc?rev=1760211&view=rev
Log:
Apply patch for AMQNET-556 from Stephane Ramet. Thanks Stephane!
Added:
activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/msmqprovider-test.config
activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/test/csharp/AsyncConsumeTest.cs
activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/test/csharp/BadConsumeTest.cs
activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/test/csharp/BytesMessageTest.cs
activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/test/csharp/Commands/
activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/test/csharp/Commands/BytesMessage.cs
activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/test/csharp/Commands/Destination.cs
activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/test/csharp/Commands/MapMessage.cs
activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/test/csharp/Commands/Message.cs
activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/test/csharp/Commands/ObjectMessage.cs
activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/test/csharp/Commands/Queue.cs
activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/test/csharp/Commands/StreamMessage.cs
activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/test/csharp/Commands/TempDestination.cs
activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/test/csharp/Commands/TempQueue.cs
activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/test/csharp/Commands/TempTopic.cs
activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/test/csharp/Commands/TextMessage.cs
activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/test/csharp/Commands/Topic.cs
activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/test/csharp/ConnectionTest.cs
activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/test/csharp/ConsumerTest.cs
activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/test/csharp/DurableTest.cs
activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/test/csharp/EndianBinaryReaderTest.cs
activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/test/csharp/EndianBinaryWriterTest.cs
activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/test/csharp/EndianTest.cs
activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/test/csharp/ForeignMessageTransformationTest.cs
activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/test/csharp/MSMQAsyncConsumeTest.cs
activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/test/csharp/MSMQBadConsumeTest.cs
activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/test/csharp/MSMQBytesMessageTest.cs
activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/test/csharp/MSMQConnectionTest.cs
activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/test/csharp/MSMQConsumerTest.cs
activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/test/csharp/MSMQDurableTest.cs
activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/test/csharp/MSMQForeignMessageTransformationTest.cs
activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/test/csharp/MSMQMapMessageTest.cs
activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/test/csharp/MSMQMessageSelectorTest.cs
activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/test/csharp/MSMQMessageTest.cs
activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/test/csharp/MSMQMessageTransformerTest.cs
activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/test/csharp/MSMQNMSPropertyTest.cs
activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/test/csharp/MSMQProducerTest.cs
activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/test/csharp/MSMQRequestResponseTest.cs
activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/test/csharp/MSMQStreamMessageTest.cs
activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/test/csharp/MSMQTempDestinationDeletionTest.cs
activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/test/csharp/MSMQTempDestinationTest.cs
activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/test/csharp/MSMQTestSupport.cs
activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/test/csharp/MSMQTextMessageTest.cs
activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/test/csharp/MSMQTransactionTest.cs
activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/test/csharp/MSMQXmlMessageTest.cs
activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/test/csharp/MapMessageTest.cs
activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/test/csharp/MessageSelectorTest.cs
activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/test/csharp/MessageTest.cs
activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/test/csharp/MessageTransformerTest.cs
activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/test/csharp/NMSPropertyTest.cs
activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/test/csharp/NMSTest.cs
activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/test/csharp/NMSTestSupport.cs
activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/test/csharp/NMSTracer.cs
activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/test/csharp/PrimitiveMapTest.cs
activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/test/csharp/ProducerTest.cs
activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/test/csharp/RedeliveryPolicyTest.cs
activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/test/csharp/RequestResponseTest.cs
activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/test/csharp/StreamMessageTest.cs
activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/test/csharp/TempDestinationDeletionTest.cs
activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/test/csharp/TempDestinationTest.cs
activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/test/csharp/TextMessageTest.cs
activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/test/csharp/TransactionTest.cs
activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/test/csharp/XmlMessageTest.cs
Modified:
activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/Apache.NMS.MSMQ.Test.nunit
activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/nant.build
activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/Connection.cs
activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/ConnectionMetaData.cs
activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/DefaultMessageConverter.cs
activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/IMessageConverterEx.cs
activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/MessageConsumer.cs
activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/MessageProducer.cs
activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/Queue.cs
activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/QueueBrowser.cs
activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/Readers/AbstractMessageReader.cs
activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/Readers/ByCorrelationIdMessageReader.cs
activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/Readers/ByIdMessageReader.cs
activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/Readers/ByLookupIdMessageReader.cs
activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/Readers/BySelectorMessageReader.cs
activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/Readers/IMessageReader.cs
activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/Readers/MessageReaderUtil.cs
activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/Readers/NonFilteringMessageReader.cs
activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/Selector/ANDExpression.cs
activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/Selector/AlignedNumericValues.cs
activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/Selector/ArithmeticExpression.cs
activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/Selector/BinaryExpression.cs
activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/Selector/BooleanCastExpression.cs
activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/Selector/BooleanConstantExpression.cs
activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/Selector/BooleanUnaryExpression.cs
activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/Selector/ComparisonExpression.cs
activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/Selector/ConstantExpression.cs
activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/Selector/DivideExpression.cs
activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/Selector/EqualExpression.cs
activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/Selector/GreaterExpression.cs
activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/Selector/GreaterOrEqualExpression.cs
activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/Selector/IBooleanExpression.cs
activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/Selector/IExpression.cs
activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/Selector/InExpression.cs
activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/Selector/IsNullExpression.cs
activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/Selector/LesserExpression.cs
activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/Selector/LesserOrEqualExpression.cs
activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/Selector/LikeExpression.cs
activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/Selector/LogicExpression.cs
activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/Selector/MessageEvaluationContext.cs
activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/Selector/MinusExpression.cs
activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/Selector/ModExpression.cs
activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/Selector/MultiplyExpression.cs
activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/Selector/NOTExpression.cs
activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/Selector/NegateExpression.cs
activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/Selector/ORExpression.cs
activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/Selector/ParseException.cs
activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/Selector/PlusExpression.cs
activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/Selector/PropertyExpression.cs
activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/Selector/SelectorParser.cs
activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/Selector/SelectorParser.csc
activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/Selector/SelectorParserConstants.cs
activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/Selector/SelectorParserTokenManager.cs
activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/Selector/SimpleCharStream.cs
activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/Selector/Token.cs
activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/Selector/TokenMgrError.cs
activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/Selector/UnaryExpression.cs
activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/Session.cs
activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/vs2008-msmq-test.csproj
activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/vs2008-msmq.csproj
activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/vs2008-msmq.sln
Modified:
activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/Apache.NMS.MSMQ.Test.nunit
URL:
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/Apache.NMS.MSMQ.Test.nunit?rev=1760211&r1=1760210&r2=1760211&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/Apache.NMS.MSMQ.Test.nunit
(original)
+++ activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/Apache.NMS.MSMQ.Test.nunit
Sat Sep 10 23:09:20 2016
@@ -1,7 +1,7 @@
-<NUnitProject>
- <Settings activeconfig="Default" />
- <Config name="Default" binpathtype="Auto">
- <assembly path="Apache.NMS.Test.dll" />
- <assembly path="Apache.NMS.MSMQ.Test.dll" />
- </Config>
+<NUnitProject>
+ <Settings activeconfig="Default" />
+ <Config name="Default" binpathtype="Auto">
+ <!--<assembly path="Apache.NMS.Test.dll" />-->
+ <assembly path="Apache.NMS.MSMQ.Test.dll" />
+ </Config>
</NUnitProject>
\ No newline at end of file
Added: activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/msmqprovider-test.config
URL:
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/msmqprovider-test.config?rev=1760211&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/msmqprovider-test.config
(added)
+++ activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/msmqprovider-test.config Sat
Sep 10 23:09:20 2016
@@ -0,0 +1,35 @@
+<?xml version="1.0" encoding="utf-8" ?>
+<!--
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+-->
+<configuration>
+ <testURI value="msmq://localhost">
+ <userName value="guest"/>
+ <passWord value="guest"/>
+ <defaultTestQueue value="queue://.\Private$\TestQ"/>
+ <defaultTestTopic value=""/>
+ <defaultTestQueue2 value="queue://.\Private$\TestQ2"/>
+ <defaultTestTopic2 value=""/>
+ <durableConsumerTestTopic value=""/>
+ <messageSelectorTestQueue value="queue://.\Private$\TestQ"/>
+ <messageSelectorTestTopic value=""/>
+ <deletionTestQueue value="queue://.\Private$\TestQ"/>
+ <deletionTestTopic value=""/>
+ <deletionTestTempQueue value=""/>
+ <deletionTestTempTopic value=""/>
+ <transactionTestQueue value="queue://.\Private$\TestQT"/>
+ </testURI>
+</configuration>
Modified: activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/nant.build
URL:
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/nant.build?rev=1760211&r1=1760210&r2=1760211&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/nant.build (original)
+++ activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/nant.build Sat Sep 10
23:09:20 2016
@@ -1,4 +1,4 @@
-<?xml version="1.0"?>
+<?xml version="1.0"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
@@ -31,8 +31,6 @@
<property name="nunit.dll"
value="${basedir}/lib/NUnit/${current.build.framework}/nunit.framework.dll"
dynamic="true" />
<property name="Apache.NMS.dll"
value="${basedir}/lib/Apache.NMS/${current.build.framework}/Apache.NMS.dll"
dynamic="true" />
<property name="Apache.NMS.pdb"
value="${basedir}/lib/Apache.NMS/${current.build.framework}/Apache.NMS.pdb"
dynamic="true" />
- <property name="Apache.NMS.Test.dll"
value="${basedir}/lib/Apache.NMS/${current.build.framework}//Apache.NMS.Test.dll"
dynamic="true" />
- <property name="Apache.NMS.Test.pdb"
value="${basedir}/lib/Apache.NMS/${current.build.framework}/Apache.NMS.Test.pdb"
dynamic="true" />
<property name="NUnit.Projectfile" value="Apache.NMS.MSMQ.Test.nunit" />
<!-- Skip certain frameworks, since MSMQ is not supported on those
platforms. -->
@@ -77,7 +75,6 @@
<include
name="${current.build.framework.assembly.dir}/System.Xml.dll" />
<include
name="${current.build.framework.assembly.dir}/System.Messaging.dll" />
<include name="${Apache.NMS.dll}" />
- <include name="${Apache.NMS.Test.dll}" />
<include name="${build.bin.dir}/${project.name}.dll" />
<include name="${nunit.dll}" />
</assemblyfileset>
@@ -88,8 +85,6 @@
<include name="nmsprovider-*.config" />
<include name="${Apache.NMS.dll}" />
<include name="${Apache.NMS.pdb}" />
- <include name="${Apache.NMS.Test.dll}" />
- <include name="${Apache.NMS.Test.pdb}" />
<include name="${nunit.dll}" />
<include name="${NUnit.Projectfile}" />
</fileset>
Modified:
activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/Connection.cs
URL:
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/Connection.cs?rev=1760211&r1=1760210&r2=1760211&view=diff
==============================================================================
---
activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/Connection.cs
(original)
+++
activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/Connection.cs
Sat Sep 10 23:09:20 2016
@@ -16,6 +16,7 @@
*/
using System;
+using System.Threading;
namespace Apache.NMS.MSMQ
{
@@ -26,21 +27,101 @@ namespace Apache.NMS.MSMQ
///
public class Connection : IConnection
{
- private AcknowledgementMode acknowledgementMode =
AcknowledgementMode.AutoAcknowledge;
- private IMessageConverter messageConverter = new
DefaultMessageConverter();
+ #region Constructors
- private IRedeliveryPolicy redeliveryPolicy;
- private ConnectionMetaData metaData = null;
- private bool connected;
- private bool closed;
- private string clientId;
+ public Connection()
+ {
+ // now lets send the connection and see if we get an ack/nak
+ // TODO: establish a connection
+ }
+
+ #endregion
+
+ #region Connection state
+
+ public enum ConnectionState
+ {
+ Created,
+ Connected,
+ Starting,
+ Started,
+ Stopping,
+ Stopped,
+ Closed
+ }
+
+ private ConnectionState state = ConnectionState.Created;
+
+ public class StateChangeEventArgs : EventArgs
+ {
+ public StateChangeEventArgs(ConnectionState originalState,
+ ConnectionState currentState)
+ {
+ this.originalState = originalState;
+ this.currentState = currentState;
+ }
+
+ private ConnectionState originalState;
+ public ConnectionState OriginalState
+ {
+ get { return originalState; }
+ }
+
+ private ConnectionState currentState;
+ public ConnectionState CurrentState
+ {
+ get { return currentState; }
+ }
+ }
+
+ public delegate void StateChangeEventHandler(object sender,
StateChangeEventArgs e);
+
+ public event StateChangeEventHandler ConnectionStateChange;
+
+ private void ChangeState(ConnectionState newState)
+ {
+ if(ConnectionStateChange != null)
+ {
+ ConnectionStateChange(this,
+ new StateChangeEventArgs(this.state, newState));
+ }
+
+ this.state = newState;
+ }
+
+ private object stateLock = new object();
+
+ #endregion
+
+ #region Start & stop
/// <summary>
/// Starts message delivery for this connection.
/// </summary>
public void Start()
{
- CheckConnected();
+ lock(stateLock)
+ {
+ switch(state)
+ {
+ case ConnectionState.Created:
+ case ConnectionState.Connected:
+ case ConnectionState.Stopped:
+ ChangeState(ConnectionState.Starting);
+ ChangeState(ConnectionState.Started);
+ break;
+
+ case ConnectionState.Stopping:
+ throw new NMSException("Connection stopping");
+
+ case ConnectionState.Closed:
+ throw new NMSException("Connection closed");
+
+ case ConnectionState.Starting:
+ case ConnectionState.Started:
+ break;
+ }
+ }
}
/// <summary>
@@ -49,7 +130,7 @@ namespace Apache.NMS.MSMQ
/// </summary>
public bool IsStarted
{
- get { return true; }
+ get { return state == ConnectionState.Started; }
}
/// <summary>
@@ -57,9 +138,65 @@ namespace Apache.NMS.MSMQ
/// </summary>
public void Stop()
{
- CheckConnected();
+ lock(stateLock)
+ {
+ switch(state)
+ {
+ case ConnectionState.Started:
+ ChangeState(ConnectionState.Stopping);
+ ChangeState(ConnectionState.Stopped);
+ break;
+
+ case ConnectionState.Starting:
+ throw new NMSException("Connection starting");
+
+ case ConnectionState.Closed:
+ throw new NMSException("Connection closed");
+
+ case ConnectionState.Created:
+ case ConnectionState.Connected:
+ case ConnectionState.Stopping:
+ case ConnectionState.Stopped:
+ break;
+ }
+ }
+ }
+
+ #endregion
+
+ #region Close & dispose
+
+ public void Close()
+ {
+ if(!IsClosed)
+ {
+ Stop();
+
+ state = ConnectionState.Closed;
+ }
+ }
+
+ public bool IsClosed
+ {
+ get { return state == ConnectionState.Closed; }
}
+ public void Dispose()
+ {
+ try
+ {
+ Close();
+ }
+ catch
+ {
+ state = ConnectionState.Closed;
+ }
+ }
+
+ #endregion
+
+ #region Create session
+
/// <summary>
/// Creates a new session to work on this connection
/// </summary>
@@ -73,14 +210,16 @@ namespace Apache.NMS.MSMQ
/// </summary>
public ISession CreateSession(AcknowledgementMode mode)
{
- CheckConnected();
+ if(IsClosed)
+ {
+ throw new NMSException("Connection closed");
+ }
return new Session(this, mode);
}
- public void Dispose()
- {
- closed = true;
- }
+ #endregion
+
+ #region Connection properties
/// <summary>
/// The default timeout for network requests.
@@ -91,24 +230,27 @@ namespace Apache.NMS.MSMQ
set { }
}
+ private AcknowledgementMode acknowledgementMode =
AcknowledgementMode.AutoAcknowledge;
public AcknowledgementMode AcknowledgementMode
{
get { return acknowledgementMode; }
set { acknowledgementMode = value; }
}
+ private IMessageConverter messageConverter = new
DefaultMessageConverter();
public IMessageConverter MessageConverter
{
get { return messageConverter; }
set { messageConverter = value; }
}
+ private string clientId;
public string ClientId
{
get { return clientId; }
set
{
- if(connected)
+ if(state != ConnectionState.Created)
{
throw new NMSException("You cannot change the ClientId
once the Connection is connected");
}
@@ -116,6 +258,7 @@ namespace Apache.NMS.MSMQ
}
}
+ private IRedeliveryPolicy redeliveryPolicy;
/// <summary>
/// Get/or set the redelivery policy for this connection.
/// </summary>
@@ -125,6 +268,19 @@ namespace Apache.NMS.MSMQ
set { this.redeliveryPolicy = value; }
}
+ private ConnectionMetaData metaData = null;
+ /// <summary>
+ /// Gets the Meta Data for the NMS Connection instance.
+ /// </summary>
+ public IConnectionMetaData MetaData
+ {
+ get { return this.metaData ?? (this.metaData = new
ConnectionMetaData()); }
+ }
+
+ #endregion
+
+ #region Transformer delegates
+
private ConsumerTransformerDelegate consumerTransformer;
public ConsumerTransformerDelegate ConsumerTransformer
{
@@ -139,57 +295,18 @@ namespace Apache.NMS.MSMQ
set { this.producerTransformer = value; }
}
- /// <summary>
- /// Gets the Meta Data for the NMS Connection instance.
- /// </summary>
- public IConnectionMetaData MetaData
- {
- get { return this.metaData ?? (this.metaData = new
ConnectionMetaData()); }
- }
+ #endregion
+
+ #region Exception & transport listeners
/// <summary>
/// A delegate that can receive transport level exceptions.
/// </summary>
public event ExceptionListener ExceptionListener;
- /// <summary>
- /// An asynchronous listener that is notified when a Fault tolerant
connection
- /// has been interrupted.
- /// </summary>
- public event ConnectionInterruptedListener
ConnectionInterruptedListener;
-
- /// <summary>
- /// An asynchronous listener that is notified when a Fault tolerant
connection
- /// has been resumed.
- /// </summary>
- public event ConnectionResumedListener ConnectionResumedListener;
-
- protected void CheckConnected()
- {
- if(closed)
- {
- throw new NMSException("Connection Closed");
- }
- if(!connected)
- {
- connected = true;
- // now lets send the connection and see if we get an ack/nak
- // TODO: establish a connection
- }
- }
-
- public void Close()
- {
- Dispose();
- }
-
- public void PurgeTempDestinations()
- {
- }
-
public void HandleException(Exception e)
{
- if(ExceptionListener != null && !this.closed)
+ if(ExceptionListener != null && !this.IsClosed)
{
ExceptionListener(e);
}
@@ -199,11 +316,17 @@ namespace Apache.NMS.MSMQ
}
}
+ /// <summary>
+ /// An asynchronous listener that is notified when a Fault tolerant
connection
+ /// has been interrupted.
+ /// </summary>
+ public event ConnectionInterruptedListener
ConnectionInterruptedListener;
+
public void HandleTransportInterrupted()
{
Tracer.Debug("Transport has been Interrupted.");
- if(this.ConnectionInterruptedListener != null && !this.closed)
+ if(this.ConnectionInterruptedListener != null && !this.IsClosed)
{
try
{
@@ -215,11 +338,17 @@ namespace Apache.NMS.MSMQ
}
}
+ /// <summary>
+ /// An asynchronous listener that is notified when a Fault tolerant
connection
+ /// has been resumed.
+ /// </summary>
+ public event ConnectionResumedListener ConnectionResumedListener;
+
public void HandleTransportResumed()
{
Tracer.Debug("Transport has resumed normal operation.");
- if(this.ConnectionResumedListener != null && !this.closed)
+ if(this.ConnectionResumedListener != null && !this.IsClosed)
{
try
{
@@ -230,5 +359,11 @@ namespace Apache.NMS.MSMQ
}
}
}
+
+ #endregion
+
+ public void PurgeTempDestinations()
+ {
+ }
}
}
Modified:
activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/ConnectionMetaData.cs
URL:
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/ConnectionMetaData.cs?rev=1760211&r1=1760210&r2=1760211&view=diff
==============================================================================
---
activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/ConnectionMetaData.cs
(original)
+++
activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/ConnectionMetaData.cs
Sat Sep 10 23:09:20 2016
@@ -1,107 +1,107 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-using System;
-using System.Reflection;
-
-namespace Apache.NMS.MSMQ
-{
- /// <summary>
- /// Implements the Connection Meta-Data feature for Apache.NMS.MSMQ
- /// </summary>
- public class ConnectionMetaData : IConnectionMetaData
- {
- private int nmsMajorVersion;
- private int nmsMinorVersion;
-
- private string nmsProviderName;
- private string nmsVersion;
-
- private int providerMajorVersion;
- private int providerMinorVersion;
- private string providerVersion;
-
- private string[] nmsxProperties;
-
- public ConnectionMetaData()
- {
- Assembly self = Assembly.GetExecutingAssembly();
- AssemblyName asmName = self.GetName();
-
- this.nmsProviderName = asmName.Name;
- this.providerMajorVersion = asmName.Version.Major;
- this.providerMinorVersion = asmName.Version.Minor;
- this.providerVersion = asmName.Version.ToString();
-
- this.nmsxProperties = new String[] { };
-
- foreach(AssemblyName name in
self.GetReferencedAssemblies())
- {
- if(0 == string.Compare(name.Name, "Apache.NMS",
true))
- {
- this.nmsMajorVersion =
name.Version.Major;
- this.nmsMinorVersion =
name.Version.Minor;
- this.nmsVersion =
name.Version.ToString();
-
- return;
- }
- }
-
- throw new NMSException("Could not find a reference to
the Apache.NMS Assembly.");
- }
-
- public int NMSMajorVersion
- {
- get { return this.nmsMajorVersion; }
- }
-
- public int NMSMinorVersion
- {
- get { return this.nmsMinorVersion; }
- }
-
- public string NMSProviderName
- {
- get { return this.nmsProviderName; }
- }
-
- public string NMSVersion
- {
- get { return this.nmsVersion; }
- }
-
- public string[] NMSXPropertyNames
- {
- get { return this.nmsxProperties; }
- }
-
- public int ProviderMajorVersion
- {
- get { return this.providerMajorVersion; }
- }
-
- public int ProviderMinorVersion
- {
- get { return this.providerMinorVersion; }
- }
-
- public string ProviderVersion
- {
- get { return this.providerVersion; }
- }
- }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Reflection;
+
+namespace Apache.NMS.MSMQ
+{
+ /// <summary>
+ /// Implements the Connection Meta-Data feature for Apache.NMS.MSMQ
+ /// </summary>
+ public class ConnectionMetaData : IConnectionMetaData
+ {
+ private int nmsMajorVersion;
+ private int nmsMinorVersion;
+
+ private string nmsProviderName;
+ private string nmsVersion;
+
+ private int providerMajorVersion;
+ private int providerMinorVersion;
+ private string providerVersion;
+
+ private string[] nmsxProperties;
+
+ public ConnectionMetaData()
+ {
+ Assembly self = Assembly.GetExecutingAssembly();
+ AssemblyName asmName = self.GetName();
+
+ this.nmsProviderName = asmName.Name;
+ this.providerMajorVersion = asmName.Version.Major;
+ this.providerMinorVersion = asmName.Version.Minor;
+ this.providerVersion = asmName.Version.ToString();
+
+ this.nmsxProperties = new String[] { };
+
+ foreach(AssemblyName name in
self.GetReferencedAssemblies())
+ {
+ if(0 == string.Compare(name.Name, "Apache.NMS",
true))
+ {
+ this.nmsMajorVersion =
name.Version.Major;
+ this.nmsMinorVersion =
name.Version.Minor;
+ this.nmsVersion =
name.Version.ToString();
+
+ return;
+ }
+ }
+
+ throw new NMSException("Could not find a reference to
the Apache.NMS Assembly.");
+ }
+
+ public int NMSMajorVersion
+ {
+ get { return this.nmsMajorVersion; }
+ }
+
+ public int NMSMinorVersion
+ {
+ get { return this.nmsMinorVersion; }
+ }
+
+ public string NMSProviderName
+ {
+ get { return this.nmsProviderName; }
+ }
+
+ public string NMSVersion
+ {
+ get { return this.nmsVersion; }
+ }
+
+ public string[] NMSXPropertyNames
+ {
+ get { return this.nmsxProperties; }
+ }
+
+ public int ProviderMajorVersion
+ {
+ get { return this.providerMajorVersion; }
+ }
+
+ public int ProviderMinorVersion
+ {
+ get { return this.providerMinorVersion; }
+ }
+
+ public string ProviderVersion
+ {
+ get { return this.providerVersion; }
+ }
+ }
+}
Modified:
activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/DefaultMessageConverter.cs
URL:
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/DefaultMessageConverter.cs?rev=1760211&r1=1760210&r2=1760211&view=diff
==============================================================================
---
activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/DefaultMessageConverter.cs
(original)
+++
activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/DefaultMessageConverter.cs
Sat Sep 10 23:09:20 2016
@@ -118,16 +118,16 @@ namespace Apache.NMS.MSMQ
// Populate property data
foreach(object keyObject in message.Properties.Keys)
{
- string key = (keyObject as string);
- object val = message.Properties.GetString(key);
- if(!SetLabelAsNMSType && string.Compare(key, "Label", true) == 0
&& val != null)
- {
- msmqMessage.Label = val.ToString();
- }
- else
- {
- propertyData[key] = val;
- }
+ string key = (keyObject as string);
+ object val = message.Properties[key];
+ if(!SetLabelAsNMSType && string.Compare(key, "Label", true) ==
0 && val != null)
+ {
+ msmqMessage.Label = val.ToString();
+ }
+ else
+ {
+ propertyData[key] = val;
+ }
}
// Store the NMS property data in the extension area
@@ -418,6 +418,8 @@ namespace Apache.NMS.MSMQ
result = baseMessage;
}
+ result.ReadOnlyBody = true;
+
return result;
}
@@ -442,6 +444,7 @@ namespace Apache.NMS.MSMQ
else if(message is BytesMessage)
{
BytesMessage bytesMessage = message as
BytesMessage;
+ bytesMessage.Reset();
answer.BodyStream.Write(bytesMessage.Content,
0, bytesMessage.Content.Length);
answer.AppSpecific = (int)
NMSMessageType.BytesMessage;
}
@@ -604,12 +607,14 @@ namespace Apache.NMS.MSMQ
/// <result>MSMQ queue.</result>
public MessageQueue ToMsmqDestination(IDestination destination)
{
- if(null == destination)
+ Queue queue = destination as Queue;
+
+ if(destination == null)
{
return null;
}
- return new MessageQueue((destination as
Destination).Path);
+ return queue.MSMQMessageQueue;
}
/// <summary>
@@ -625,7 +630,7 @@ namespace Apache.NMS.MSMQ
return null;
}
- return new Queue(destinationQueue.Path);
+ return new Queue(destinationQueue);
}
#endregion
Modified:
activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/IMessageConverterEx.cs
URL:
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/IMessageConverterEx.cs?rev=1760211&r1=1760210&r2=1760211&view=diff
==============================================================================
---
activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/IMessageConverterEx.cs
(original)
+++
activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/IMessageConverterEx.cs
Sat Sep 10 23:09:20 2016
@@ -1,44 +1,44 @@
-using System.Messaging;
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-namespace Apache.NMS.MSMQ
-{
- /// <summary>
- /// Extended IMessageConverter interface supporting new methods for
- /// optimizing message selection through "selectors".
- /// The original IMessageConverter is maintained for compatibility
- /// reasons with existing clients implementing it.
- /// </summary>
- public interface IMessageConverterEx : IMessageConverter
- {
- /// <summary>
- /// Converts the specified MSMQ message to an equivalent NMS message.
- /// </summary>
- /// <param name="message">MSMQ message to be converted.</param>
- /// <param name="convertBody">true if message body should be
converted.</param>
- /// <result>Converted NMS message.</result>
- IMessage ToNmsMessage(Message message, bool convertBody);
-
- /// <summary>
- /// Converts an MSMQ message body to the equivalent NMS message body.
- /// </summary>
- /// <param name="message">Source MSMQ message.</param>
- /// <param name="answer">Target NMS message.</param>
- void ConvertMessageBodyToNMS(Message message, IMessage answer);
- }
-}
+using System.Messaging;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.NMS.MSMQ
+{
+ /// <summary>
+ /// Extended IMessageConverter interface supporting new methods for
+ /// optimizing message selection through "selectors".
+ /// The original IMessageConverter is maintained for compatibility
+ /// reasons with existing clients implementing it.
+ /// </summary>
+ public interface IMessageConverterEx : IMessageConverter
+ {
+ /// <summary>
+ /// Converts the specified MSMQ message to an equivalent NMS message.
+ /// </summary>
+ /// <param name="message">MSMQ message to be converted.</param>
+ /// <param name="convertBody">true if message body should be
converted.</param>
+ /// <result>Converted NMS message.</result>
+ IMessage ToNmsMessage(Message message, bool convertBody);
+
+ /// <summary>
+ /// Converts an MSMQ message body to the equivalent NMS message body.
+ /// </summary>
+ /// <param name="message">Source MSMQ message.</param>
+ /// <param name="answer">Target NMS message.</param>
+ void ConvertMessageBodyToNMS(Message message, IMessage answer);
+ }
+}
Modified:
activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/MessageConsumer.cs
URL:
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/MessageConsumer.cs?rev=1760211&r1=1760210&r2=1760211&view=diff
==============================================================================
---
activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/MessageConsumer.cs
(original)
+++
activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/MessageConsumer.cs
Sat Sep 10 23:09:20 2016
@@ -32,9 +32,6 @@ namespace Apache.NMS.MSMQ
private readonly Session session;
private readonly AcknowledgementMode acknowledgementMode;
private MessageQueue messageQueue;
- private Thread asyncDeliveryThread = null;
- private AutoResetEvent pause = new AutoResetEvent(false);
- private Atomic<bool> asyncDelivery = new Atomic<bool>(false);
private ConsumerTransformerDelegate consumerTransformer;
public ConsumerTransformerDelegate ConsumerTransformer
@@ -81,6 +78,8 @@ namespace Apache.NMS.MSMQ
messageQueue, session.MessageConverter, selector);
}
+ #region Asynchronous delivery
+
private int listenerCount = 0;
private event MessageListener listener;
public event MessageListener Listener
@@ -89,7 +88,13 @@ namespace Apache.NMS.MSMQ
{
listener += value;
listenerCount++;
- StartAsyncDelivery();
+
+ session.Connection.ConnectionStateChange +=
OnConnectionStateChange;
+
+ if(session.Connection.IsStarted)
+ {
+ StartAsyncDelivery();
+ }
}
remove
@@ -100,87 +105,37 @@ namespace Apache.NMS.MSMQ
listenerCount--;
}
- if(0 == listenerCount)
+ if(listenerCount == 0)
{
+ session.Connection.ConnectionStateChange -=
OnConnectionStateChange;
+
StopAsyncDelivery();
}
}
}
- public IMessage Receive()
- {
- IMessage nmsMessage = null;
-
- if(messageQueue != null)
- {
- nmsMessage = reader.Receive();
- nmsMessage = TransformMessage(nmsMessage);
- }
-
- return nmsMessage;
- }
-
- public IMessage Receive(TimeSpan timeout)
- {
- IMessage nmsMessage = null;
-
- if(messageQueue != null)
- {
- nmsMessage = reader.Receive(timeout);
- nmsMessage = TransformMessage(nmsMessage);
- }
-
- return nmsMessage;
- }
-
- public IMessage ReceiveNoWait()
- {
- IMessage nmsMessage = null;
-
- if(messageQueue != null)
- {
- nmsMessage = reader.Receive(zeroTimeout);
- nmsMessage = TransformMessage(nmsMessage);
- }
-
- return nmsMessage;
- }
-
- public void Dispose()
+ private void OnConnectionStateChange(object sender,
Connection.StateChangeEventArgs e)
{
- Close();
- }
-
- public void Close()
- {
- StopAsyncDelivery();
- if(messageQueue != null)
+ if(e.CurrentState == Connection.ConnectionState.Starting)
{
- messageQueue.Dispose();
- messageQueue = null;
+ if(listenerCount > 0)
+ {
+ StartAsyncDelivery();
+ }
}
- }
-
- protected virtual void StopAsyncDelivery()
- {
- if(asyncDelivery.CompareAndSet(true, false))
+ else if(e.CurrentState == Connection.ConnectionState.Stopping)
{
- if(null != asyncDeliveryThread)
+ if(listenerCount > 0)
{
- Tracer.Info("Stopping async delivery thread.");
- pause.Set();
- if(!asyncDeliveryThread.Join(10000))
- {
- Tracer.Info("Aborting async delivery thread.");
- asyncDeliveryThread.Abort();
- }
-
- asyncDeliveryThread = null;
- Tracer.Info("Async delivery thread stopped.");
+ StopAsyncDelivery();
}
}
}
+ private Thread asyncDeliveryThread = null;
+ private Atomic<bool> asyncDelivery = new Atomic<bool>(false);
+ TimeSpan dispatchingTimeout = new TimeSpan(5000);
+
protected virtual void StartAsyncDelivery()
{
if(asyncDelivery.CompareAndSet(false, true))
@@ -199,7 +154,7 @@ namespace Apache.NMS.MSMQ
{
try
{
- IMessage message = Receive();
+ IMessage message = Receive(dispatchingTimeout);
if(asyncDelivery.Value && message != null)
{
try
@@ -215,6 +170,7 @@ namespace Apache.NMS.MSMQ
catch(ThreadAbortException ex)
{
Tracer.InfoFormat("Thread abort received in thread: {0} :
{1}", this, ex.Message);
+
break;
}
catch(Exception ex)
@@ -225,11 +181,104 @@ namespace Apache.NMS.MSMQ
Tracer.Info("Stopping dispatcher thread consumer: " + this);
}
+ protected virtual void StopAsyncDelivery()
+ {
+ if(asyncDelivery.CompareAndSet(true, false))
+ {
+ if(null != asyncDeliveryThread)
+ {
+ // Thread.Interrupt and Thread.Abort do not interrupt
Receive
+ // instructions. Attempting to abort the thread and joining
+ // will result in a phantom backgroud thread, which may
+ // ultimately consume a message before actually stopping.
+
+ Tracer.Info("Waiting for thread to complete aborting.");
+ asyncDeliveryThread.Join(dispatchingTimeout);
+
+ asyncDeliveryThread = null;
+ Tracer.Info("Async delivery thread stopped.");
+ }
+ }
+ }
+
+
protected virtual void HandleAsyncException(Exception e)
{
session.Connection.HandleException(e);
}
+ #endregion
+
+ #region Receive (synchronous)
+
+ public IMessage Receive()
+ {
+ IMessage nmsMessage = null;
+
+ if(messageQueue != null)
+ {
+ nmsMessage = reader.Receive();
+ nmsMessage = TransformMessage(nmsMessage);
+ }
+
+ return nmsMessage;
+ }
+
+ public IMessage Receive(TimeSpan timeout)
+ {
+ IMessage nmsMessage = null;
+
+ if(messageQueue != null)
+ {
+ try
+ {
+ nmsMessage = reader.Receive(timeout);
+ }
+ catch(MessageQueueException ex)
+ {
+ if(ex.MessageQueueErrorCode !=
MessageQueueErrorCode.IOTimeout)
+ {
+ throw ex;
+ }
+ }
+ nmsMessage = TransformMessage(nmsMessage);
+ }
+
+ return nmsMessage;
+ }
+
+ public IMessage ReceiveNoWait()
+ {
+ return Receive(zeroTimeout);
+ }
+
+ #endregion
+
+ #region Close & dispose
+
+ public void Dispose()
+ {
+ Close();
+ }
+
+ public void Close()
+ {
+ if(listenerCount > 0)
+ {
+ session.Connection.ConnectionStateChange -=
OnConnectionStateChange;
+
+ StopAsyncDelivery();
+ }
+
+ if(messageQueue != null)
+ {
+ messageQueue.Dispose();
+ messageQueue = null;
+ }
+ }
+
+ #endregion
+
protected virtual IMessage TransformMessage(IMessage message)
{
IMessage transformed = message;
Modified:
activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/MessageProducer.cs
URL:
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/MessageProducer.cs?rev=1760211&r1=1760210&r2=1760211&view=diff
==============================================================================
---
activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/MessageProducer.cs
(original)
+++
activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/MessageProducer.cs
Sat Sep 10 23:09:20 2016
@@ -50,41 +50,47 @@ namespace Apache.NMS.MSMQ
this.destination = destination;
if(destination != null)
{
- messageQueue = openMessageQueue(destination);
+ messageQueue = OpenMessageQueue(destination);
}
}
- private MessageQueue openMessageQueue(Destination dest)
+ private MessageQueue OpenMessageQueue(Destination dest)
{
- MessageQueue rc = null;
- try
+ Queue queue = dest as Queue;
+
+ MessageQueue mq = queue.MSMQMessageQueue;
+
+ if(mq == null)
{
- if(!MessageQueue.Exists(dest.Path))
- {
- // create the new message queue and make it transactional
- rc = MessageQueue.Create(dest.Path, session.Transacted);
- this.destination.Path = rc.Path;
- }
- else
+ try
{
- rc = new MessageQueue(dest.Path);
- this.destination.Path = rc.Path;
- if(!rc.CanWrite)
+ if(!Queue.Exists(dest.Path))
{
- throw new NMSSecurityException("Do not have write
access to: " + dest);
+ // create the new message queue and make it
transactional
+ mq = MessageQueue.Create(dest.Path,
session.Transacted);
+ this.destination = new Queue(mq);
+ }
+ else
+ {
+ mq = new MessageQueue(dest.Path);
+ this.destination = new Queue(mq);
+ if(!mq.CanWrite)
+ {
+ throw new NMSSecurityException("Do not have write
access to: " + dest);
+ }
}
}
- }
- catch(Exception e)
- {
- if(rc != null)
+ catch(Exception e)
{
- rc.Dispose();
+ if(mq != null)
+ {
+ mq.Dispose();
+ }
+
+ throw new NMSException(e.Message + ": " + dest, e);
}
-
- throw new NMSException(e.Message + ": " + dest, e);
}
- return rc;
+ return mq;
}
public void Send(IMessage message)
@@ -111,18 +117,28 @@ namespace Apache.NMS.MSMQ
// Locate the MSMQ Queue we will be sending to
if(messageQueue != null)
{
+ if(destination == null)
+ {
+ throw new InvalidDestinationException("This producer
can only be used to send to: " + destination);
+ }
+
if(destination.Equals(this.destination))
{
mq = messageQueue;
}
else
{
- throw new NMSException("This producer can only be used
to send to: " + destination);
+ throw new NotSupportedException("This producer can
only be used to send to: " + destination);
}
}
else
{
- mq = openMessageQueue((Destination) destination);
+ if(destination == null)
+ {
+ throw new NotSupportedException();
+ }
+
+ mq = OpenMessageQueue((Destination) destination);
}
if(this.ProducerTransformer != null)
@@ -147,7 +163,7 @@ namespace Apache.NMS.MSMQ
// TODO: message.NMSMessageId =
}
- // Convert the Mesasge into a MSMQ message
+ // Convert the Message into a MSMQ message
Message msg = session.MessageConverter.ToMsmqMessage(message);
if(mq.Transactional)
@@ -179,6 +195,11 @@ namespace Apache.NMS.MSMQ
mq.Send(msg);
}
+ message.NMSMessageId = msg.Id;
+ if(message.NMSCorrelationID == null)
+ {
+ message.NMSCorrelationID = msg.CorrelationId;
+ }
}
finally
{
Modified:
activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/Queue.cs
URL:
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/Queue.cs?rev=1760211&r1=1760210&r2=1760211&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/Queue.cs
(original)
+++ activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/Queue.cs Sat
Sep 10 23:09:20 2016
@@ -15,6 +15,7 @@
* limitations under the License.
*/
using System;
+using System.Messaging;
namespace Apache.NMS.MSMQ
{
@@ -24,17 +25,69 @@ namespace Apache.NMS.MSMQ
/// </summary>
public class Queue : Destination, IQueue
{
-
public Queue()
: base()
{
}
+ public Queue(MessageQueue messageQueue)
+ : base()
+ {
+ this.messageQueue = messageQueue;
+ Path = messageQueue.Path;
+ }
+
public Queue(String name)
: base(name)
{
+ if(string.IsNullOrEmpty(name))
+ {
+ messageQueue = null;
+ }
+ else
+ {
+ try
+ {
+ messageQueue = new MessageQueue(name);
+ }
+ catch(Exception /*ex*/)
+ {
+ // Excerpt from Microsoft documentation for
MessageQueue.Exists :
+ //
(@https://msdn.microsoft.com/fr-fr/library/system.messaging.messagequeue.exists(v=vs.110).aspx)
+ // Exists(String) is an expensive operation. Use it only
when it is necessary within the application.
+ // ---
+ // Therefore, we won't check for existence of the queue
before attempting to access it.
+
+ //if(!Exists(name))
+ //{
+ // Excerpt from the Oracle JMS JavaDoc for
Session.createQueue :
+ //
(@https://docs.oracle.com/javaee/7/api/javax/jms/Session.html#createQueue-java.lang.String-)
+ // Note that this method simply creates an object that
encapsulates the name of a queue. It does
+ // not create the physical queue in the JMS provider.
JMS does not provide a method to create the
+ // physical queue, since this would be specific to a
given JMS provider. Creating a physical queue
+ // is provider-specific and is typically an
administrative task performed by an administrator,
+ // though some providers may create them automatically
when needed. The one exception to this is
+ // the creation of a temporary queue, which is done
using the createTemporaryQueue method.
+ // ---
+ // Therefore, we should throw an NMSException if the
queue does not exist.
+ // ---
+ // BUT, to keep it compatible with the initial
implementation of MessageProducer, which attempts
+ // to create non pre-existing queues, we keep it
silent..
+
+ // throw new NMSException("Message queue \"" + name +
"\" does not exist", ex);
+ //}
+
+ // throw new NMSException("Cannot access message queue \""
+ name + "\"", ex);
+ }
+ }
}
+ private MessageQueue messageQueue;
+ public MessageQueue MSMQMessageQueue
+ {
+ get { return messageQueue; }
+ }
+
override public DestinationType DestinationType
{
get
@@ -54,7 +107,25 @@ namespace Apache.NMS.MSMQ
return new Queue(name);
}
-
+ public static bool Exists(string name)
+ {
+ try
+ {
+ return MessageQueue.Exists(name);
+ }
+ catch(InvalidOperationException)
+ {
+ // Excerpt from Microsoft documentation for
MessageQueue.Exists :
+ //
(@https://msdn.microsoft.com/fr-fr/library/system.messaging.messagequeue.exists(v=vs.110).aspx)
+ // InvalidOperationException: The application used format name
syntax when verifying queue existence.
+ // ---
+ // The Exists(String) method does not support the FormatName
prefix.
+ // No method exists to determine whether a queue with a
specified format name exists.
+
+ // We'll assume the queue exists at this point
+ return true;
+ }
+ }
}
}
Modified:
activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/QueueBrowser.cs
URL:
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/QueueBrowser.cs?rev=1760211&r1=1760210&r2=1760211&view=diff
==============================================================================
---
activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/QueueBrowser.cs
(original)
+++
activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/QueueBrowser.cs
Sat Sep 10 23:09:20 2016
@@ -1,164 +1,172 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-using System;
-using System.Collections;
-using System.Messaging;
-using Apache.NMS;
-using Apache.NMS.Util;
-using Apache.NMS.MSMQ.Readers;
-
-namespace Apache.NMS.MSMQ
-{
- public class QueueBrowser : Apache.NMS.IQueueBrowser
- {
- private bool closed = false;
- private bool disposed = false;
-
- private readonly Session session;
- private MessageQueue messageQueue;
-
- private string selector;
-
- private IMessageReader reader;
-
- public QueueBrowser(Session session, MessageQueue messageQueue)
- : this(session, messageQueue, null)
- {
- }
-
- public QueueBrowser(Session session, MessageQueue messageQueue,
- string selector)
- {
- this.session = session;
- this.messageQueue = messageQueue;
- if(null != this.messageQueue)
- {
- this.messageQueue.MessageReadPropertyFilter.SetAll();
- }
-
- reader = MessageReaderUtil.CreateMessageReader(
- messageQueue, session.MessageConverter, selector);
- }
-
- ~QueueBrowser()
- {
- Dispose(false);
- }
-
- #region IDisposable Members
-
- ///<summary>
- /// Performs application-defined tasks associated with freeing,
releasing, or resetting unmanaged resources.
- ///</summary>
- public void Dispose()
- {
- Dispose(true);
- GC.SuppressFinalize(this);
- }
-
- protected void Dispose(bool disposing)
- {
- if(disposed)
- {
- return;
- }
-
- if(disposing)
- {
- // Dispose managed code here.
- }
-
- try
- {
- Close();
- }
- catch
- {
- // Ignore errors.
- }
-
- disposed = true;
- }
-
- #endregion
-
- public void Close()
- {
- if(messageQueue != null)
- {
- messageQueue.Dispose();
- messageQueue = null;
- }
- closed = true;
- }
-
- public string MessageSelector
- {
- get { return selector; }
- }
-
- public IQueue Queue
- {
- get { return new Queue(this.messageQueue.Path); }
- }
-
- internal class Enumerator : IEnumerator
- {
- private readonly Session session;
- private readonly MessageEnumerator innerEnumerator;
- private readonly IMessageReader reader;
-
- public Enumerator(Session session, MessageQueue
messageQueue,
- IMessageReader reader)
- {
- this.session = session;
- this.innerEnumerator =
messageQueue.GetMessageEnumerator2();
- this.reader = reader;
- }
-
- public object Current
- {
- get
- {
- return
this.session.MessageConverter.ToNmsMessage(this.innerEnumerator.Current);
- }
- }
-
- public bool MoveNext()
- {
- while(this.innerEnumerator.MoveNext())
- {
-
if(reader.Matches(this.innerEnumerator.Current))
- {
- return true;
- }
- }
- return false;
- }
-
- public void Reset()
- {
- this.innerEnumerator.Reset();
- }
- }
-
- public IEnumerator GetEnumerator()
- {
- return new Enumerator(this.session, this.messageQueue,
this.reader);
- }
- }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using System;
+using System.Collections;
+using System.Messaging;
+using Apache.NMS;
+using Apache.NMS.Util;
+using Apache.NMS.MSMQ.Readers;
+
+namespace Apache.NMS.MSMQ
+{
+ public class QueueBrowser : Apache.NMS.IQueueBrowser
+ {
+ private bool disposed = false;
+
+ private readonly Session session;
+ private MessageQueue messageQueue;
+
+ private string selector;
+
+ private IMessageReader reader;
+
+ public QueueBrowser(Session session, MessageQueue messageQueue)
+ : this(session, messageQueue, null)
+ {
+ }
+
+ public QueueBrowser(Session session, MessageQueue messageQueue,
+ string selector)
+ {
+ this.session = session;
+ this.messageQueue = messageQueue;
+ if(null != this.messageQueue)
+ {
+ this.messageQueue.MessageReadPropertyFilter.SetAll();
+ }
+ this.selector = selector;
+
+ reader = MessageReaderUtil.CreateMessageReader(
+ messageQueue, session.MessageConverter, selector);
+ }
+
+ ~QueueBrowser()
+ {
+ Dispose(false);
+ }
+
+ #region IDisposable Members
+
+ ///<summary>
+ /// Performs application-defined tasks associated with freeing,
releasing, or resetting unmanaged resources.
+ ///</summary>
+ public void Dispose()
+ {
+ Dispose(true);
+ GC.SuppressFinalize(this);
+ }
+
+ protected void Dispose(bool disposing)
+ {
+ if(disposed)
+ {
+ return;
+ }
+
+ if(disposing)
+ {
+ // Dispose managed code here.
+ }
+
+ try
+ {
+ Close();
+ }
+ catch
+ {
+ // Ignore errors.
+ }
+
+ disposed = true;
+ }
+
+ #endregion
+
+ public void Close()
+ {
+ if(messageQueue != null)
+ {
+ messageQueue.Dispose();
+ messageQueue = null;
+ }
+ }
+
+ public string MessageSelector
+ {
+ get { return selector; }
+ }
+
+ public IQueue Queue
+ {
+ get { return new Queue(this.messageQueue.Path); }
+ }
+
+ internal class Enumerator : IEnumerator, IDisposable
+ {
+ private Session session;
+ private MessageEnumerator innerEnumerator;
+ private IMessageReader reader;
+
+ public Enumerator(Session session, MessageQueue
messageQueue,
+ IMessageReader reader)
+ {
+ this.session = session;
+ this.innerEnumerator =
messageQueue.GetMessageEnumerator2();
+ this.reader = reader;
+ }
+
+ public object Current
+ {
+ get
+ {
+ return
this.session.MessageConverter.ToNmsMessage(this.innerEnumerator.Current);
+ }
+ }
+
+ public bool MoveNext()
+ {
+ while(this.innerEnumerator.MoveNext())
+ {
+
if(reader.Matches(this.innerEnumerator.Current))
+ {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ public void Reset()
+ {
+ this.innerEnumerator.Reset();
+ }
+
+ public void Dispose()
+ {
+ if(innerEnumerator != null)
+ {
+ innerEnumerator.Close();
+ innerEnumerator = null;
+ }
+ }
+ }
+
+ public IEnumerator GetEnumerator()
+ {
+ return new Enumerator(this.session, this.messageQueue,
this.reader);
+ }
+ }
+}
Modified:
activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/Readers/AbstractMessageReader.cs
URL:
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/Readers/AbstractMessageReader.cs?rev=1760211&r1=1760210&r2=1760211&view=diff
==============================================================================
---
activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/Readers/AbstractMessageReader.cs
(original)
+++
activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/Readers/AbstractMessageReader.cs
Sat Sep 10 23:09:20 2016
@@ -1,126 +1,126 @@
-using System;
-using System.Messaging;
-using Apache.NMS.MSMQ;
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-namespace Apache.NMS.MSMQ.Readers
-{
- /// <summary>
- /// Abstract MSMQ message reader. Derived classes support various
- /// message filtering methods.
- /// </summary>
- public abstract class AbstractMessageReader : IMessageReader
- {
- protected MessageQueue messageQueue;
- protected IMessageConverter messageConverter;
- protected IMessageConverterEx messageConverterEx;
-
- /// <summary>
- /// Constructor.
- /// </summary>
- /// <param name="messageQueue">The MSMQ message queue from which
- /// messages will be read.</param>
- /// <param name="messageConverter">A message converter for mapping
- /// MSMQ messages to NMS messages.</param>
- public AbstractMessageReader(MessageQueue messageQueue,
- IMessageConverter messageConverter)
- {
- this.messageQueue = messageQueue;
-
- this.messageConverter = messageConverter;
- this.messageConverterEx = (messageConverter as
IMessageConverterEx);
- }
-
- /// <summary>
- /// Returns without removing (peeks) the first message in the queue
- /// referenced by this MessageQueue matching the selection criteria.
- /// The Peek method is synchronous, so it blocks the current thread
- /// until a message becomes available.
- /// </summary>
- /// <returns>Peeked message.</returns>
- public abstract IMessage Peek();
-
- /// <summary>
- /// Returns without removing (peeks) the first message in the queue
- /// referenced by this MessageQueue matching the selection criteria.
- /// The Peek method is synchronous, so it blocks the current thread
- /// until a message becomes available or the specified time-out occurs.
- /// </summary>
- /// <param name="timeSpan">Reception time-out.</param>
- /// <returns>Peeked message.</returns>
- public abstract IMessage Peek(TimeSpan timeSpan);
-
- /// <summary>
- /// Receives the first message available in the queue referenced by
- /// the MessageQueue matching the selection criteria.
- /// This call is synchronous, and blocks the current thread of
execution
- /// until a message is available.
- /// </summary>
- /// <returns>Received message.</returns>
- public abstract IMessage Receive();
-
- /// <summary>
- /// Receives the first message available in the queue referenced by the
- /// MessageQueue matching the selection criteria, and waits until
either
- /// a message is available in the queue, or the time-out expires.
- /// </summary>
- /// <param name="timeSpan">Reception time-out.</param>
- /// <returns>Received message.</returns>
- public abstract IMessage Receive(TimeSpan timeSpan);
-
- /// <summary>
- /// Receives the first message available in the transactional queue
- /// referenced by the MessageQueue matching the selection criteria.
- /// This call is synchronous, and blocks the current thread of
execution
- /// until a message is available.
- /// </summary>
- /// <param name="transaction">Transaction.</param>
- /// <returns>Received message.</returns>
- public abstract IMessage Receive(MessageQueueTransaction transaction);
-
- /// <summary>
- /// Receives the first message available in the transactional queue
- /// referenced by the MessageQueue matching the selection criteria,
- /// and waits until either a message is available in the queue, or the
- /// time-out expires.
- /// </summary>
- /// <param name="timeSpan">Reception time-out.</param>
- /// <param name="transaction">Transaction.</param>
- /// <returns>Received message.</returns>
- public abstract IMessage Receive(TimeSpan timeSpan,
- MessageQueueTransaction transaction);
-
- /// <summary>
- /// Checks if an MSMQ message matches the selection criteria.
- /// </summary>
- /// <param name="message">MSMQ message.</param>
- /// <return>true if the message matches the selection
criteria.</return>
- public abstract bool Matches(Message message);
-
- /// <summary>
- /// Converts an MSMQ message to an NMS message, using the converter
- /// specified at construction time.
- /// </summary>
- /// <param name="message">MSMQ message.</param>
- /// <return>NMS message.</return>
- protected IMessage Convert(Message message)
- {
- return message == null ? null :
messageConverter.ToNmsMessage(message);
- }
- }
-}
+using System;
+using System.Messaging;
+using Apache.NMS.MSMQ;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.NMS.MSMQ.Readers
+{
+ /// <summary>
+ /// Abstract MSMQ message reader. Derived classes support various
+ /// message filtering methods.
+ /// </summary>
+ public abstract class AbstractMessageReader : IMessageReader
+ {
+ protected MessageQueue messageQueue;
+ protected IMessageConverter messageConverter;
+ protected IMessageConverterEx messageConverterEx;
+
+ /// <summary>
+ /// Constructor.
+ /// </summary>
+ /// <param name="messageQueue">The MSMQ message queue from which
+ /// messages will be read.</param>
+ /// <param name="messageConverter">A message converter for mapping
+ /// MSMQ messages to NMS messages.</param>
+ public AbstractMessageReader(MessageQueue messageQueue,
+ IMessageConverter messageConverter)
+ {
+ this.messageQueue = messageQueue;
+
+ this.messageConverter = messageConverter;
+ this.messageConverterEx = (messageConverter as
IMessageConverterEx);
+ }
+
+ /// <summary>
+ /// Returns without removing (peeks) the first message in the queue
+ /// referenced by this MessageQueue matching the selection criteria.
+ /// The Peek method is synchronous, so it blocks the current thread
+ /// until a message becomes available.
+ /// </summary>
+ /// <returns>Peeked message.</returns>
+ public abstract IMessage Peek();
+
+ /// <summary>
+ /// Returns without removing (peeks) the first message in the queue
+ /// referenced by this MessageQueue matching the selection criteria.
+ /// The Peek method is synchronous, so it blocks the current thread
+ /// until a message becomes available or the specified time-out occurs.
+ /// </summary>
+ /// <param name="timeSpan">Reception time-out.</param>
+ /// <returns>Peeked message.</returns>
+ public abstract IMessage Peek(TimeSpan timeSpan);
+
+ /// <summary>
+ /// Receives the first message available in the queue referenced by
+ /// the MessageQueue matching the selection criteria.
+ /// This call is synchronous, and blocks the current thread of
execution
+ /// until a message is available.
+ /// </summary>
+ /// <returns>Received message.</returns>
+ public abstract IMessage Receive();
+
+ /// <summary>
+ /// Receives the first message available in the queue referenced by the
+ /// MessageQueue matching the selection criteria, and waits until
either
+ /// a message is available in the queue, or the time-out expires.
+ /// </summary>
+ /// <param name="timeSpan">Reception time-out.</param>
+ /// <returns>Received message.</returns>
+ public abstract IMessage Receive(TimeSpan timeSpan);
+
+ /// <summary>
+ /// Receives the first message available in the transactional queue
+ /// referenced by the MessageQueue matching the selection criteria.
+ /// This call is synchronous, and blocks the current thread of
execution
+ /// until a message is available.
+ /// </summary>
+ /// <param name="transaction">Transaction.</param>
+ /// <returns>Received message.</returns>
+ public abstract IMessage Receive(MessageQueueTransaction transaction);
+
+ /// <summary>
+ /// Receives the first message available in the transactional queue
+ /// referenced by the MessageQueue matching the selection criteria,
+ /// and waits until either a message is available in the queue, or the
+ /// time-out expires.
+ /// </summary>
+ /// <param name="timeSpan">Reception time-out.</param>
+ /// <param name="transaction">Transaction.</param>
+ /// <returns>Received message.</returns>
+ public abstract IMessage Receive(TimeSpan timeSpan,
+ MessageQueueTransaction transaction);
+
+ /// <summary>
+ /// Checks if an MSMQ message matches the selection criteria.
+ /// </summary>
+ /// <param name="message">MSMQ message.</param>
+ /// <return>true if the message matches the selection
criteria.</return>
+ public abstract bool Matches(Message message);
+
+ /// <summary>
+ /// Converts an MSMQ message to an NMS message, using the converter
+ /// specified at construction time.
+ /// </summary>
+ /// <param name="message">MSMQ message.</param>
+ /// <return>NMS message.</return>
+ protected IMessage Convert(Message message)
+ {
+ return message == null ? null :
messageConverter.ToNmsMessage(message);
+ }
+ }
+}