Author: jgomes Date: Sat Sep 10 23:09:20 2016 New Revision: 1760211 URL: http://svn.apache.org/viewvc?rev=1760211&view=rev Log: Apply patch for AMQNET-556 from Stephane Ramet. Thanks Stephane!
Added: activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/msmqprovider-test.config activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/test/csharp/AsyncConsumeTest.cs activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/test/csharp/BadConsumeTest.cs activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/test/csharp/BytesMessageTest.cs activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/test/csharp/Commands/ activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/test/csharp/Commands/BytesMessage.cs activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/test/csharp/Commands/Destination.cs activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/test/csharp/Commands/MapMessage.cs activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/test/csharp/Commands/Message.cs activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/test/csharp/Commands/ObjectMessage.cs activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/test/csharp/Commands/Queue.cs activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/test/csharp/Commands/StreamMessage.cs activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/test/csharp/Commands/TempDestination.cs activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/test/csharp/Commands/TempQueue.cs activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/test/csharp/Commands/TempTopic.cs activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/test/csharp/Commands/TextMessage.cs activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/test/csharp/Commands/Topic.cs activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/test/csharp/ConnectionTest.cs activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/test/csharp/ConsumerTest.cs activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/test/csharp/DurableTest.cs activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/test/csharp/EndianBinaryReaderTest.cs activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/test/csharp/EndianBinaryWriterTest.cs activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/test/csharp/EndianTest.cs 
activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/test/csharp/ForeignMessageTransformationTest.cs activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/test/csharp/MSMQAsyncConsumeTest.cs activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/test/csharp/MSMQBadConsumeTest.cs activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/test/csharp/MSMQBytesMessageTest.cs activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/test/csharp/MSMQConnectionTest.cs activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/test/csharp/MSMQConsumerTest.cs activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/test/csharp/MSMQDurableTest.cs activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/test/csharp/MSMQForeignMessageTransformationTest.cs activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/test/csharp/MSMQMapMessageTest.cs activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/test/csharp/MSMQMessageSelectorTest.cs activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/test/csharp/MSMQMessageTest.cs activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/test/csharp/MSMQMessageTransformerTest.cs activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/test/csharp/MSMQNMSPropertyTest.cs activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/test/csharp/MSMQProducerTest.cs activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/test/csharp/MSMQRequestResponseTest.cs activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/test/csharp/MSMQStreamMessageTest.cs activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/test/csharp/MSMQTempDestinationDeletionTest.cs activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/test/csharp/MSMQTempDestinationTest.cs activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/test/csharp/MSMQTestSupport.cs activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/test/csharp/MSMQTextMessageTest.cs activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/test/csharp/MSMQTransactionTest.cs activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/test/csharp/MSMQXmlMessageTest.cs 
activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/test/csharp/MapMessageTest.cs activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/test/csharp/MessageSelectorTest.cs activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/test/csharp/MessageTest.cs activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/test/csharp/MessageTransformerTest.cs activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/test/csharp/NMSPropertyTest.cs activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/test/csharp/NMSTest.cs activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/test/csharp/NMSTestSupport.cs activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/test/csharp/NMSTracer.cs activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/test/csharp/PrimitiveMapTest.cs activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/test/csharp/ProducerTest.cs activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/test/csharp/RedeliveryPolicyTest.cs activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/test/csharp/RequestResponseTest.cs activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/test/csharp/StreamMessageTest.cs activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/test/csharp/TempDestinationDeletionTest.cs activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/test/csharp/TempDestinationTest.cs activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/test/csharp/TextMessageTest.cs activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/test/csharp/TransactionTest.cs activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/test/csharp/XmlMessageTest.cs Modified: activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/Apache.NMS.MSMQ.Test.nunit activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/nant.build activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/Connection.cs activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/ConnectionMetaData.cs activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/DefaultMessageConverter.cs activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/IMessageConverterEx.cs 
activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/MessageConsumer.cs activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/MessageProducer.cs activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/Queue.cs activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/QueueBrowser.cs activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/Readers/AbstractMessageReader.cs activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/Readers/ByCorrelationIdMessageReader.cs activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/Readers/ByIdMessageReader.cs activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/Readers/ByLookupIdMessageReader.cs activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/Readers/BySelectorMessageReader.cs activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/Readers/IMessageReader.cs activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/Readers/MessageReaderUtil.cs activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/Readers/NonFilteringMessageReader.cs activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/Selector/ANDExpression.cs activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/Selector/AlignedNumericValues.cs activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/Selector/ArithmeticExpression.cs activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/Selector/BinaryExpression.cs activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/Selector/BooleanCastExpression.cs activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/Selector/BooleanConstantExpression.cs activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/Selector/BooleanUnaryExpression.cs activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/Selector/ComparisonExpression.cs activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/Selector/ConstantExpression.cs 
activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/Selector/DivideExpression.cs activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/Selector/EqualExpression.cs activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/Selector/GreaterExpression.cs activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/Selector/GreaterOrEqualExpression.cs activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/Selector/IBooleanExpression.cs activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/Selector/IExpression.cs activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/Selector/InExpression.cs activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/Selector/IsNullExpression.cs activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/Selector/LesserExpression.cs activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/Selector/LesserOrEqualExpression.cs activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/Selector/LikeExpression.cs activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/Selector/LogicExpression.cs activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/Selector/MessageEvaluationContext.cs activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/Selector/MinusExpression.cs activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/Selector/ModExpression.cs activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/Selector/MultiplyExpression.cs activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/Selector/NOTExpression.cs activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/Selector/NegateExpression.cs activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/Selector/ORExpression.cs activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/Selector/ParseException.cs activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/Selector/PlusExpression.cs 
activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/Selector/PropertyExpression.cs activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/Selector/SelectorParser.cs activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/Selector/SelectorParser.csc activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/Selector/SelectorParserConstants.cs activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/Selector/SelectorParserTokenManager.cs activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/Selector/SimpleCharStream.cs activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/Selector/Token.cs activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/Selector/TokenMgrError.cs activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/Selector/UnaryExpression.cs activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/Session.cs activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/vs2008-msmq-test.csproj activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/vs2008-msmq.csproj activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/vs2008-msmq.sln Modified: activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/Apache.NMS.MSMQ.Test.nunit URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/Apache.NMS.MSMQ.Test.nunit?rev=1760211&r1=1760210&r2=1760211&view=diff ============================================================================== --- activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/Apache.NMS.MSMQ.Test.nunit (original) +++ activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/Apache.NMS.MSMQ.Test.nunit Sat Sep 10 23:09:20 2016 @@ -1,7 +1,7 @@ -<NUnitProject> - <Settings activeconfig="Default" /> - <Config name="Default" binpathtype="Auto"> - <assembly path="Apache.NMS.Test.dll" /> - <assembly path="Apache.NMS.MSMQ.Test.dll" /> - </Config> +<NUnitProject> + <Settings activeconfig="Default" /> + <Config name="Default" binpathtype="Auto"> + <!--<assembly path="Apache.NMS.Test.dll" />--> + <assembly 
path="Apache.NMS.MSMQ.Test.dll" /> + </Config> </NUnitProject> \ No newline at end of file Added: activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/msmqprovider-test.config URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/msmqprovider-test.config?rev=1760211&view=auto ============================================================================== --- activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/msmqprovider-test.config (added) +++ activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/msmqprovider-test.config Sat Sep 10 23:09:20 2016 @@ -0,0 +1,35 @@ +<?xml version="1.0" encoding="utf-8" ?> +<!-- +* Licensed to the Apache Software Foundation (ASF) under one or more +* contributor license agreements. See the NOTICE file distributed with +* this work for additional information regarding copyright ownership. +* The ASF licenses this file to You under the Apache License, Version 2.0 +* (the "License"); you may not use this file except in compliance with +* the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. 
+--> +<configuration> + <testURI value="msmq://localhost"> + <userName value="guest"/> + <passWord value="guest"/> + <defaultTestQueue value="queue://.\Private$\TestQ"/> + <defaultTestTopic value=""/> + <defaultTestQueue2 value="queue://.\Private$\TestQ2"/> + <defaultTestTopic2 value=""/> + <durableConsumerTestTopic value=""/> + <messageSelectorTestQueue value="queue://.\Private$\TestQ"/> + <messageSelectorTestTopic value=""/> + <deletionTestQueue value="queue://.\Private$\TestQ"/> + <deletionTestTopic value=""/> + <deletionTestTempQueue value=""/> + <deletionTestTempTopic value=""/> + <transactionTestQueue value="queue://.\Private$\TestQT"/> + </testURI> +</configuration> Modified: activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/nant.build URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/nant.build?rev=1760211&r1=1760210&r2=1760211&view=diff ============================================================================== --- activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/nant.build (original) +++ activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/nant.build Sat Sep 10 23:09:20 2016 @@ -1,4 +1,4 @@ -<?xml version="1.0"?> +<?xml version="1.0"?> <!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. 
See the NOTICE file distributed with @@ -31,8 +31,6 @@ <property name="nunit.dll" value="${basedir}/lib/NUnit/${current.build.framework}/nunit.framework.dll" dynamic="true" /> <property name="Apache.NMS.dll" value="${basedir}/lib/Apache.NMS/${current.build.framework}/Apache.NMS.dll" dynamic="true" /> <property name="Apache.NMS.pdb" value="${basedir}/lib/Apache.NMS/${current.build.framework}/Apache.NMS.pdb" dynamic="true" /> - <property name="Apache.NMS.Test.dll" value="${basedir}/lib/Apache.NMS/${current.build.framework}//Apache.NMS.Test.dll" dynamic="true" /> - <property name="Apache.NMS.Test.pdb" value="${basedir}/lib/Apache.NMS/${current.build.framework}/Apache.NMS.Test.pdb" dynamic="true" /> <property name="NUnit.Projectfile" value="Apache.NMS.MSMQ.Test.nunit" /> <!-- Skip certain frameworks, since MSMQ is not supported on those platforms. --> @@ -77,7 +75,6 @@ <include name="${current.build.framework.assembly.dir}/System.Xml.dll" /> <include name="${current.build.framework.assembly.dir}/System.Messaging.dll" /> <include name="${Apache.NMS.dll}" /> - <include name="${Apache.NMS.Test.dll}" /> <include name="${build.bin.dir}/${project.name}.dll" /> <include name="${nunit.dll}" /> </assemblyfileset> @@ -88,8 +85,6 @@ <include name="nmsprovider-*.config" /> <include name="${Apache.NMS.dll}" /> <include name="${Apache.NMS.pdb}" /> - <include name="${Apache.NMS.Test.dll}" /> - <include name="${Apache.NMS.Test.pdb}" /> <include name="${nunit.dll}" /> <include name="${NUnit.Projectfile}" /> </fileset> Modified: activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/Connection.cs URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/Connection.cs?rev=1760211&r1=1760210&r2=1760211&view=diff ============================================================================== --- activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/Connection.cs (original) +++ 
activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/Connection.cs Sat Sep 10 23:09:20 2016 @@ -16,6 +16,7 @@ */ using System; +using System.Threading; namespace Apache.NMS.MSMQ { @@ -26,21 +27,101 @@ namespace Apache.NMS.MSMQ /// public class Connection : IConnection { - private AcknowledgementMode acknowledgementMode = AcknowledgementMode.AutoAcknowledge; - private IMessageConverter messageConverter = new DefaultMessageConverter(); + #region Constructors - private IRedeliveryPolicy redeliveryPolicy; - private ConnectionMetaData metaData = null; - private bool connected; - private bool closed; - private string clientId; + public Connection() + { + // now lets send the connection and see if we get an ack/nak + // TODO: establish a connection + } + + #endregion + + #region Connection state + + public enum ConnectionState + { + Created, + Connected, + Starting, + Started, + Stopping, + Stopped, + Closed + } + + private ConnectionState state = ConnectionState.Created; + + public class StateChangeEventArgs : EventArgs + { + public StateChangeEventArgs(ConnectionState originalState, + ConnectionState currentState) + { + this.originalState = originalState; + this.currentState = currentState; + } + + private ConnectionState originalState; + public ConnectionState OriginalState + { + get { return originalState; } + } + + private ConnectionState currentState; + public ConnectionState CurrentState + { + get { return currentState; } + } + } + + public delegate void StateChangeEventHandler(object sender, StateChangeEventArgs e); + + public event StateChangeEventHandler ConnectionStateChange; + + private void ChangeState(ConnectionState newState) + { + if(ConnectionStateChange != null) + { + ConnectionStateChange(this, + new StateChangeEventArgs(this.state, newState)); + } + + this.state = newState; + } + + private object stateLock = new object(); + + #endregion + + #region Start & stop /// <summary> /// Starts message delivery for this connection. 
/// </summary> public void Start() { - CheckConnected(); + lock(stateLock) + { + switch(state) + { + case ConnectionState.Created: + case ConnectionState.Connected: + case ConnectionState.Stopped: + ChangeState(ConnectionState.Starting); + ChangeState(ConnectionState.Started); + break; + + case ConnectionState.Stopping: + throw new NMSException("Connection stopping"); + + case ConnectionState.Closed: + throw new NMSException("Connection closed"); + + case ConnectionState.Starting: + case ConnectionState.Started: + break; + } + } } /// <summary> @@ -49,7 +130,7 @@ namespace Apache.NMS.MSMQ /// </summary> public bool IsStarted { - get { return true; } + get { return state == ConnectionState.Started; } } /// <summary> @@ -57,9 +138,65 @@ namespace Apache.NMS.MSMQ /// </summary> public void Stop() { - CheckConnected(); + lock(stateLock) + { + switch(state) + { + case ConnectionState.Started: + ChangeState(ConnectionState.Stopping); + ChangeState(ConnectionState.Stopped); + break; + + case ConnectionState.Starting: + throw new NMSException("Connection starting"); + + case ConnectionState.Closed: + throw new NMSException("Connection closed"); + + case ConnectionState.Created: + case ConnectionState.Connected: + case ConnectionState.Stopping: + case ConnectionState.Stopped: + break; + } + } + } + + #endregion + + #region Close & dispose + + public void Close() + { + if(!IsClosed) + { + Stop(); + + state = ConnectionState.Closed; + } + } + + public bool IsClosed + { + get { return state == ConnectionState.Closed; } } + public void Dispose() + { + try + { + Close(); + } + catch + { + state = ConnectionState.Closed; + } + } + + #endregion + + #region Create session + /// <summary> /// Creates a new session to work on this connection /// </summary> @@ -73,14 +210,16 @@ namespace Apache.NMS.MSMQ /// </summary> public ISession CreateSession(AcknowledgementMode mode) { - CheckConnected(); + if(IsClosed) + { + throw new NMSException("Connection closed"); + } return new 
Session(this, mode); } - public void Dispose() - { - closed = true; - } + #endregion + + #region Connection properties /// <summary> /// The default timeout for network requests. @@ -91,24 +230,27 @@ namespace Apache.NMS.MSMQ set { } } + private AcknowledgementMode acknowledgementMode = AcknowledgementMode.AutoAcknowledge; public AcknowledgementMode AcknowledgementMode { get { return acknowledgementMode; } set { acknowledgementMode = value; } } + private IMessageConverter messageConverter = new DefaultMessageConverter(); public IMessageConverter MessageConverter { get { return messageConverter; } set { messageConverter = value; } } + private string clientId; public string ClientId { get { return clientId; } set { - if(connected) + if(state != ConnectionState.Created) { throw new NMSException("You cannot change the ClientId once the Connection is connected"); } @@ -116,6 +258,7 @@ namespace Apache.NMS.MSMQ } } + private IRedeliveryPolicy redeliveryPolicy; /// <summary> /// Get/or set the redelivery policy for this connection. /// </summary> @@ -125,6 +268,19 @@ namespace Apache.NMS.MSMQ set { this.redeliveryPolicy = value; } } + private ConnectionMetaData metaData = null; + /// <summary> + /// Gets the Meta Data for the NMS Connection instance. + /// </summary> + public IConnectionMetaData MetaData + { + get { return this.metaData ?? (this.metaData = new ConnectionMetaData()); } + } + + #endregion + + #region Transformer delegates + private ConsumerTransformerDelegate consumerTransformer; public ConsumerTransformerDelegate ConsumerTransformer { @@ -139,57 +295,18 @@ namespace Apache.NMS.MSMQ set { this.producerTransformer = value; } } - /// <summary> - /// Gets the Meta Data for the NMS Connection instance. - /// </summary> - public IConnectionMetaData MetaData - { - get { return this.metaData ?? 
(this.metaData = new ConnectionMetaData()); } - } + #endregion + + #region Exception & transport listeners /// <summary> /// A delegate that can receive transport level exceptions. /// </summary> public event ExceptionListener ExceptionListener; - /// <summary> - /// An asynchronous listener that is notified when a Fault tolerant connection - /// has been interrupted. - /// </summary> - public event ConnectionInterruptedListener ConnectionInterruptedListener; - - /// <summary> - /// An asynchronous listener that is notified when a Fault tolerant connection - /// has been resumed. - /// </summary> - public event ConnectionResumedListener ConnectionResumedListener; - - protected void CheckConnected() - { - if(closed) - { - throw new NMSException("Connection Closed"); - } - if(!connected) - { - connected = true; - // now lets send the connection and see if we get an ack/nak - // TODO: establish a connection - } - } - - public void Close() - { - Dispose(); - } - - public void PurgeTempDestinations() - { - } - public void HandleException(Exception e) { - if(ExceptionListener != null && !this.closed) + if(ExceptionListener != null && !this.IsClosed) { ExceptionListener(e); } @@ -199,11 +316,17 @@ namespace Apache.NMS.MSMQ } } + /// <summary> + /// An asynchronous listener that is notified when a Fault tolerant connection + /// has been interrupted. + /// </summary> + public event ConnectionInterruptedListener ConnectionInterruptedListener; + public void HandleTransportInterrupted() { Tracer.Debug("Transport has been Interrupted."); - if(this.ConnectionInterruptedListener != null && !this.closed) + if(this.ConnectionInterruptedListener != null && !this.IsClosed) { try { @@ -215,11 +338,17 @@ namespace Apache.NMS.MSMQ } } + /// <summary> + /// An asynchronous listener that is notified when a Fault tolerant connection + /// has been resumed. 
+ /// </summary> + public event ConnectionResumedListener ConnectionResumedListener; + public void HandleTransportResumed() { Tracer.Debug("Transport has resumed normal operation."); - if(this.ConnectionResumedListener != null && !this.closed) + if(this.ConnectionResumedListener != null && !this.IsClosed) { try { @@ -230,5 +359,11 @@ namespace Apache.NMS.MSMQ } } } + + #endregion + + public void PurgeTempDestinations() + { + } } } Modified: activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/ConnectionMetaData.cs URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/ConnectionMetaData.cs?rev=1760211&r1=1760210&r2=1760211&view=diff ============================================================================== --- activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/ConnectionMetaData.cs (original) +++ activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/ConnectionMetaData.cs Sat Sep 10 23:09:20 2016 @@ -1,107 +1,107 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -using System; -using System.Reflection; - -namespace Apache.NMS.MSMQ -{ - /// <summary> - /// Implements the Connection Meta-Data feature for Apache.NMS.MSMQ - /// </summary> - public class ConnectionMetaData : IConnectionMetaData - { - private int nmsMajorVersion; - private int nmsMinorVersion; - - private string nmsProviderName; - private string nmsVersion; - - private int providerMajorVersion; - private int providerMinorVersion; - private string providerVersion; - - private string[] nmsxProperties; - - public ConnectionMetaData() - { - Assembly self = Assembly.GetExecutingAssembly(); - AssemblyName asmName = self.GetName(); - - this.nmsProviderName = asmName.Name; - this.providerMajorVersion = asmName.Version.Major; - this.providerMinorVersion = asmName.Version.Minor; - this.providerVersion = asmName.Version.ToString(); - - this.nmsxProperties = new String[] { }; - - foreach(AssemblyName name in self.GetReferencedAssemblies()) - { - if(0 == string.Compare(name.Name, "Apache.NMS", true)) - { - this.nmsMajorVersion = name.Version.Major; - this.nmsMinorVersion = name.Version.Minor; - this.nmsVersion = name.Version.ToString(); - - return; - } - } - - throw new NMSException("Could not find a reference to the Apache.NMS Assembly."); - } - - public int NMSMajorVersion - { - get { return this.nmsMajorVersion; } - } - - public int NMSMinorVersion - { - get { return this.nmsMinorVersion; } - } - - public string NMSProviderName - { - get { return this.nmsProviderName; } - } - - public string NMSVersion - { - get { return this.nmsVersion; } - } - - public string[] NMSXPropertyNames - { - get { return this.nmsxProperties; } - } - - public int ProviderMajorVersion - { - get { return this.providerMajorVersion; } - } - - public int ProviderMinorVersion - { - get { return this.providerMinorVersion; } - } - - public string ProviderVersion - { - get { return this.providerVersion; } - } - } -} +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * 
contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +using System; +using System.Reflection; + +namespace Apache.NMS.MSMQ +{ + /// <summary> + /// Implements the Connection Meta-Data feature for Apache.NMS.MSMQ + /// </summary> + public class ConnectionMetaData : IConnectionMetaData + { + private int nmsMajorVersion; + private int nmsMinorVersion; + + private string nmsProviderName; + private string nmsVersion; + + private int providerMajorVersion; + private int providerMinorVersion; + private string providerVersion; + + private string[] nmsxProperties; + + public ConnectionMetaData() + { + Assembly self = Assembly.GetExecutingAssembly(); + AssemblyName asmName = self.GetName(); + + this.nmsProviderName = asmName.Name; + this.providerMajorVersion = asmName.Version.Major; + this.providerMinorVersion = asmName.Version.Minor; + this.providerVersion = asmName.Version.ToString(); + + this.nmsxProperties = new String[] { }; + + foreach(AssemblyName name in self.GetReferencedAssemblies()) + { + if(0 == string.Compare(name.Name, "Apache.NMS", true)) + { + this.nmsMajorVersion = name.Version.Major; + this.nmsMinorVersion = name.Version.Minor; + this.nmsVersion = name.Version.ToString(); + + return; + } + } + + throw new NMSException("Could not find a reference to the Apache.NMS Assembly."); + } + + 
public int NMSMajorVersion + { + get { return this.nmsMajorVersion; } + } + + public int NMSMinorVersion + { + get { return this.nmsMinorVersion; } + } + + public string NMSProviderName + { + get { return this.nmsProviderName; } + } + + public string NMSVersion + { + get { return this.nmsVersion; } + } + + public string[] NMSXPropertyNames + { + get { return this.nmsxProperties; } + } + + public int ProviderMajorVersion + { + get { return this.providerMajorVersion; } + } + + public int ProviderMinorVersion + { + get { return this.providerMinorVersion; } + } + + public string ProviderVersion + { + get { return this.providerVersion; } + } + } +} Modified: activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/DefaultMessageConverter.cs URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/DefaultMessageConverter.cs?rev=1760211&r1=1760210&r2=1760211&view=diff ============================================================================== --- activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/DefaultMessageConverter.cs (original) +++ activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/DefaultMessageConverter.cs Sat Sep 10 23:09:20 2016 @@ -118,16 +118,16 @@ namespace Apache.NMS.MSMQ // Populate property data foreach(object keyObject in message.Properties.Keys) { - string key = (keyObject as string); - object val = message.Properties.GetString(key); - if(!SetLabelAsNMSType && string.Compare(key, "Label", true) == 0 && val != null) - { - msmqMessage.Label = val.ToString(); - } - else - { - propertyData[key] = val; - } + string key = (keyObject as string); + object val = message.Properties[key]; + if(!SetLabelAsNMSType && string.Compare(key, "Label", true) == 0 && val != null) + { + msmqMessage.Label = val.ToString(); + } + else + { + propertyData[key] = val; + } } // Store the NMS property data in the extension area @@ -418,6 +418,8 @@ namespace Apache.NMS.MSMQ result = baseMessage; } + 
result.ReadOnlyBody = true; + return result; } @@ -442,6 +444,7 @@ namespace Apache.NMS.MSMQ else if(message is BytesMessage) { BytesMessage bytesMessage = message as BytesMessage; + bytesMessage.Reset(); answer.BodyStream.Write(bytesMessage.Content, 0, bytesMessage.Content.Length); answer.AppSpecific = (int) NMSMessageType.BytesMessage; } @@ -604,12 +607,14 @@ namespace Apache.NMS.MSMQ /// <result>MSMQ queue.</result> public MessageQueue ToMsmqDestination(IDestination destination) { - if(null == destination) + Queue queue = destination as Queue; + + if(destination == null) { return null; } - return new MessageQueue((destination as Destination).Path); + return queue.MSMQMessageQueue; } /// <summary> @@ -625,7 +630,7 @@ namespace Apache.NMS.MSMQ return null; } - return new Queue(destinationQueue.Path); + return new Queue(destinationQueue); } #endregion Modified: activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/IMessageConverterEx.cs URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/IMessageConverterEx.cs?rev=1760211&r1=1760210&r2=1760211&view=diff ============================================================================== --- activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/IMessageConverterEx.cs (original) +++ activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/IMessageConverterEx.cs Sat Sep 10 23:09:20 2016 @@ -1,44 +1,44 @@ -using System.Messaging; -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -namespace Apache.NMS.MSMQ -{ - /// <summary> - /// Extended IMessageConverter interface supporting new methods for - /// optimizing message selection through "selectors". - /// The original IMessageConverter is maintained for compatibility - /// reasons with existing clients implementing it. - /// </summary> - public interface IMessageConverterEx : IMessageConverter - { - /// <summary> - /// Converts the specified MSMQ message to an equivalent NMS message. - /// </summary> - /// <param name="message">MSMQ message to be converted.</param> - /// <param name="convertBody">true if message body should be converted.</param> - /// <result>Converted NMS message.</result> - IMessage ToNmsMessage(Message message, bool convertBody); - - /// <summary> - /// Converts an MSMQ message body to the equivalent NMS message body. - /// </summary> - /// <param name="message">Source MSMQ message.</param> - /// <param name="answer">Target NMS message.</param> - void ConvertMessageBodyToNMS(Message message, IMessage answer); - } -} +using System.Messaging; +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +namespace Apache.NMS.MSMQ +{ + /// <summary> + /// Extended IMessageConverter interface supporting new methods for + /// optimizing message selection through "selectors". + /// The original IMessageConverter is maintained for compatibility + /// reasons with existing clients implementing it. + /// </summary> + public interface IMessageConverterEx : IMessageConverter + { + /// <summary> + /// Converts the specified MSMQ message to an equivalent NMS message. + /// </summary> + /// <param name="message">MSMQ message to be converted.</param> + /// <param name="convertBody">true if message body should be converted.</param> + /// <result>Converted NMS message.</result> + IMessage ToNmsMessage(Message message, bool convertBody); + + /// <summary> + /// Converts an MSMQ message body to the equivalent NMS message body. 
+ /// </summary> + /// <param name="message">Source MSMQ message.</param> + /// <param name="answer">Target NMS message.</param> + void ConvertMessageBodyToNMS(Message message, IMessage answer); + } +} Modified: activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/MessageConsumer.cs URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/MessageConsumer.cs?rev=1760211&r1=1760210&r2=1760211&view=diff ============================================================================== --- activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/MessageConsumer.cs (original) +++ activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/MessageConsumer.cs Sat Sep 10 23:09:20 2016 @@ -32,9 +32,6 @@ namespace Apache.NMS.MSMQ private readonly Session session; private readonly AcknowledgementMode acknowledgementMode; private MessageQueue messageQueue; - private Thread asyncDeliveryThread = null; - private AutoResetEvent pause = new AutoResetEvent(false); - private Atomic<bool> asyncDelivery = new Atomic<bool>(false); private ConsumerTransformerDelegate consumerTransformer; public ConsumerTransformerDelegate ConsumerTransformer @@ -81,6 +78,8 @@ namespace Apache.NMS.MSMQ messageQueue, session.MessageConverter, selector); } + #region Asynchronous delivery + private int listenerCount = 0; private event MessageListener listener; public event MessageListener Listener @@ -89,7 +88,13 @@ namespace Apache.NMS.MSMQ { listener += value; listenerCount++; - StartAsyncDelivery(); + + session.Connection.ConnectionStateChange += OnConnectionStateChange; + + if(session.Connection.IsStarted) + { + StartAsyncDelivery(); + } } remove @@ -100,87 +105,37 @@ namespace Apache.NMS.MSMQ listenerCount--; } - if(0 == listenerCount) + if(listenerCount == 0) { + session.Connection.ConnectionStateChange -= OnConnectionStateChange; + StopAsyncDelivery(); } } } - public IMessage Receive() - { - IMessage nmsMessage = null; - - if(messageQueue != 
null) - { - nmsMessage = reader.Receive(); - nmsMessage = TransformMessage(nmsMessage); - } - - return nmsMessage; - } - - public IMessage Receive(TimeSpan timeout) - { - IMessage nmsMessage = null; - - if(messageQueue != null) - { - nmsMessage = reader.Receive(timeout); - nmsMessage = TransformMessage(nmsMessage); - } - - return nmsMessage; - } - - public IMessage ReceiveNoWait() - { - IMessage nmsMessage = null; - - if(messageQueue != null) - { - nmsMessage = reader.Receive(zeroTimeout); - nmsMessage = TransformMessage(nmsMessage); - } - - return nmsMessage; - } - - public void Dispose() + private void OnConnectionStateChange(object sender, Connection.StateChangeEventArgs e) { - Close(); - } - - public void Close() - { - StopAsyncDelivery(); - if(messageQueue != null) + if(e.CurrentState == Connection.ConnectionState.Starting) { - messageQueue.Dispose(); - messageQueue = null; + if(listenerCount > 0) + { + StartAsyncDelivery(); + } } - } - - protected virtual void StopAsyncDelivery() - { - if(asyncDelivery.CompareAndSet(true, false)) + else if(e.CurrentState == Connection.ConnectionState.Stopping) { - if(null != asyncDeliveryThread) + if(listenerCount > 0) { - Tracer.Info("Stopping async delivery thread."); - pause.Set(); - if(!asyncDeliveryThread.Join(10000)) - { - Tracer.Info("Aborting async delivery thread."); - asyncDeliveryThread.Abort(); - } - - asyncDeliveryThread = null; - Tracer.Info("Async delivery thread stopped."); + StopAsyncDelivery(); } } } + private Thread asyncDeliveryThread = null; + private Atomic<bool> asyncDelivery = new Atomic<bool>(false); + TimeSpan dispatchingTimeout = new TimeSpan(5000); + protected virtual void StartAsyncDelivery() { if(asyncDelivery.CompareAndSet(false, true)) @@ -199,7 +154,7 @@ namespace Apache.NMS.MSMQ { try { - IMessage message = Receive(); + IMessage message = Receive(dispatchingTimeout); if(asyncDelivery.Value && message != null) { try @@ -215,6 +170,7 @@ namespace Apache.NMS.MSMQ catch(ThreadAbortException ex) { 
Tracer.InfoFormat("Thread abort received in thread: {0} : {1}", this, ex.Message); + break; } catch(Exception ex) @@ -225,11 +181,104 @@ namespace Apache.NMS.MSMQ Tracer.Info("Stopping dispatcher thread consumer: " + this); } + protected virtual void StopAsyncDelivery() + { + if(asyncDelivery.CompareAndSet(true, false)) + { + if(null != asyncDeliveryThread) + { + // Thread.Interrupt and Thread.Abort do not interrupt Receive + // instructions. Attempting to abort the thread and joining + // will result in a phantom backgroud thread, which may + // ultimately consume a message before actually stopping. + + Tracer.Info("Waiting for thread to complete aborting."); + asyncDeliveryThread.Join(dispatchingTimeout); + + asyncDeliveryThread = null; + Tracer.Info("Async delivery thread stopped."); + } + } + } + + protected virtual void HandleAsyncException(Exception e) { session.Connection.HandleException(e); } + #endregion + + #region Receive (synchronous) + + public IMessage Receive() + { + IMessage nmsMessage = null; + + if(messageQueue != null) + { + nmsMessage = reader.Receive(); + nmsMessage = TransformMessage(nmsMessage); + } + + return nmsMessage; + } + + public IMessage Receive(TimeSpan timeout) + { + IMessage nmsMessage = null; + + if(messageQueue != null) + { + try + { + nmsMessage = reader.Receive(timeout); + } + catch(MessageQueueException ex) + { + if(ex.MessageQueueErrorCode != MessageQueueErrorCode.IOTimeout) + { + throw ex; + } + } + nmsMessage = TransformMessage(nmsMessage); + } + + return nmsMessage; + } + + public IMessage ReceiveNoWait() + { + return Receive(zeroTimeout); + } + + #endregion + + #region Close & dispose + + public void Dispose() + { + Close(); + } + + public void Close() + { + if(listenerCount > 0) + { + session.Connection.ConnectionStateChange -= OnConnectionStateChange; + + StopAsyncDelivery(); + } + + if(messageQueue != null) + { + messageQueue.Dispose(); + messageQueue = null; + } + } + + #endregion + protected virtual IMessage 
TransformMessage(IMessage message) { IMessage transformed = message; Modified: activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/MessageProducer.cs URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/MessageProducer.cs?rev=1760211&r1=1760210&r2=1760211&view=diff ============================================================================== --- activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/MessageProducer.cs (original) +++ activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/MessageProducer.cs Sat Sep 10 23:09:20 2016 @@ -50,41 +50,47 @@ namespace Apache.NMS.MSMQ this.destination = destination; if(destination != null) { - messageQueue = openMessageQueue(destination); + messageQueue = OpenMessageQueue(destination); } } - private MessageQueue openMessageQueue(Destination dest) + private MessageQueue OpenMessageQueue(Destination dest) { - MessageQueue rc = null; - try + Queue queue = dest as Queue; + + MessageQueue mq = queue.MSMQMessageQueue; + + if(mq == null) { - if(!MessageQueue.Exists(dest.Path)) - { - // create the new message queue and make it transactional - rc = MessageQueue.Create(dest.Path, session.Transacted); - this.destination.Path = rc.Path; - } - else + try { - rc = new MessageQueue(dest.Path); - this.destination.Path = rc.Path; - if(!rc.CanWrite) + if(!Queue.Exists(dest.Path)) { - throw new NMSSecurityException("Do not have write access to: " + dest); + // create the new message queue and make it transactional + mq = MessageQueue.Create(dest.Path, session.Transacted); + this.destination = new Queue(mq); + } + else + { + mq = new MessageQueue(dest.Path); + this.destination = new Queue(mq); + if(!mq.CanWrite) + { + throw new NMSSecurityException("Do not have write access to: " + dest); + } } } - } - catch(Exception e) - { - if(rc != null) + catch(Exception e) { - rc.Dispose(); + if(mq != null) + { + mq.Dispose(); + } + + throw new NMSException(e.Message + ": " + dest, 
e); } - - throw new NMSException(e.Message + ": " + dest, e); } - return rc; + return mq; } public void Send(IMessage message) @@ -111,18 +117,28 @@ namespace Apache.NMS.MSMQ // Locate the MSMQ Queue we will be sending to if(messageQueue != null) { + if(destination == null) + { + throw new InvalidDestinationException("This producer can only be used to send to: " + destination); + } + if(destination.Equals(this.destination)) { mq = messageQueue; } else { - throw new NMSException("This producer can only be used to send to: " + destination); + throw new NotSupportedException("This producer can only be used to send to: " + destination); } } else { - mq = openMessageQueue((Destination) destination); + if(destination == null) + { + throw new NotSupportedException(); + } + + mq = OpenMessageQueue((Destination) destination); } if(this.ProducerTransformer != null) @@ -147,7 +163,7 @@ namespace Apache.NMS.MSMQ // TODO: message.NMSMessageId = } - // Convert the Mesasge into a MSMQ message + // Convert the Message into a MSMQ message Message msg = session.MessageConverter.ToMsmqMessage(message); if(mq.Transactional) @@ -179,6 +195,11 @@ namespace Apache.NMS.MSMQ mq.Send(msg); } + message.NMSMessageId = msg.Id; + if(message.NMSCorrelationID == null) + { + message.NMSCorrelationID = msg.CorrelationId; + } } finally { Modified: activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/Queue.cs URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/Queue.cs?rev=1760211&r1=1760210&r2=1760211&view=diff ============================================================================== --- activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/Queue.cs (original) +++ activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/Queue.cs Sat Sep 10 23:09:20 2016 @@ -15,6 +15,7 @@ * limitations under the License. 
*/ using System; +using System.Messaging; namespace Apache.NMS.MSMQ { @@ -24,17 +25,69 @@ namespace Apache.NMS.MSMQ /// </summary> public class Queue : Destination, IQueue { - public Queue() : base() { } + public Queue(MessageQueue messageQueue) + : base() + { + this.messageQueue = messageQueue; + Path = messageQueue.Path; + } + public Queue(String name) : base(name) { + if(string.IsNullOrEmpty(name)) + { + messageQueue = null; + } + else + { + try + { + messageQueue = new MessageQueue(name); + } + catch(Exception /*ex*/) + { + // Excerpt from Microsoft documentation for MessageQueue.Exists : + // (@https://msdn.microsoft.com/fr-fr/library/system.messaging.messagequeue.exists(v=vs.110).aspx) + // Exists(String) is an expensive operation. Use it only when it is necessary within the application. + // --- + // Therefore, we won't check for existence of the queue before attempting to access it. + + //if(!Exists(name)) + //{ + // Excerpt from the Oracle JMS JavaDoc for Session.createQueue : + // (@https://docs.oracle.com/javaee/7/api/javax/jms/Session.html#createQueue-java.lang.String-) + // Note that this method simply creates an object that encapsulates the name of a queue. It does + // not create the physical queue in the JMS provider. JMS does not provide a method to create the + // physical queue, since this would be specific to a given JMS provider. Creating a physical queue + // is provider-specific and is typically an administrative task performed by an administrator, + // though some providers may create them automatically when needed. The one exception to this is + // the creation of a temporary queue, which is done using the createTemporaryQueue method. + // --- + // Therefore, we should throw an NMSException if the queue does not exist. + // --- + // BUT, to keep it compatible with the initial implementation of MessageProducer, which attempts + // to create non pre-existing queues, we keep it silent.. 
+ + // throw new NMSException("Message queue \"" + name + "\" does not exist", ex); + //} + + // throw new NMSException("Cannot access message queue \"" + name + "\"", ex); + } + } } + private MessageQueue messageQueue; + public MessageQueue MSMQMessageQueue + { + get { return messageQueue; } + } + override public DestinationType DestinationType { get @@ -54,7 +107,25 @@ namespace Apache.NMS.MSMQ return new Queue(name); } - + public static bool Exists(string name) + { + try + { + return MessageQueue.Exists(name); + } + catch(InvalidOperationException) + { + // Excerpt from Microsoft documentation for MessageQueue.Exists : + // (@https://msdn.microsoft.com/fr-fr/library/system.messaging.messagequeue.exists(v=vs.110).aspx) + // InvalidOperationException: The application used format name syntax when verifying queue existence. + // --- + // The Exists(String) method does not support the FormatName prefix. + // No method exists to determine whether a queue with a specified format name exists. + + // We'll assume the queue exists at this point + return true; + } + } } } Modified: activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/QueueBrowser.cs URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/QueueBrowser.cs?rev=1760211&r1=1760210&r2=1760211&view=diff ============================================================================== --- activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/QueueBrowser.cs (original) +++ activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/QueueBrowser.cs Sat Sep 10 23:09:20 2016 @@ -1,164 +1,172 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -using System; -using System.Collections; -using System.Messaging; -using Apache.NMS; -using Apache.NMS.Util; -using Apache.NMS.MSMQ.Readers; - -namespace Apache.NMS.MSMQ -{ - public class QueueBrowser : Apache.NMS.IQueueBrowser - { - private bool closed = false; - private bool disposed = false; - - private readonly Session session; - private MessageQueue messageQueue; - - private string selector; - - private IMessageReader reader; - - public QueueBrowser(Session session, MessageQueue messageQueue) - : this(session, messageQueue, null) - { - } - - public QueueBrowser(Session session, MessageQueue messageQueue, - string selector) - { - this.session = session; - this.messageQueue = messageQueue; - if(null != this.messageQueue) - { - this.messageQueue.MessageReadPropertyFilter.SetAll(); - } - - reader = MessageReaderUtil.CreateMessageReader( - messageQueue, session.MessageConverter, selector); - } - - ~QueueBrowser() - { - Dispose(false); - } - - #region IDisposable Members - - ///<summary> - /// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources. - ///</summary> - public void Dispose() - { - Dispose(true); - GC.SuppressFinalize(this); - } - - protected void Dispose(bool disposing) - { - if(disposed) - { - return; - } - - if(disposing) - { - // Dispose managed code here. - } - - try - { - Close(); - } - catch - { - // Ignore errors. 
- } - - disposed = true; - } - - #endregion - - public void Close() - { - if(messageQueue != null) - { - messageQueue.Dispose(); - messageQueue = null; - } - closed = true; - } - - public string MessageSelector - { - get { return selector; } - } - - public IQueue Queue - { - get { return new Queue(this.messageQueue.Path); } - } - - internal class Enumerator : IEnumerator - { - private readonly Session session; - private readonly MessageEnumerator innerEnumerator; - private readonly IMessageReader reader; - - public Enumerator(Session session, MessageQueue messageQueue, - IMessageReader reader) - { - this.session = session; - this.innerEnumerator = messageQueue.GetMessageEnumerator2(); - this.reader = reader; - } - - public object Current - { - get - { - return this.session.MessageConverter.ToNmsMessage(this.innerEnumerator.Current); - } - } - - public bool MoveNext() - { - while(this.innerEnumerator.MoveNext()) - { - if(reader.Matches(this.innerEnumerator.Current)) - { - return true; - } - } - return false; - } - - public void Reset() - { - this.innerEnumerator.Reset(); - } - } - - public IEnumerator GetEnumerator() - { - return new Enumerator(this.session, this.messageQueue, this.reader); - } - } -} +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +using System; +using System.Collections; +using System.Messaging; +using Apache.NMS; +using Apache.NMS.Util; +using Apache.NMS.MSMQ.Readers; + +namespace Apache.NMS.MSMQ +{ + public class QueueBrowser : Apache.NMS.IQueueBrowser + { + private bool disposed = false; + + private readonly Session session; + private MessageQueue messageQueue; + + private string selector; + + private IMessageReader reader; + + public QueueBrowser(Session session, MessageQueue messageQueue) + : this(session, messageQueue, null) + { + } + + public QueueBrowser(Session session, MessageQueue messageQueue, + string selector) + { + this.session = session; + this.messageQueue = messageQueue; + if(null != this.messageQueue) + { + this.messageQueue.MessageReadPropertyFilter.SetAll(); + } + this.selector = selector; + + reader = MessageReaderUtil.CreateMessageReader( + messageQueue, session.MessageConverter, selector); + } + + ~QueueBrowser() + { + Dispose(false); + } + + #region IDisposable Members + + ///<summary> + /// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources. + ///</summary> + public void Dispose() + { + Dispose(true); + GC.SuppressFinalize(this); + } + + protected void Dispose(bool disposing) + { + if(disposed) + { + return; + } + + if(disposing) + { + // Dispose managed code here. + } + + try + { + Close(); + } + catch + { + // Ignore errors. 
+ } + + disposed = true; + } + + #endregion + + public void Close() + { + if(messageQueue != null) + { + messageQueue.Dispose(); + messageQueue = null; + } + } + + public string MessageSelector + { + get { return selector; } + } + + public IQueue Queue + { + get { return new Queue(this.messageQueue.Path); } + } + + internal class Enumerator : IEnumerator, IDisposable + { + private Session session; + private MessageEnumerator innerEnumerator; + private IMessageReader reader; + + public Enumerator(Session session, MessageQueue messageQueue, + IMessageReader reader) + { + this.session = session; + this.innerEnumerator = messageQueue.GetMessageEnumerator2(); + this.reader = reader; + } + + public object Current + { + get + { + return this.session.MessageConverter.ToNmsMessage(this.innerEnumerator.Current); + } + } + + public bool MoveNext() + { + while(this.innerEnumerator.MoveNext()) + { + if(reader.Matches(this.innerEnumerator.Current)) + { + return true; + } + } + return false; + } + + public void Reset() + { + this.innerEnumerator.Reset(); + } + + public void Dispose() + { + if(innerEnumerator != null) + { + innerEnumerator.Close(); + innerEnumerator = null; + } + } + } + + public IEnumerator GetEnumerator() + { + return new Enumerator(this.session, this.messageQueue, this.reader); + } + } +} Modified: activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/Readers/AbstractMessageReader.cs URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/Readers/AbstractMessageReader.cs?rev=1760211&r1=1760210&r2=1760211&view=diff ============================================================================== --- activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/Readers/AbstractMessageReader.cs (original) +++ activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/Readers/AbstractMessageReader.cs Sat Sep 10 23:09:20 2016 @@ -1,126 +1,126 @@ -using System; -using System.Messaging; -using Apache.NMS.MSMQ; -/* - 
* Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -namespace Apache.NMS.MSMQ.Readers -{ - /// <summary> - /// Abstract MSMQ message reader. Derived classes support various - /// message filtering methods. - /// </summary> - public abstract class AbstractMessageReader : IMessageReader - { - protected MessageQueue messageQueue; - protected IMessageConverter messageConverter; - protected IMessageConverterEx messageConverterEx; - - /// <summary> - /// Constructor. - /// </summary> - /// <param name="messageQueue">The MSMQ message queue from which - /// messages will be read.</param> - /// <param name="messageConverter">A message converter for mapping - /// MSMQ messages to NMS messages.</param> - public AbstractMessageReader(MessageQueue messageQueue, - IMessageConverter messageConverter) - { - this.messageQueue = messageQueue; - - this.messageConverter = messageConverter; - this.messageConverterEx = (messageConverter as IMessageConverterEx); - } - - /// <summary> - /// Returns without removing (peeks) the first message in the queue - /// referenced by this MessageQueue matching the selection criteria. - /// The Peek method is synchronous, so it blocks the current thread - /// until a message becomes available. 
- /// </summary> - /// <returns>Peeked message.</returns> - public abstract IMessage Peek(); - - /// <summary> - /// Returns without removing (peeks) the first message in the queue - /// referenced by this MessageQueue matching the selection criteria. - /// The Peek method is synchronous, so it blocks the current thread - /// until a message becomes available or the specified time-out occurs. - /// </summary> - /// <param name="timeSpan">Reception time-out.</param> - /// <returns>Peeked message.</returns> - public abstract IMessage Peek(TimeSpan timeSpan); - - /// <summary> - /// Receives the first message available in the queue referenced by - /// the MessageQueue matching the selection criteria. - /// This call is synchronous, and blocks the current thread of execution - /// until a message is available. - /// </summary> - /// <returns>Received message.</returns> - public abstract IMessage Receive(); - - /// <summary> - /// Receives the first message available in the queue referenced by the - /// MessageQueue matching the selection criteria, and waits until either - /// a message is available in the queue, or the time-out expires. - /// </summary> - /// <param name="timeSpan">Reception time-out.</param> - /// <returns>Received message.</returns> - public abstract IMessage Receive(TimeSpan timeSpan); - - /// <summary> - /// Receives the first message available in the transactional queue - /// referenced by the MessageQueue matching the selection criteria. - /// This call is synchronous, and blocks the current thread of execution - /// until a message is available. 
- /// </summary> - /// <param name="transaction">Transaction.</param> - /// <returns>Received message.</returns> - public abstract IMessage Receive(MessageQueueTransaction transaction); - - /// <summary> - /// Receives the first message available in the transactional queue - /// referenced by the MessageQueue matching the selection criteria, - /// and waits until either a message is available in the queue, or the - /// time-out expires. - /// </summary> - /// <param name="timeSpan">Reception time-out.</param> - /// <param name="transaction">Transaction.</param> - /// <returns>Received message.</returns> - public abstract IMessage Receive(TimeSpan timeSpan, - MessageQueueTransaction transaction); - - /// <summary> - /// Checks if an MSMQ message matches the selection criteria. - /// </summary> - /// <param name="message">MSMQ message.</param> - /// <return>true if the message matches the selection criteria.</return> - public abstract bool Matches(Message message); - - /// <summary> - /// Converts an MSMQ message to an NMS message, using the converter - /// specified at construction time. - /// </summary> - /// <param name="message">MSMQ message.</param> - /// <return>NMS message.</return> - protected IMessage Convert(Message message) - { - return message == null ? null : messageConverter.ToNmsMessage(message); - } - } -} +using System; +using System.Messaging; +using Apache.NMS.MSMQ; +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +namespace Apache.NMS.MSMQ.Readers +{ + /// <summary> + /// Abstract MSMQ message reader. Derived classes support various + /// message filtering methods. + /// </summary> + public abstract class AbstractMessageReader : IMessageReader + { + /// <summary>MSMQ message queue passed to the constructor.</summary> + protected MessageQueue messageQueue; + /// <summary>Converter used by Convert to build NMS messages.</summary> + protected IMessageConverter messageConverter; + /// <summary>The same converter viewed as IMessageConverterEx, or null + /// if it does not implement that interface.</summary> + protected IMessageConverterEx messageConverterEx; + + /// <summary> + /// Constructor. Stores the queue and the converter, and also keeps the + /// converter as an IMessageConverterEx when it implements that interface. + /// </summary> + /// <param name="messageQueue">The MSMQ message queue from which + /// messages will be read.</param> + /// <param name="messageConverter">A message converter for mapping + /// MSMQ messages to NMS messages.</param> + public AbstractMessageReader(MessageQueue messageQueue, + IMessageConverter messageConverter) + { + this.messageQueue = messageQueue; + + this.messageConverter = messageConverter; + this.messageConverterEx = (messageConverter as IMessageConverterEx); + } + + /// <summary> + /// Returns without removing (peeks) the first message in the queue + /// referenced by this MessageQueue matching the selection criteria. + /// The Peek method is synchronous, so it blocks the current thread + /// until a message becomes available. + /// </summary> + /// <returns>Peeked message.</returns> + public abstract IMessage Peek(); + + /// <summary> + /// Returns without removing (peeks) the first message in the queue + /// referenced by this MessageQueue matching the selection criteria. 
+ /// The Peek method is synchronous, so it blocks the current thread + /// until a message becomes available or the specified time-out occurs. + /// </summary> + /// <param name="timeSpan">Reception time-out.</param> + /// <returns>Peeked message.</returns> + public abstract IMessage Peek(TimeSpan timeSpan); + + /// <summary> + /// Receives the first message available in the queue referenced by + /// the MessageQueue matching the selection criteria. + /// This call is synchronous, and blocks the current thread of execution + /// until a message is available. + /// </summary> + /// <returns>Received message.</returns> + public abstract IMessage Receive(); + + /// <summary> + /// Receives the first message available in the queue referenced by the + /// MessageQueue matching the selection criteria, and waits until either + /// a message is available in the queue, or the time-out expires. + /// </summary> + /// <param name="timeSpan">Reception time-out.</param> + /// <returns>Received message.</returns> + public abstract IMessage Receive(TimeSpan timeSpan); + + /// <summary> + /// Receives the first message available in the transactional queue + /// referenced by the MessageQueue matching the selection criteria. + /// This call is synchronous, and blocks the current thread of execution + /// until a message is available. + /// </summary> + /// <param name="transaction">Transaction.</param> + /// <returns>Received message.</returns> + public abstract IMessage Receive(MessageQueueTransaction transaction); + + /// <summary> + /// Receives the first message available in the transactional queue + /// referenced by the MessageQueue matching the selection criteria, + /// and waits until either a message is available in the queue, or the + /// time-out expires. 
+ /// </summary> + /// <param name="timeSpan">Reception time-out.</param> + /// <param name="transaction">Transaction.</param> + /// <returns>Received message.</returns> + public abstract IMessage Receive(TimeSpan timeSpan, + MessageQueueTransaction transaction); + + /// <summary> + /// Checks if an MSMQ message matches the selection criteria. + /// </summary> + /// <param name="message">MSMQ message.</param> + /// <returns>true if the message matches the selection criteria.</returns> + public abstract bool Matches(Message message); + + /// <summary> + /// Converts an MSMQ message to an NMS message, using the converter + /// specified at construction time. + /// </summary> + /// <param name="message">MSMQ message.</param> + /// <returns>NMS message, or null if the MSMQ message is null.</returns> + protected IMessage Convert(Message message) + { + return message == null ? null : messageConverter.ToNmsMessage(message); + } + } +}