http://git-wip-us.apache.org/repos/asf/activemq-nms-msmq/blob/e964b541/tags/1.2.0-RC1/nmsprovider-test.config ---------------------------------------------------------------------- diff --git a/tags/1.2.0-RC1/nmsprovider-test.config b/tags/1.2.0-RC1/nmsprovider-test.config new file mode 100644 index 0000000..051f872 --- /dev/null +++ b/tags/1.2.0-RC1/nmsprovider-test.config @@ -0,0 +1,26 @@ +<?xml version="1.0" encoding="utf-8" ?> +<!-- +* Licensed to the Apache Software Foundation (ASF) under one or more +* contributor license agreements. See the NOTICE file distributed with +* this work for additional information regarding copyright ownership. +* The ASF licenses this file to You under the Apache License, Version 2.0 +* (the "License"); you may not use this file except in compliance with +* the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +--> +<configuration> + <defaultURI value="msmq://localhost"> + <factoryParams> + <param type="string" value="NMSTestClient"/> + </factoryParams> + <userName value="guest"/> + <passWord value="guest"/> + </defaultURI> +</configuration>
http://git-wip-us.apache.org/repos/asf/activemq-nms-msmq/blob/e964b541/tags/1.2.0-RC1/package.ps1 ---------------------------------------------------------------------- diff --git a/tags/1.2.0-RC1/package.ps1 b/tags/1.2.0-RC1/package.ps1 new file mode 100644 index 0000000..e27a321 --- /dev/null +++ b/tags/1.2.0-RC1/package.ps1 @@ -0,0 +1,67 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +$pkgname = "Apache.NMS.MSMQ" +$pkgver = "1.2.0" +$configurations = "release", "debug" +$frameworks = "net-2.0", "net-3.5" + +write-progress "Creating package directory." "Initializing..." +if(!(test-path package)) +{ + md package +} + +if(test-path build) +{ + pushd build + + $pkgdir = "..\package" + + write-progress "Packaging Application files." "Scanning..." 
+ $zipfile = "$pkgdir\$pkgname-$pkgver-bin.zip" + zip -9 -u -j "$zipfile" ..\LICENSE.txt + zip -9 -u -j "$zipfile" ..\NOTICE.txt + foreach($configuration in $configurations) + { + foreach($framework in $frameworks) + { + zip -9 -u "$zipfile" "$framework\$configuration\$pkgname.dll" + zip -9 -u "$zipfile" "$framework\$configuration\nmsprovider*.config" + zip -9 -u "$zipfile" "$framework\$configuration\$pkgname.Test.dll" + if($framework -ieq "mono-2.0") + { + zip -9 -u "$zipfile" "$framework\$configuration\$pkgname.dll.mdb" + zip -9 -u "$zipfile" "$framework\$configuration\$pkgname.Test.dll.mdb" + } + else + { + zip -9 -u "$zipfile" "$framework\$configuration\$pkgname.pdb" + zip -9 -u "$zipfile" "$framework\$configuration\$pkgname.Test.pdb" + } + } + } + + popd +} + +write-progress "Packaging Source code files." "Scanning..." +$pkgdir = "package" +$zipfile = "$pkgdir\$pkgname-$pkgver-src.zip" + +zip -9 -u "$zipfile" LICENSE.txt NOTICE.txt nant-common.xml nant.build package.ps1 vs2008-msmq-test.csproj vs2008-msmq.csproj vs2008-msmq.sln +zip -9 -u -r "$zipfile" keyfile src + +write-progress -Completed "Packaging" "Complete." http://git-wip-us.apache.org/repos/asf/activemq-nms-msmq/blob/e964b541/tags/1.2.0-RC1/src/main/csharp/BaseMessage.cs ---------------------------------------------------------------------- diff --git a/tags/1.2.0-RC1/src/main/csharp/BaseMessage.cs b/tags/1.2.0-RC1/src/main/csharp/BaseMessage.cs new file mode 100644 index 0000000..16d7164 --- /dev/null +++ b/tags/1.2.0-RC1/src/main/csharp/BaseMessage.cs @@ -0,0 +1,214 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +using System; +using Apache.NMS.Util; + +namespace Apache.NMS.MSMQ +{ + public delegate void AcknowledgeHandler(BaseMessage baseMessage); + + public class BaseMessage : IMessage + { + private PrimitiveMap propertiesMap = new PrimitiveMap(); + private IDestination destination; + private string correlationId; + private TimeSpan timeToLive; + private string messageId; + private MsgDeliveryMode deliveryMode; + private MsgPriority priority; + private Destination replyTo; + private byte[] content; + private string type; + private event AcknowledgeHandler Acknowledger; + private DateTime timestamp = new DateTime(); + private bool readOnlyMsgBody = false; + + public bool ReadOnlyBody + { + get { return readOnlyMsgBody; } + set { readOnlyMsgBody = value; } + } + + // IMessage interface + + public void Acknowledge() + { + if(null != Acknowledger) + { + Acknowledger(this); + } + } + + /// <summary> + /// Clears out the message body. Clearing a message's body does not clear its header + /// values or property entries. + /// + /// If this message body was read-only, calling this method leaves the message body in + /// the same state as an empty body in a newly created message. + /// </summary> + public virtual void ClearBody() + { + this.Content = null; + this.readOnlyMsgBody = false; + } + + /// <summary> + /// Clears a message's properties. + /// + /// The message's header fields and body are not cleared. 
+ /// </summary> + public virtual void ClearProperties() + { + propertiesMap.Clear(); + } + + // Properties + + public IPrimitiveMap Properties + { + get { return propertiesMap; } + } + + + // NMS headers + + /// <summary> + /// The correlation ID used to correlate messages with conversations or long running business processes + /// </summary> + public string NMSCorrelationID + { + get { return correlationId; } + set { correlationId = value; } + } + + /// <summary> + /// The destination of the message + /// </summary> + public IDestination NMSDestination + { + get { return destination; } + set { destination = value; } + } + + /// <summary> + /// The time in milliseconds that this message should expire in + /// </summary> + public TimeSpan NMSTimeToLive + { + get { return timeToLive; } + set { timeToLive = value; } + } + + /// <summary> + /// The message ID which is set by the provider + /// </summary> + public string NMSMessageId + { + get { return messageId; } + set { messageId = value; } + } + + /// <summary> + /// Whether or not this message is persistent + /// </summary> + public MsgDeliveryMode NMSDeliveryMode + { + get { return deliveryMode; } + set { deliveryMode = value; } + } + + /// <summary> + /// The Priority on this message + /// </summary> + public MsgPriority NMSPriority + { + get { return priority; } + set { priority = value; } + } + + /// <summary> + /// Returns true if this message has been redelivered to this or another consumer before being acknowledged successfully. 
+ /// </summary> + public bool NMSRedelivered + { + get { return false; } + } + + + /// <summary> + /// The destination that the consumer of this message should send replies to + /// </summary> + public IDestination NMSReplyTo + { + get { return replyTo; } + set { replyTo = (Destination) value; } + } + + + /// <summary> + /// The timestamp the broker added to the message + /// </summary> + public DateTime NMSTimestamp + { + get { return timestamp; } + set { timestamp = value; } + } + + public byte[] Content + { + get { return content; } + set { this.content = value; } + } + + /// <summary> + /// The type name of this message + /// </summary> + public string NMSType + { + get { return type; } + set { type = value; } + } + + + public object GetObjectProperty(string name) + { + return null; + } + + public void SetObjectProperty(string name, object value) + { + } + + protected void FailIfReadOnlyBody() + { + if(ReadOnlyBody == true) + { + throw new MessageNotWriteableException("Message is in Read-Only mode."); + } + } + + protected void FailIfWriteOnlyBody() + { + if( ReadOnlyBody == false ) + { + throw new MessageNotReadableException("Message is in Write-Only mode."); + } + } + } +} + http://git-wip-us.apache.org/repos/asf/activemq-nms-msmq/blob/e964b541/tags/1.2.0-RC1/src/main/csharp/BytesMessage.cs ---------------------------------------------------------------------- diff --git a/tags/1.2.0-RC1/src/main/csharp/BytesMessage.cs b/tags/1.2.0-RC1/src/main/csharp/BytesMessage.cs new file mode 100644 index 0000000..51ca6fa --- /dev/null +++ b/tags/1.2.0-RC1/src/main/csharp/BytesMessage.cs @@ -0,0 +1,511 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +using Apache.NMS.Util; +using System.IO; +using System; +namespace Apache.NMS.MSMQ +{ + /// <summary> + /// + /// A BytesMessage object is used to send a message containing a stream of uninterpreted + /// bytes. It inherits from the Message interface and adds a bytes message body. The + /// receiver of the message supplies the interpretation of the bytes. + /// + /// This message type is for client encoding of existing message formats. If possible, + /// one of the other self-defining message types should be used instead. + /// + /// Although the NMS API allows the use of message properties with byte messages, they + /// are typically not used, since the inclusion of properties may affect the format. + /// + /// When the message is first created, and when ClearBody is called, the body of the + /// message is in write-only mode. After the first call to Reset has been made, the + /// message body is in read-only mode. After a message has been sent, the client that + /// sent it can retain and modify it without affecting the message that has been sent. + /// The same message object can be sent multiple times. When a message has been received, + /// the provider has called Reset so that the message body is in read-only mode for the + /// client. + /// + /// If ClearBody is called on a message in read-only mode, the message body is cleared and + /// the message is in write-only mode. 
+ /// + /// If a client attempts to read a message in write-only mode, a MessageNotReadableException + /// is thrown. + /// + /// If a client attempts to write a message in read-only mode, a MessageNotWriteableException + /// is thrown. + /// </summary> + public class BytesMessage : BaseMessage, IBytesMessage + { + private EndianBinaryReader dataIn = null; + private EndianBinaryWriter dataOut = null; + private MemoryStream outputBuffer = null; + + // Need this later when we add compression to store true content length. + private long length = 0; + + public override void ClearBody() + { + base.ClearBody(); + this.outputBuffer = null; + this.dataIn = null; + this.dataOut = null; + this.length = 0; + } + + public long BodyLength + { + get + { + InitializeReading(); + return this.length; + } + } + + public byte ReadByte() + { + InitializeReading(); + try + { + return dataIn.ReadByte(); + } + catch(EndOfStreamException e) + { + throw NMSExceptionSupport.CreateMessageEOFException(e); + } + catch(IOException e) + { + throw NMSExceptionSupport.CreateMessageFormatException(e); + } + } + + public void WriteByte( byte value ) + { + InitializeWriting(); + try + { + dataOut.Write( value ); + } + catch(Exception e) + { + throw NMSExceptionSupport.Create(e); + } + } + + public bool ReadBoolean() + { + InitializeReading(); + try + { + return dataIn.ReadBoolean(); + } + catch(EndOfStreamException e) + { + throw NMSExceptionSupport.CreateMessageEOFException(e); + } + catch(IOException e) + { + throw NMSExceptionSupport.CreateMessageFormatException(e); + } + } + + public void WriteBoolean( bool value ) + { + InitializeWriting(); + try + { + dataOut.Write( value ); + } + catch(Exception e) + { + throw NMSExceptionSupport.Create(e); + } + } + + public char ReadChar() + { + InitializeReading(); + try + { + return dataIn.ReadChar(); + } + catch(EndOfStreamException e) + { + throw NMSExceptionSupport.CreateMessageEOFException(e); + } + catch(IOException e) + { + throw 
NMSExceptionSupport.CreateMessageFormatException(e); + } + } + + public void WriteChar( char value ) + { + InitializeWriting(); + try + { + dataOut.Write( value ); + } + catch(Exception e) + { + throw NMSExceptionSupport.Create(e); + } + } + + public short ReadInt16() + { + InitializeReading(); + try + { + return dataIn.ReadInt16(); + } + catch(EndOfStreamException e) + { + throw NMSExceptionSupport.CreateMessageEOFException(e); + } + catch(IOException e) + { + throw NMSExceptionSupport.CreateMessageFormatException(e); + } + } + + public void WriteInt16( short value ) + { + InitializeWriting(); + try + { + dataOut.Write( value ); + } + catch(Exception e) + { + throw NMSExceptionSupport.Create(e); + } + } + + public int ReadInt32() + { + InitializeReading(); + try + { + return dataIn.ReadInt32(); + } + catch(EndOfStreamException e) + { + throw NMSExceptionSupport.CreateMessageEOFException(e); + } + catch(IOException e) + { + throw NMSExceptionSupport.CreateMessageFormatException(e); + } + } + + public void WriteInt32( int value ) + { + InitializeWriting(); + try + { + dataOut.Write( value ); + } + catch(Exception e) + { + throw NMSExceptionSupport.Create(e); + } + } + + public long ReadInt64() + { + InitializeReading(); + try + { + return dataIn.ReadInt64(); + } + catch(EndOfStreamException e) + { + throw NMSExceptionSupport.CreateMessageEOFException(e); + } + catch(IOException e) + { + throw NMSExceptionSupport.CreateMessageFormatException(e); + } + } + + public void WriteInt64( long value ) + { + InitializeWriting(); + try + { + dataOut.Write( value ); + } + catch(Exception e) + { + throw NMSExceptionSupport.Create(e); + } + } + + public float ReadSingle() + { + InitializeReading(); + try + { + return dataIn.ReadSingle(); + } + catch(EndOfStreamException e) + { + throw NMSExceptionSupport.CreateMessageEOFException(e); + } + catch(IOException e) + { + throw NMSExceptionSupport.CreateMessageFormatException(e); + } + } + + public void WriteSingle( float value ) + { + 
InitializeWriting(); + try + { + dataOut.Write( value ); + } + catch(Exception e) + { + throw NMSExceptionSupport.Create(e); + } + } + + public double ReadDouble() + { + InitializeReading(); + try + { + return dataIn.ReadDouble(); + } + catch(EndOfStreamException e) + { + throw NMSExceptionSupport.CreateMessageEOFException(e); + } + catch(IOException e) + { + throw NMSExceptionSupport.CreateMessageFormatException(e); + } + } + + public void WriteDouble( double value ) + { + InitializeWriting(); + try + { + dataOut.Write( value ); + } + catch(Exception e) + { + throw NMSExceptionSupport.Create(e); + } + } + + public int ReadBytes( byte[] value ) + { + InitializeReading(); + try + { + return dataIn.Read( value, 0, value.Length ); + } + catch(EndOfStreamException e) + { + throw NMSExceptionSupport.CreateMessageEOFException(e); + } + catch(IOException e) + { + throw NMSExceptionSupport.CreateMessageFormatException(e); + } + } + + public int ReadBytes( byte[] value, int length ) + { + InitializeReading(); + try + { + return dataIn.Read( value, 0, length ); + } + catch(EndOfStreamException e) + { + throw NMSExceptionSupport.CreateMessageEOFException(e); + } + catch(IOException e) + { + throw NMSExceptionSupport.CreateMessageFormatException(e); + } + } + + public void WriteBytes( byte[] value ) + { + InitializeWriting(); + try + { + dataOut.Write( value, 0, value.Length ); + } + catch(Exception e) + { + throw NMSExceptionSupport.Create(e); + } + } + + public void WriteBytes( byte[] value, int offset, int length ) + { + InitializeWriting(); + try + { + dataOut.Write( value, offset, length ); + } + catch(Exception e) + { + throw NMSExceptionSupport.Create(e); + } + } + + public string ReadString() + { + InitializeReading(); + try + { + // JMS, CMS and NMS all encode the String using a 16 bit size header. 
+ return dataIn.ReadString16(); + } + catch(EndOfStreamException e) + { + throw NMSExceptionSupport.CreateMessageEOFException(e); + } + catch(IOException e) + { + throw NMSExceptionSupport.CreateMessageFormatException(e); + } + } + + public void WriteString( string value ) + { + InitializeWriting(); + try + { + // JMS, CMS and NMS all encode the String using a 16 bit size header. + dataOut.WriteString16(value); + } + catch(Exception e) + { + throw NMSExceptionSupport.Create(e); + } + } + + public void WriteObject( System.Object value ) + { + InitializeWriting(); + if( value is System.Byte ) + { + this.dataOut.Write( (byte) value ); + } + else if( value is Char ) + { + this.dataOut.Write( (char) value ); + } + else if( value is Boolean ) + { + this.dataOut.Write( (bool) value ); + } + else if( value is Int16 ) + { + this.dataOut.Write( (short) value ); + } + else if( value is Int32 ) + { + this.dataOut.Write( (int) value ); + } + else if( value is Int64 ) + { + this.dataOut.Write( (long) value ); + } + else if( value is Single ) + { + this.dataOut.Write( (float) value ); + } + else if( value is Double ) + { + this.dataOut.Write( (double) value ); + } + else if( value is byte[] ) + { + this.dataOut.Write( (byte[]) value ); + } + else if( value is String ) + { + this.dataOut.WriteString16( (string) value ); + } + else + { + throw new MessageFormatException("Cannot write non-primitive type:" + value.GetType()); + } + } + + public void Reset() + { + StoreContent(); + this.dataIn = null; + this.dataOut = null; + this.outputBuffer = null; + this.ReadOnlyBody = true; + } + + private void InitializeReading() + { + FailIfWriteOnlyBody(); + if(this.dataIn == null) + { + if(this.Content != null) + { + this.length = this.Content.Length; + } + + // TODO - Add support for Message Compression. 
+ MemoryStream bytesIn = new MemoryStream(this.Content, false); + dataIn = new EndianBinaryReader(bytesIn); + } + } + + private void InitializeWriting() + { + FailIfReadOnlyBody(); + if(this.dataOut == null) + { + // TODO - Add support for Message Compression. + this.outputBuffer = new MemoryStream(); + this.dataOut = new EndianBinaryWriter(outputBuffer); + } + } + + private void StoreContent() + { + if( dataOut != null) + { + dataOut.Close(); + // TODO - Add support for Message Compression. + + this.Content = outputBuffer.ToArray(); + this.dataOut = null; + this.outputBuffer = null; + } + } + } +} + http://git-wip-us.apache.org/repos/asf/activemq-nms-msmq/blob/e964b541/tags/1.2.0-RC1/src/main/csharp/Connection.cs ---------------------------------------------------------------------- diff --git a/tags/1.2.0-RC1/src/main/csharp/Connection.cs b/tags/1.2.0-RC1/src/main/csharp/Connection.cs new file mode 100644 index 0000000..096e41f --- /dev/null +++ b/tags/1.2.0-RC1/src/main/csharp/Connection.cs @@ -0,0 +1,216 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +using System; + +namespace Apache.NMS.MSMQ +{ + /// <summary> + /// Represents a NMS connection MSMQ. 
Since the underlying MSMQ APIs are actually + /// connectionless, NMS connection in the MSMQ case are not expensive operations. + /// </summary> + /// + public class Connection : IConnection + { + private AcknowledgementMode acknowledgementMode = AcknowledgementMode.AutoAcknowledge; + private IMessageConverter messageConverter = new DefaultMessageConverter(); + + private IRedeliveryPolicy redeliveryPolicy; + private ConnectionMetaData metaData = null; + private bool connected; + private bool closed; + private string clientId; + + /// <summary> + /// Starts message delivery for this connection. + /// </summary> + public void Start() + { + CheckConnected(); + } + + /// <summary> + /// This property determines if the asynchronous message delivery of incoming + /// messages has been started for this connection. + /// </summary> + public bool IsStarted + { + get { return true; } + } + + /// <summary> + /// Stop message delivery for this connection. + /// </summary> + public void Stop() + { + CheckConnected(); + } + + /// <summary> + /// Creates a new session to work on this connection + /// </summary> + public ISession CreateSession() + { + return CreateSession(acknowledgementMode); + } + + /// <summary> + /// Creates a new session to work on this connection + /// </summary> + public ISession CreateSession(AcknowledgementMode mode) + { + CheckConnected(); + return new Session(this, mode); + } + + public void Dispose() + { + closed = true; + } + + /// <summary> + /// The default timeout for network requests. 
+ /// </summary> + public TimeSpan RequestTimeout + { + get { return NMSConstants.defaultRequestTimeout; } + set { } + } + + public AcknowledgementMode AcknowledgementMode + { + get { return acknowledgementMode; } + set { acknowledgementMode = value; } + } + + public IMessageConverter MessageConverter + { + get { return messageConverter; } + set { messageConverter = value; } + } + + public string ClientId + { + get { return clientId; } + set + { + if(connected) + { + throw new NMSException("You cannot change the ClientId once the Connection is connected"); + } + clientId = value; + } + } + + /// <summary> + /// Get/or set the redelivery policy for this connection. + /// </summary> + public IRedeliveryPolicy RedeliveryPolicy + { + get { return this.redeliveryPolicy; } + set { this.redeliveryPolicy = value; } + } + + /// <summary> + /// Gets the Meta Data for the NMS Connection instance. + /// </summary> + public IConnectionMetaData MetaData + { + get { return this.metaData ?? (this.metaData = new ConnectionMetaData()); } + } + + /// <summary> + /// A delegate that can receive transport level exceptions. + /// </summary> + public event ExceptionListener ExceptionListener; + + /// <summary> + /// An asynchronous listener that is notified when a Fault tolerant connection + /// has been interrupted. + /// </summary> + public event ConnectionInterruptedListener ConnectionInterruptedListener; + + /// <summary> + /// An asynchronous listener that is notified when a Fault tolerant connection + /// has been resumed. 
+ /// </summary> + public event ConnectionResumedListener ConnectionResumedListener; + + protected void CheckConnected() + { + if(closed) + { + throw new NMSException("Connection Closed"); + } + if(!connected) + { + connected = true; + // now lets send the connection and see if we get an ack/nak + // TODO: establish a connection + } + } + + public void Close() + { + Dispose(); + } + + public void HandleException(Exception e) + { + if(ExceptionListener != null && !this.closed) + { + ExceptionListener(e); + } + else + { + Tracer.Error(e); + } + } + + public void HandleTransportInterrupted() + { + Tracer.Debug("Transport has been Interrupted."); + + if(this.ConnectionInterruptedListener != null && !this.closed) + { + try + { + this.ConnectionInterruptedListener(); + } + catch + { + } + } + } + + public void HandleTransportResumed() + { + Tracer.Debug("Transport has resumed normal operation."); + + if(this.ConnectionResumedListener != null && !this.closed) + { + try + { + this.ConnectionResumedListener(); + } + catch + { + } + } + } + } +} http://git-wip-us.apache.org/repos/asf/activemq-nms-msmq/blob/e964b541/tags/1.2.0-RC1/src/main/csharp/ConnectionFactory.cs ---------------------------------------------------------------------- diff --git a/tags/1.2.0-RC1/src/main/csharp/ConnectionFactory.cs b/tags/1.2.0-RC1/src/main/csharp/ConnectionFactory.cs new file mode 100644 index 0000000..24e895d --- /dev/null +++ b/tags/1.2.0-RC1/src/main/csharp/ConnectionFactory.cs @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +using System; +using Apache.NMS.Policies; + +namespace Apache.NMS.MSMQ +{ + /// <summary> + /// A Factory that can estbalish NMS connections to MSMQ + /// </summary> + public class ConnectionFactory : IConnectionFactory + { + public const string DEFAULT_BROKER_URL = "msmq://localhost"; + public const string ENV_BROKER_URL = "MSMQ_BROKER_URL"; + private Uri brokerUri; + private IRedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy(); + + public static string GetDefaultBrokerUrl() + { + string answer = Environment.GetEnvironmentVariable(ENV_BROKER_URL); + if(answer == null) + { + answer = DEFAULT_BROKER_URL; + } + return answer; + } + + public ConnectionFactory() + : this(GetDefaultBrokerUrl()) + { + } + + public ConnectionFactory(string brokerUri) + : this(brokerUri, null) + { + } + + public ConnectionFactory(string brokerUri, string clientID) + : this(new Uri(brokerUri), clientID) + { + } + + public ConnectionFactory(Uri brokerUri) + : this(brokerUri, null) + { + } + + public ConnectionFactory(Uri brokerUri, string clientID) + { + this.brokerUri = brokerUri; + } + + /// <summary> + /// Creates a new connection to MSMQ. + /// </summary> + public IConnection CreateConnection() + { + return CreateConnection(string.Empty, string.Empty, false); + } + + /// <summary> + /// Creates a new connection to MSMQ. + /// </summary> + public IConnection CreateConnection(string userName, string password) + { + return CreateConnection(userName, password, false); + } + + /// <summary> + /// Creates a new connection to MSMQ. 
+ /// </summary> + public IConnection CreateConnection(string userName, string password, bool useLogging) + { + IConnection connection = new Connection(); + + connection.RedeliveryPolicy = this.redeliveryPolicy.Clone() as IRedeliveryPolicy; + return connection; + } + + /// <summary> + /// Get/or set the broker Uri. + /// </summary> + public Uri BrokerUri + { + get { return brokerUri; } + set { brokerUri = value; } + } + + /// <summary> + /// Get/or set the redelivery policy that new IConnection objects are + /// assigned upon creation. + /// </summary> + public IRedeliveryPolicy RedeliveryPolicy + { + get { return this.redeliveryPolicy; } + set + { + if(value != null) + { + this.redeliveryPolicy = value; + } + } + } + } +} http://git-wip-us.apache.org/repos/asf/activemq-nms-msmq/blob/e964b541/tags/1.2.0-RC1/src/main/csharp/ConnectionMetaData.cs ---------------------------------------------------------------------- diff --git a/tags/1.2.0-RC1/src/main/csharp/ConnectionMetaData.cs b/tags/1.2.0-RC1/src/main/csharp/ConnectionMetaData.cs new file mode 100644 index 0000000..5a305c0 --- /dev/null +++ b/tags/1.2.0-RC1/src/main/csharp/ConnectionMetaData.cs @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +using System; +using System.Reflection; + +namespace Apache.NMS.MSMQ +{ + /// <summary> + /// Implements the Connection Meta-Data feature for Apache.NMS.MSMQ + /// </summary> + public class ConnectionMetaData : IConnectionMetaData + { + private int nmsMajorVersion; + private int nmsMinorVersion; + + private string nmsProviderName; + private string nmsVersion; + + private int providerMajorVersion; + private int providerMinorVersion; + private string providerVersion; + + private string[] nmsxProperties; + + public ConnectionMetaData() + { + Assembly self = Assembly.GetExecutingAssembly(); + AssemblyName asmName = self.GetName(); + + this.nmsProviderName = asmName.Name; + this.providerMajorVersion = asmName.Version.Major; + this.providerMinorVersion = asmName.Version.Minor; + this.providerVersion = asmName.Version.ToString(); + + this.nmsxProperties = new String[] { }; + + foreach(AssemblyName name in self.GetReferencedAssemblies()) + { + if(0 == string.Compare(name.Name, "Apache.NMS", true)) + { + this.nmsMajorVersion = name.Version.Major; + this.nmsMinorVersion = name.Version.Minor; + this.nmsVersion = name.Version.ToString(); + + return; + } + } + + throw new NMSException("Could not find a reference to the Apache.NMS Assembly."); + } + + public int NMSMajorVersion + { + get { return this.nmsMajorVersion; } + } + + public int NMSMinorVersion + { + get { return this.nmsMinorVersion; } + } + + public string NMSProviderName + { + get { return this.nmsProviderName; } + } + + public string NMSVersion + { + get { return this.nmsVersion; } + } + + public string[] NMSXPropertyNames + { + get { return this.nmsxProperties; } + } + + public int ProviderMajorVersion + { + get { return this.providerMajorVersion; } + } + + public int ProviderMinorVersion + { + get { return this.providerMinorVersion; } + } + + public string ProviderVersion + { + get { return this.providerVersion; } + } + } +} 
http://git-wip-us.apache.org/repos/asf/activemq-nms-msmq/blob/e964b541/tags/1.2.0-RC1/src/main/csharp/DefaultMessageConverter.cs ---------------------------------------------------------------------- diff --git a/tags/1.2.0-RC1/src/main/csharp/DefaultMessageConverter.cs b/tags/1.2.0-RC1/src/main/csharp/DefaultMessageConverter.cs new file mode 100644 index 0000000..2aa3438 --- /dev/null +++ b/tags/1.2.0-RC1/src/main/csharp/DefaultMessageConverter.cs @@ -0,0 +1,267 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
using System;
using System.IO;
using System.Messaging;
using System.Text;
using Apache.NMS.Util;

namespace Apache.NMS.MSMQ
{
    /// <summary>
    /// Identifies which NMS message class an MSMQ message carries. The numeric
    /// value is stored in the MSMQ message's AppSpecific field, so the order of
    /// the members must not change.
    /// </summary>
    public enum NMSMessageType
    {
        BaseMessage,
        TextMessage,
        BytesMessage,
        ObjectMessage,
        MapMessage,
        StreamMessage
    }

    /// <summary>
    /// Default conversion between NMS messages/destinations and their
    /// System.Messaging counterparts.
    /// </summary>
    public class DefaultMessageConverter : IMessageConverter
    {
        // Key under which the correlation id is kept in the Extension meta data.
        private const string CorrelationIdKey = "NMSCorrelationID";

        /// <summary>
        /// Builds the MSMQ message to send for the given NMS message.
        /// </summary>
        /// <param name="message">The NMS message to convert.</param>
        /// <returns>A new MSMQ message carrying the body and header values.</returns>
        public virtual Message ToMsmqMessage(IMessage message)
        {
            Message msmqMessage = new Message();
            PrimitiveMap metaData = new PrimitiveMap();

            ConvertMessageBodyToMSMQ(message, msmqMessage);

            // A zero time-to-live leaves the MSMQ default expiry untouched.
            if(message.NMSTimeToLive != TimeSpan.Zero)
            {
                msmqMessage.TimeToBeReceived = message.NMSTimeToLive;
            }

            if(message.NMSCorrelationID != null)
            {
                metaData.SetString(CorrelationIdKey, message.NMSCorrelationID);
            }

            msmqMessage.Recoverable = (message.NMSDeliveryMode == MsgDeliveryMode.Persistent);
            msmqMessage.Priority = ToMessagePriority(message.NMSPriority);
            msmqMessage.ResponseQueue = ToMsmqDestination(message.NMSReplyTo);
            if(message.NMSType != null)
            {
                msmqMessage.Label = message.NMSType;
            }

            // Store the NMS meta data in the extension area
            msmqMessage.Extension = metaData.Marshal();
            return msmqMessage;
        }

        /// <summary>
        /// Builds the NMS message for a received MSMQ message.
        /// </summary>
        /// <param name="message">The received MSMQ message.</param>
        /// <returns>The NMS message with body and header values filled in.</returns>
        public virtual IMessage ToNmsMessage(Message message)
        {
            BaseMessage answer = CreateNmsMessage(message);
            // Get the NMS meta data from the extension area
            PrimitiveMap metaData = PrimitiveMap.Unmarshal(message.Extension);

            // Header values are copied best-effort in two groups: an
            // InvalidOperationException abandons the rest of its group only.
            // NOTE(review): presumably raised by System.Messaging when a
            // property was not retrieved with the message - confirm.
            try
            {
                answer.NMSMessageId = message.Id;
                answer.NMSCorrelationID = metaData.GetString(CorrelationIdKey);
                answer.NMSDeliveryMode = (message.Recoverable ? MsgDeliveryMode.Persistent : MsgDeliveryMode.NonPersistent);
                answer.NMSDestination = ToNmsDestination(message.DestinationQueue);
            }
            catch(InvalidOperationException)
            {
            }

            try
            {
                answer.NMSType = message.Label;
                answer.NMSReplyTo = ToNmsDestination(message.ResponseQueue);
                answer.NMSTimeToLive = message.TimeToBeReceived;
            }
            catch(InvalidOperationException)
            {
            }

            return answer;
        }

        /// <summary>
        /// Maps an NMS priority onto the MSMQ priority scale. AboveLow folds
        /// into Low, BelowNormal folds into Normal, and any unlisted value
        /// maps to Normal.
        /// </summary>
        private static MessagePriority ToMessagePriority(MsgPriority msgPriority)
        {
            switch(msgPriority)
            {
                case MsgPriority.Lowest:
                    return MessagePriority.Lowest;

                case MsgPriority.VeryLow:
                    return MessagePriority.VeryLow;

                case MsgPriority.Low:
                case MsgPriority.AboveLow:
                    return MessagePriority.Low;

                case MsgPriority.AboveNormal:
                    return MessagePriority.AboveNormal;

                case MsgPriority.High:
                    return MessagePriority.High;

                case MsgPriority.VeryHigh:
                    return MessagePriority.VeryHigh;

                case MsgPriority.Highest:
                    return MessagePriority.Highest;

                case MsgPriority.BelowNormal:
                case MsgPriority.Normal:
                default:
                    return MessagePriority.Normal;
            }
        }

        /// <summary>
        /// Writes the body of the NMS message into the MSMQ message and records
        /// the message class in AppSpecific. A text or bytes message without
        /// content produces an empty body instead of failing.
        /// </summary>
        /// <param name="message">The NMS message being sent.</param>
        /// <param name="answer">The MSMQ message being populated.</param>
        /// <exception cref="NMSException">
        /// A map message whose body is not a PrimitiveMap.
        /// </exception>
        /// <exception cref="Exception">The message class is not recognised.</exception>
        protected virtual void ConvertMessageBodyToMSMQ(IMessage message, Message answer)
        {
            if(message is TextMessage)
            {
                TextMessage textMessage = message as TextMessage;
                string text = textMessage.Text;

                // Text is carried as UTF-32; CreateNmsMessage decodes the same way.
                if(text != null)
                {
                    byte[] buf = Encoding.UTF32.GetBytes(text);
                    answer.BodyStream.Write(buf, 0, buf.Length);
                }

                answer.AppSpecific = (int) NMSMessageType.TextMessage;
            }
            else if(message is BytesMessage)
            {
                BytesMessage bytesMessage = message as BytesMessage;
                byte[] content = bytesMessage.Content;

                if(content != null)
                {
                    answer.BodyStream.Write(content, 0, content.Length);
                }

                answer.AppSpecific = (int) NMSMessageType.BytesMessage;
            }
            else if(message is ObjectMessage)
            {
                ObjectMessage objectMessage = message as ObjectMessage;
                answer.Body = objectMessage.Body;
                answer.AppSpecific = (int) NMSMessageType.ObjectMessage;
            }
            else if(message is MapMessage)
            {
                MapMessage mapMessage = message as MapMessage;
                PrimitiveMap mapBody = mapMessage.Body as PrimitiveMap;

                // Only PrimitiveMap can be marshalled here; say so rather than
                // failing with a NullReferenceException.
                if(mapBody == null)
                {
                    throw new NMSException("Map message body must be a non-null PrimitiveMap.");
                }

                byte[] buf = mapBody.Marshal();
                answer.BodyStream.Write(buf, 0, buf.Length);
                answer.AppSpecific = (int) NMSMessageType.MapMessage;
            }
            else if(message is StreamMessage)
            {
                answer.AppSpecific = (int) NMSMessageType.StreamMessage;
                // TODO: Implement
            }
            else if(message is BaseMessage)
            {
                answer.AppSpecific = (int) NMSMessageType.BaseMessage;
            }
            else
            {
                throw new Exception("unhandled message type");
            }
        }

        /// <summary>
        /// Creates the NMS message object matching the AppSpecific value of the
        /// MSMQ message and fills in its body. Unknown values yield a plain
        /// BaseMessage.
        /// </summary>
        /// <param name="message">The received MSMQ message.</param>
        /// <returns>The new NMS message; header values are not set here.</returns>
        protected virtual BaseMessage CreateNmsMessage(Message message)
        {
            int messageType = message.AppSpecific;

            if((int) NMSMessageType.TextMessage == messageType)
            {
                byte[] buf = ReadBodyBytes(message);
                TextMessage textMessage = new TextMessage();

                textMessage.Text = (buf == null) ? String.Empty : Encoding.UTF32.GetString(buf);
                return textMessage;
            }

            if((int) NMSMessageType.BytesMessage == messageType)
            {
                BytesMessage bytesMessage = new BytesMessage();

                bytesMessage.Content = ReadBodyBytes(message);
                return bytesMessage;
            }

            if((int) NMSMessageType.ObjectMessage == messageType)
            {
                ObjectMessage objectMessage = new ObjectMessage();

                objectMessage.Body = message.Body;
                return objectMessage;
            }

            if((int) NMSMessageType.MapMessage == messageType)
            {
                MapMessage mapMessage = new MapMessage();

                mapMessage.Body = PrimitiveMap.Unmarshal(ReadBodyBytes(message));
                return mapMessage;
            }

            if((int) NMSMessageType.StreamMessage == messageType)
            {
                // TODO: Implement
                return new StreamMessage();
            }

            return new BaseMessage();
        }

        /// <summary>
        /// Reads the whole body stream of the message into a new array.
        /// Stream.Read may return fewer bytes than requested, so the read is
        /// repeated until the buffer is full or the stream ends.
        /// </summary>
        /// <returns>The body bytes, or null when there is no body.</returns>
        private static byte[] ReadBodyBytes(Message message)
        {
            Stream body = message.BodyStream;

            if(body == null || body.Length <= 0)
            {
                return null;
            }

            byte[] buf = new byte[body.Length];
            int offset = 0;

            while(offset < buf.Length)
            {
                int count = body.Read(buf, offset, buf.Length - offset);

                if(count <= 0)
                {
                    break;
                }

                offset += count;
            }

            return buf;
        }

        /// <summary>
        /// Opens the MSMQ queue addressed by an NMS destination.
        /// </summary>
        /// <param name="destination">A destination created by this provider, or null.</param>
        /// <returns>A MessageQueue for the destination's path, or null for null.</returns>
        /// <exception cref="InvalidCastException">
        /// The destination is not a <see cref="Destination"/> of this provider.
        /// </exception>
        public MessageQueue ToMsmqDestination(IDestination destination)
        {
            if(null == destination)
            {
                return null;
            }

            // Direct cast: a foreign IDestination fails with a descriptive
            // InvalidCastException instead of a NullReferenceException.
            return new MessageQueue(((Destination) destination).Path);
        }

        /// <summary>
        /// Wraps an MSMQ queue in an NMS <see cref="Queue"/> with the same path.
        /// </summary>
        /// <returns>The NMS destination, or null for a null queue.</returns>
        protected virtual IDestination ToNmsDestination(MessageQueue destinationQueue)
        {
            if(null == destinationQueue)
            {
                return null;
            }

            return new Queue(destinationQueue.Path);
        }
    }
}
http://git-wip-us.apache.org/repos/asf/activemq-nms-msmq/blob/e964b541/tags/1.2.0-RC1/src/main/csharp/Destination.cs ---------------------------------------------------------------------- diff --git a/tags/1.2.0-RC1/src/main/csharp/Destination.cs b/tags/1.2.0-RC1/src/main/csharp/Destination.cs new file mode 100644 index 0000000..ba97213 --- /dev/null +++ b/tags/1.2.0-RC1/src/main/csharp/Destination.cs @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
using System;

namespace Apache.NMS.MSMQ
{
    /// <summary>
    /// Base class for MSMQ destinations, identified by an MSMQ queue path.
    /// </summary>
    public abstract class Destination : IDestination
    {
        private String path = "";

        /// <summary>
        /// Creates a destination with an empty path.
        /// </summary>
        protected Destination()
        {
        }

        /// <summary>
        /// Creates a destination with the given physical name.
        /// </summary>
        /// <param name="name">Queue path; see <see cref="Path"/> for normalization.</param>
        /// <exception cref="ArgumentNullException">name is null.</exception>
        protected Destination(String name)
        {
            Path = name;
        }

        /// <summary>
        /// The MSMQ queue path. A value without a backslash is prefixed with
        /// ".\" so that it addresses a queue on the local machine.
        /// </summary>
        /// <exception cref="ArgumentNullException">The assigned value is null.</exception>
        public String Path
        {
            get { return this.path; }
            set
            {
                // Validate before storing so a bad value cannot leave the
                // destination with a null path.
                if(value == null)
                {
                    throw new ArgumentNullException("value");
                }

                // Queues must have paths in them. If no path specified, then
                // default to local machine.
                this.path = value.Contains("\\") ? value : ".\\" + value;
            }
        }

        /// <summary>True for Topic and TemporaryTopic destinations.</summary>
        public bool IsTopic
        {
            get
            {
                return DestinationType == DestinationType.Topic
                    || DestinationType == DestinationType.TemporaryTopic;
            }
        }

        /// <summary>True whenever <see cref="IsTopic"/> is false.</summary>
        public bool IsQueue
        {
            get { return !IsTopic; }
        }

        /// <summary>True for TemporaryQueue and TemporaryTopic destinations.</summary>
        public bool IsTemporary
        {
            get
            {
                return DestinationType == DestinationType.TemporaryQueue
                    || DestinationType == DestinationType.TemporaryTopic;
            }
        }

        /// <summary>
        /// Returns the queue path of this destination.
        /// </summary>
        public override String ToString()
        {
            return this.path;
        }

        /// <summary>
        /// Hash code derived from the path, altered for topics so that a
        /// topic and a queue with the same path hash differently.
        /// </summary>
        public override int GetHashCode()
        {
            int answer = 37;

            if(this.path != null)
            {
                answer = path.GetHashCode();
            }
            if(IsTopic)
            {
                answer ^= 0xfabfab;
            }
            return answer;
        }

        /// <summary>
        /// Two destinations are equal when they are the same instance, or when
        /// both their destination type and their path match.
        /// </summary>
        /// <param name="obj">The object to compare with.</param>
        /// <returns>True if this instance and obj are equivalent.</returns>
        public override bool Equals(Object obj)
        {
            if(ReferenceEquals(this, obj))
            {
                return true;
            }

            Destination other = obj as Destination;
            if(ReferenceEquals(other, null))
            {
                return false;
            }

            return this.DestinationType == other.DestinationType
                && String.Equals(this.path, other.path);
        }

        /// <summary>
        /// Factory method to create a child destination if this destination is
        /// a composite.
        /// </summary>
        /// <param name="name">Name of the destination to create.</param>
        /// <returns>The created destination.</returns>
        public abstract Destination CreateDestination(String name);

        /// <summary>
        /// The kind of destination (queue, topic, temporary queue or topic).
        /// </summary>
        public abstract DestinationType DestinationType
        {
            get;
        }
    }
}
http://git-wip-us.apache.org/repos/asf/activemq-nms-msmq/blob/e964b541/tags/1.2.0-RC1/src/main/csharp/IMessageConverter.cs ---------------------------------------------------------------------- diff --git a/tags/1.2.0-RC1/src/main/csharp/IMessageConverter.cs b/tags/1.2.0-RC1/src/main/csharp/IMessageConverter.cs new file mode 100644 index 0000000..152377b --- /dev/null +++ b/tags/1.2.0-RC1/src/main/csharp/IMessageConverter.cs @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
using System.Messaging;

namespace Apache.NMS.MSMQ
{
    /// <summary>
    /// Converts between NMS messages/destinations and their System.Messaging
    /// counterparts.
    /// </summary>
    public interface IMessageConverter
    {
        /// <summary>
        /// Converts an NMS message into an MSMQ message.
        /// </summary>
        /// <param name="message">An IMessage</param>
        /// <returns>A Message</returns>
        Message ToMsmqMessage(IMessage message);

        /// <summary>
        /// Converts an MSMQ message into an NMS message.
        /// </summary>
        /// <param name="message">A Message</param>
        /// <returns>An IMessage</returns>
        IMessage ToNmsMessage(Message message);

        /// <summary>
        /// Converts an NMS destination into an MSMQ message queue.
        /// </summary>
        /// <param name="destination">An IDestination</param>
        /// <returns>A MessageQueue</returns>
        MessageQueue ToMsmqDestination(IDestination destination);
    }
}
http://git-wip-us.apache.org/repos/asf/activemq-nms-msmq/blob/e964b541/tags/1.2.0-RC1/src/main/csharp/MapMessage.cs ---------------------------------------------------------------------- diff --git a/tags/1.2.0-RC1/src/main/csharp/MapMessage.cs b/tags/1.2.0-RC1/src/main/csharp/MapMessage.cs new file mode 100644 index 0000000..e900328 --- /dev/null +++ b/tags/1.2.0-RC1/src/main/csharp/MapMessage.cs @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */

using Apache.NMS.Util;

namespace Apache.NMS.MSMQ
{
    /// <summary>
    /// An NMS message whose body is a map of primitive values. A new message
    /// starts with an empty PrimitiveMap as its body.
    /// </summary>
    public class MapMessage : BaseMessage, IMapMessage
    {
        private IPrimitiveMap mapBody = new PrimitiveMap();

        /// <summary>
        /// The map carried by this message.
        /// </summary>
        public IPrimitiveMap Body
        {
            get { return this.mapBody; }
            set { this.mapBody = value; }
        }
    }
}
http://git-wip-us.apache.org/repos/asf/activemq-nms-msmq/blob/e964b541/tags/1.2.0-RC1/src/main/csharp/MessageConsumer.cs ---------------------------------------------------------------------- diff --git a/tags/1.2.0-RC1/src/main/csharp/MessageConsumer.cs b/tags/1.2.0-RC1/src/main/csharp/MessageConsumer.cs new file mode 100644 index 0000000..ee57b96 --- /dev/null +++ b/tags/1.2.0-RC1/src/main/csharp/MessageConsumer.cs @@ -0,0 +1,231 @@ +using System; +using System.Messaging; +using System.Threading; +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
using Apache.NMS.Util;

namespace Apache.NMS.MSMQ
{
    /// <summary>
    /// An object capable of receiving messages from some destination
    /// </summary>
    public class MessageConsumer : IMessageConsumer
    {
        protected TimeSpan zeroTimeout = new TimeSpan(0);

        private readonly Session session;
        private readonly AcknowledgementMode acknowledgementMode;
        private MessageQueue messageQueue;
        private event MessageListener listener;
        private int listenerCount = 0;
        private Thread asyncDeliveryThread = null;
        // Signalled when an asynchronous receive completes and when async
        // delivery is stopped; wakes the blocking Receive().
        private AutoResetEvent pause = new AutoResetEvent(false);
        private Atomic<bool> asyncDelivery = new Atomic<bool>(false);

        /// <summary>
        /// Creates a consumer reading from the given MSMQ queue.
        /// </summary>
        /// <param name="session">The owning session.</param>
        /// <param name="acknowledgementMode">The session's acknowledgement mode.</param>
        /// <param name="messageQueue">
        /// The queue to read from; may be null, in which case the receive
        /// methods return null.
        /// </param>
        public MessageConsumer(Session session, AcknowledgementMode acknowledgementMode, MessageQueue messageQueue)
        {
            this.session = session;
            this.acknowledgementMode = acknowledgementMode;
            this.messageQueue = messageQueue;
            if(null != this.messageQueue)
            {
                // Retrieve every message property with each received message.
                this.messageQueue.MessageReadPropertyFilter.SetAll();
            }
        }

        /// <summary>
        /// Asynchronous message listener. Adding a listener starts the
        /// dispatch thread; removing the last listener stops it.
        /// </summary>
        public event MessageListener Listener
        {
            add
            {
                listener += value;
                listenerCount++;
                StartAsyncDelivery();
            }

            remove
            {
                if(listenerCount > 0)
                {
                    listener -= value;
                    listenerCount--;
                }

                if(0 == listenerCount)
                {
                    StopAsyncDelivery();
                }
            }
        }

        /// <summary>
        /// Waits for the next message. Returns null when there is no queue, or
        /// when the wait is interrupted by stopping async delivery.
        /// </summary>
        public IMessage Receive()
        {
            IMessage nmsMessage = null;

            if(messageQueue != null)
            {
                Message message;

                // Fast path: take a message that is already waiting. Any
                // failure here (including the time-out when the queue is
                // empty) falls through to the blocking asynchronous receive.
                try
                {
                    message = messageQueue.Receive(zeroTimeout);
                }
                catch
                {
                    message = null;
                }

                if(null == message)
                {
                    ReceiveCompletedEventHandler receiveMsg =
                        delegate(Object source, ReceiveCompletedEventArgs asyncResult) {
                            message = messageQueue.EndReceive(asyncResult.AsyncResult);
                            pause.Set();
                        };

                    messageQueue.ReceiveCompleted += receiveMsg;
                    messageQueue.BeginReceive();
                    // Released by the handler above or by StopAsyncDelivery().
                    pause.WaitOne();
                    messageQueue.ReceiveCompleted -= receiveMsg;
                }

                nmsMessage = ToNmsMessage(message);
            }

            return nmsMessage;
        }

        /// <summary>
        /// Waits up to the given time for the next message.
        /// </summary>
        /// <param name="timeout">Maximum time to wait.</param>
        /// <returns>The message, or null if none arrived in time.</returns>
        public IMessage Receive(TimeSpan timeout)
        {
            IMessage nmsMessage = null;

            if(messageQueue != null)
            {
                nmsMessage = ToNmsMessage(ReceiveOrNull(timeout));
            }

            return nmsMessage;
        }

        /// <summary>
        /// Returns a message that is immediately available, without waiting.
        /// </summary>
        /// <returns>The message, or null if the queue is empty.</returns>
        public IMessage ReceiveNoWait()
        {
            IMessage nmsMessage = null;

            if(messageQueue != null)
            {
                nmsMessage = ToNmsMessage(ReceiveOrNull(zeroTimeout));
            }

            return nmsMessage;
        }

        /// <summary>
        /// Receives from the queue, translating the MSMQ time-out error into a
        /// null result. Every other queue error is rethrown unchanged.
        /// </summary>
        private Message ReceiveOrNull(TimeSpan timeout)
        {
            try
            {
                return messageQueue.Receive(timeout);
            }
            catch(MessageQueueException ex)
            {
                // MSMQ reports "no message before the time-out expired" as an
                // exception; for a consumer that simply means no message.
                if(ex.MessageQueueErrorCode == MessageQueueErrorCode.IOTimeout)
                {
                    return null;
                }

                throw;
            }
        }

        /// <summary>
        /// Closes the consumer.
        /// </summary>
        public void Dispose()
        {
            Close();
        }

        /// <summary>
        /// Stops async delivery and releases the underlying queue.
        /// </summary>
        public void Close()
        {
            StopAsyncDelivery();
            if(messageQueue != null)
            {
                messageQueue.Dispose();
                messageQueue = null;
            }
        }

        /// <summary>
        /// Stops the dispatch thread if it is running: wakes a blocked
        /// Receive(), waits up to ten seconds for the thread to exit, and
        /// aborts it after that.
        /// </summary>
        protected virtual void StopAsyncDelivery()
        {
            if(asyncDelivery.CompareAndSet(true, false))
            {
                if(null != asyncDeliveryThread)
                {
                    Tracer.Info("Stopping async delivery thread.");
                    pause.Set();
                    if(!asyncDeliveryThread.Join(10000))
                    {
                        Tracer.Info("Aborting async delivery thread.");
                        asyncDeliveryThread.Abort();
                    }

                    asyncDeliveryThread = null;
                    Tracer.Info("Async delivery thread stopped.");
                }
            }
        }

        /// <summary>
        /// Starts the background dispatch thread unless it is already running.
        /// </summary>
        protected virtual void StartAsyncDelivery()
        {
            if(asyncDelivery.CompareAndSet(false, true))
            {
                asyncDeliveryThread = new Thread(new ThreadStart(DispatchLoop));
                asyncDeliveryThread.Name = "Message Consumer Dispatch: " + messageQueue.QueueName;
                asyncDeliveryThread.IsBackground = true;
                asyncDeliveryThread.Start();
            }
        }

        /// <summary>
        /// Body of the dispatch thread: receives messages and hands each one
        /// to the registered listeners until async delivery is switched off.
        /// Listener exceptions go to HandleAsyncException; receive errors are
        /// logged and the loop continues.
        /// </summary>
        protected virtual void DispatchLoop()
        {
            Tracer.Info("Starting dispatcher thread consumer: " + this);
            while(asyncDelivery.Value)
            {
                try
                {
                    IMessage message = Receive();
                    // Re-check the flag: Receive() may have been woken by a stop.
                    if(asyncDelivery.Value && message != null)
                    {
                        try
                        {
                            listener(message);
                        }
                        catch(Exception e)
                        {
                            HandleAsyncException(e);
                        }
                    }
                }
                catch(ThreadAbortException ex)
                {
                    Tracer.InfoFormat("Thread abort received in thread: {0} : {1}", this, ex.Message);
                    break;
                }
                catch(Exception ex)
                {
                    Tracer.ErrorFormat("Exception while receiving message in thread: {0} : {1}", this, ex.Message);
                }
            }
            Tracer.Info("Stopping dispatcher thread consumer: " + this);
        }

        /// <summary>
        /// Forwards an exception thrown by a listener to the connection.
        /// </summary>
        protected virtual void HandleAsyncException(Exception e)
        {
            session.Connection.HandleException(e);
        }

        /// <summary>
        /// Converts an MSMQ message with the session's message converter.
        /// </summary>
        /// <returns>The NMS message, or null for a null input.</returns>
        protected virtual IMessage ToNmsMessage(Message message)
        {
            if(message == null)
            {
                return null;
            }
            return session.MessageConverter.ToNmsMessage(message);
        }
    }
}
http://git-wip-us.apache.org/repos/asf/activemq-nms-msmq/blob/e964b541/tags/1.2.0-RC1/src/main/csharp/MessageProducer.cs ---------------------------------------------------------------------- diff --git a/tags/1.2.0-RC1/src/main/csharp/MessageProducer.cs b/tags/1.2.0-RC1/src/main/csharp/MessageProducer.cs new file mode 100644 index 0000000..26598d3 --- /dev/null +++ b/tags/1.2.0-RC1/src/main/csharp/MessageProducer.cs @@ -0,0 +1,276 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
using System;
using System.Messaging;

namespace Apache.NMS.MSMQ
{
    /// <summary>
    /// An object capable of sending messages to some destination
    /// </summary>
    public class MessageProducer : IMessageProducer
    {
        private readonly Session session;
        private Destination destination;

        private MsgDeliveryMode deliveryMode;
        private TimeSpan timeToLive;
        private MsgPriority priority;
        private bool disableMessageID;
        private bool disableMessageTimestamp;

        // Queue opened at construction for a producer bound to a destination;
        // null for a producer created without one.
        private MessageQueue messageQueue;

        /// <summary>
        /// Creates a producer. With a non-null destination the MSMQ queue is
        /// opened (and created if missing) immediately.
        /// </summary>
        /// <param name="session">The owning session.</param>
        /// <param name="destination">The fixed destination, or null.</param>
        /// <exception cref="NMSException">The queue could not be opened.</exception>
        public MessageProducer(Session session, Destination destination)
        {
            this.session = session;
            this.destination = destination;
            if(destination != null)
            {
                messageQueue = openMessageQueue(destination);
            }
        }

        /// <summary>
        /// Opens the MSMQ queue for a destination, creating it when it does not
        /// exist, and stores the path reported by MSMQ back into that
        /// destination.
        /// </summary>
        /// <exception cref="NMSException">
        /// Any failure while opening or creating the queue, including missing
        /// write access.
        /// </exception>
        private MessageQueue openMessageQueue(Destination dest)
        {
            MessageQueue rc = null;
            try
            {
                if(!MessageQueue.Exists(dest.Path))
                {
                    // Create the queue; it is transactional only when the
                    // session is transacted.
                    rc = MessageQueue.Create(dest.Path, session.Transacted);
                }
                else
                {
                    rc = new MessageQueue(dest.Path);
                }

                // Update the destination being opened, not the producer's own
                // destination: the latter is null for a producer created
                // without one.
                dest.Path = rc.Path;

                if(!rc.CanWrite)
                {
                    throw new NMSSecurityException("Do not have write access to: " + dest);
                }
            }
            catch(Exception e)
            {
                if(rc != null)
                {
                    rc.Dispose();
                }

                throw new NMSException(e.Message + ": " + dest, e);
            }
            return rc;
        }

        /// <summary>
        /// Sends to the producer's destination with its default settings.
        /// </summary>
        public void Send(IMessage message)
        {
            Send(Destination, message);
        }

        /// <summary>
        /// Sends to the producer's destination with explicit settings.
        /// </summary>
        public void Send(IMessage message, MsgDeliveryMode deliveryMode, MsgPriority priority, TimeSpan timeToLive)
        {
            Send(Destination, message, deliveryMode, priority, timeToLive);
        }

        /// <summary>
        /// Sends to the given destination with the producer's default settings.
        /// </summary>
        public void Send(IDestination destination, IMessage message)
        {
            Send(destination, message, DeliveryMode, Priority, TimeToLive);
        }

        /// <summary>
        /// Sends a message. A producer bound to a destination only accepts that
        /// destination; otherwise the target queue is opened for this send and
        /// disposed afterwards.
        /// </summary>
        /// <param name="destination">Where to send the message.</param>
        /// <param name="imessage">The message; must be a BaseMessage of this provider.</param>
        /// <param name="deliveryMode">Delivery mode stamped on the message.</param>
        /// <param name="priority">Priority stamped on the message.</param>
        /// <param name="timeToLive">Time-to-live stamped on the message.</param>
        /// <exception cref="NMSException">
        /// The destination differs from the one this producer is bound to, or
        /// the queue could not be opened.
        /// </exception>
        public void Send(IDestination destination, IMessage imessage, MsgDeliveryMode deliveryMode, MsgPriority priority, TimeSpan timeToLive)
        {
            BaseMessage message = (BaseMessage) imessage;
            MessageQueue mq = null;

            try
            {
                // Locate the MSMQ Queue we will be sending to
                if(messageQueue != null)
                {
                    if(destination != null && destination.Equals(this.destination))
                    {
                        mq = messageQueue;
                    }
                    else
                    {
                        // Name the destination this producer is bound to.
                        throw new NMSException("This producer can only be used to send to: " + this.destination);
                    }
                }
                else
                {
                    mq = openMessageQueue((Destination) destination);
                }

                message.NMSDeliveryMode = deliveryMode;
                message.NMSTimeToLive = timeToLive;
                message.NMSPriority = priority;
                if(!DisableMessageTimestamp)
                {
                    message.NMSTimestamp = DateTime.UtcNow;
                }

                if(!DisableMessageID)
                {
                    // TODO: message.NMSMessageId =
                }

                // Convert the message into an MSMQ message
                Message msg = session.MessageConverter.ToMsmqMessage(message);

                if(mq.Transactional)
                {
                    if(session.Transacted)
                    {
                        mq.Send(msg, session.MessageQueueTransaction);
                    }
                    else
                    {
                        // Start our own mini transaction here to send the message.
                        using(MessageQueueTransaction transaction = new MessageQueueTransaction())
                        {
                            transaction.Begin();
                            mq.Send(msg, transaction);
                            transaction.Commit();
                        }
                    }
                }
                else
                {
                    if(session.Transacted)
                    {
                        // We may want to raise an exception here since the app requested
                        // a transacted NMS session, but is using a non-transacted message
                        // queue. For now silently ignore it.
                    }
                    mq.Send(msg);
                }
            }
            finally
            {
                // Dispose only a queue opened for this send.
                if(mq != null && mq != messageQueue)
                {
                    mq.Dispose();
                }
            }
        }

        /// <summary>
        /// Releases the queue opened at construction, if any.
        /// </summary>
        public void Close()
        {
            if(messageQueue != null)
            {
                messageQueue.Dispose();
                messageQueue = null;
            }
        }

        /// <summary>
        /// Closes the producer.
        /// </summary>
        public void Dispose()
        {
            Close();
        }

        // The Create* methods below delegate to the owning session.

        public IMessage CreateMessage()
        {
            return session.CreateMessage();
        }

        public ITextMessage CreateTextMessage()
        {
            return session.CreateTextMessage();
        }

        public ITextMessage CreateTextMessage(String text)
        {
            return session.CreateTextMessage(text);
        }

        public IMapMessage CreateMapMessage()
        {
            return session.CreateMapMessage();
        }

        public IObjectMessage CreateObjectMessage(Object body)
        {
            return session.CreateObjectMessage(body);
        }

        public IBytesMessage CreateBytesMessage()
        {
            return session.CreateBytesMessage();
        }

        public IBytesMessage CreateBytesMessage(byte[] body)
        {
            return session.CreateBytesMessage(body);
        }

        public IStreamMessage CreateStreamMessage()
        {
            return session.CreateStreamMessage();
        }

        /// <summary>Default delivery mode for sends that do not specify one.</summary>
        public MsgDeliveryMode DeliveryMode
        {
            get { return deliveryMode; }
            set { deliveryMode = value; }
        }

        /// <summary>Default time-to-live for sends that do not specify one.</summary>
        public TimeSpan TimeToLive
        {
            get { return timeToLive; }
            set { timeToLive = value; }
        }

        /// <summary>
        /// The default timeout for network requests. The setter ignores its value.
        /// </summary>
        public TimeSpan RequestTimeout
        {
            get { return NMSConstants.defaultRequestTimeout; }
            set { }
        }

        /// <summary>
        /// The destination used by the Send overloads that take no destination.
        /// Setting it does not reopen the queue opened at construction.
        /// </summary>
        public IDestination Destination
        {
            get { return destination; }
            set { destination = (Destination) value; }
        }

        /// <summary>Default priority for sends that do not specify one.</summary>
        public MsgPriority Priority
        {
            get { return priority; }
            set { priority = value; }
        }

        /// <summary>Message-id generation switch; ids are not generated yet.</summary>
        public bool DisableMessageID
        {
            get { return disableMessageID; }
            set { disableMessageID = value; }
        }

        /// <summary>When true, Send does not stamp NMSTimestamp on messages.</summary>
        public bool DisableMessageTimestamp
        {
            get { return disableMessageTimestamp; }
            set { disableMessageTimestamp = value; }
        }
    }
}
http://git-wip-us.apache.org/repos/asf/activemq-nms-msmq/blob/e964b541/tags/1.2.0-RC1/src/main/csharp/ObjectMessage.cs ---------------------------------------------------------------------- diff --git a/tags/1.2.0-RC1/src/main/csharp/ObjectMessage.cs b/tags/1.2.0-RC1/src/main/csharp/ObjectMessage.cs new file mode 100644 index 0000000..dd9a5f5 --- /dev/null +++ b/tags/1.2.0-RC1/src/main/csharp/ObjectMessage.cs @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */

using System.IO;

#if !(PocketPC||NETCF||NETCF_2_0)
using System.Runtime.Serialization;
using System.Runtime.Serialization.Formatters.Binary;
#endif

namespace Apache.NMS.MSMQ
{
    /// <summary>
    /// An NMS message whose body is an arbitrary object.
    /// </summary>
    public class ObjectMessage : BaseMessage, IObjectMessage
    {
        private object body;
#if !(PocketPC||NETCF||NETCF_2_0)
        private IFormatter formatter;
#endif

        /// <summary>
        /// Creates a message without a body.
        /// </summary>
        public ObjectMessage()
        {
        }

        /// <summary>
        /// Creates a message carrying the given object.
        /// </summary>
        public ObjectMessage(object body)
        {
            this.body = body;
        }

        /// <summary>
        /// The object carried by this message. On the full framework, when no
        /// body has been set and the message has content, the body is
        /// deserialized from that content on first access. On compact-framework
        /// builds the setter is not implemented.
        /// </summary>
        public object Body
        {
            get
            {
#if !(PocketPC||NETCF||NETCF_2_0)
                // NOTE(review): assumes the inherited Content is a byte[] that
                // may be null on a fresh message - confirm against BaseMessage.
                if(body == null && Content != null)
                {
                    // SECURITY: the default formatter is BinaryFormatter, which
                    // is unsafe on content from untrusted senders.
                    using(MemoryStream stream = new MemoryStream(Content))
                    {
                        body = Formatter.Deserialize(stream);
                    }
                }
#endif
                return body;
            }

            set
            {
#if !(PocketPC||NETCF||NETCF_2_0)
                body = value;
#else
                // Fully qualified: this file has no "using System;".
                throw new System.NotImplementedException();
#endif
            }
        }

#if !(PocketPC||NETCF||NETCF_2_0)
        /// <summary>
        /// The formatter used to deserialize the body. Defaults to a
        /// BinaryFormatter created on first use.
        /// </summary>
        public IFormatter Formatter
        {
            get
            {
                if(formatter == null)
                {
                    formatter = new BinaryFormatter();
                }
                return formatter;
            }

            set
            {
                formatter = value;
            }
        }
#endif
    }
}
http://git-wip-us.apache.org/repos/asf/activemq-nms-msmq/blob/e964b541/tags/1.2.0-RC1/src/main/csharp/Queue.cs ---------------------------------------------------------------------- diff --git a/tags/1.2.0-RC1/src/main/csharp/Queue.cs b/tags/1.2.0-RC1/src/main/csharp/Queue.cs new file mode 100644 index 0000000..30efcd9 --- /dev/null +++ b/tags/1.2.0-RC1/src/main/csharp/Queue.cs @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */
using System;

namespace Apache.NMS.MSMQ
{
    /// <summary>
    /// An MSMQ queue destination.
    /// </summary>
    public class Queue : Destination, IQueue
    {
        /// <summary>
        /// Creates a queue destination with the base class's default path.
        /// </summary>
        public Queue()
        {
        }

        /// <summary>
        /// Creates a queue destination for the given queue name or path.
        /// </summary>
        public Queue(String name)
            : base(name)
        {
        }

        /// <summary>
        /// The name of this queue, which is its path.
        /// </summary>
        public String QueueName
        {
            get { return Path; }
        }

        /// <summary>
        /// Always <c>DestinationType.Queue</c>.
        /// </summary>
        public override DestinationType DestinationType
        {
            get { return DestinationType.Queue; }
        }

        /// <summary>
        /// Creates another queue destination with the given name.
        /// </summary>
        public override Destination CreateDestination(String name)
        {
            return new Queue(name);
        }
    }
}
http://git-wip-us.apache.org/repos/asf/activemq-nms-msmq/blob/e964b541/tags/1.2.0-RC1/src/main/csharp/Session.cs ---------------------------------------------------------------------- diff --git a/tags/1.2.0-RC1/src/main/csharp/Session.cs b/tags/1.2.0-RC1/src/main/csharp/Session.cs new file mode 100644 index 0000000..6ee3d9e --- /dev/null +++ b/tags/1.2.0-RC1/src/main/csharp/Session.cs @@ -0,0 +1,247 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */
using System;
using System.Messaging;

namespace Apache.NMS.MSMQ
{
    /// <summary>
    /// MSMQ provider of ISession.
    /// </summary>
    public class Session : ISession
    {
        private readonly Connection connection;
        private readonly AcknowledgementMode acknowledgementMode;
        private MessageQueueTransaction messageQueueTransaction;
        private IMessageConverter messageConverter;
        private TimeSpan requestTimeout = NMSConstants.defaultRequestTimeout;

        /// <summary>
        /// Creates a session on the given connection. The message converter is
        /// taken from the connection, and a <see cref="System.Messaging.MessageQueueTransaction"/>
        /// is allocated only when the mode is <c>AcknowledgementMode.Transactional</c>.
        /// </summary>
        /// <param name="connection">Connection that owns this session.</param>
        /// <param name="acknowledgementMode">Acknowledgement mode of the session.</param>
        public Session(Connection connection, AcknowledgementMode acknowledgementMode)
        {
            this.connection = connection;
            this.acknowledgementMode = acknowledgementMode;
            MessageConverter = connection.MessageConverter;
            if(this.acknowledgementMode == AcknowledgementMode.Transactional)
            {
                MessageQueueTransaction = new MessageQueueTransaction();
            }
        }

        /// <summary>
        /// Disposes the session's transaction object, if one was created.
        /// </summary>
        public void Dispose()
        {
            // Use the field, not the MessageQueueTransaction property: the
            // property getter calls Begin() on any non-pending transaction,
            // which would start a transaction only to dispose it immediately.
            if(messageQueueTransaction != null)
            {
                messageQueueTransaction.Dispose();
            }
        }

        /// <summary>
        /// Creates a producer that is not bound to a destination.
        /// </summary>
        public IMessageProducer CreateProducer()
        {
            return CreateProducer(null);
        }

        /// <summary>
        /// Creates a producer for the given destination.
        /// </summary>
        /// <param name="destination">
        /// Destination to produce to; it is cast to <see cref="Destination"/>,
        /// so other IDestination implementations cause an InvalidCastException.
        /// </param>
        public IMessageProducer CreateProducer(IDestination destination)
        {
            return new MessageProducer(this, (Destination) destination);
        }

        /// <summary>
        /// Creates a consumer for the given destination without a selector.
        /// </summary>
        public IMessageConsumer CreateConsumer(IDestination destination)
        {
            return CreateConsumer(destination, null);
        }

        /// <summary>
        /// Creates a consumer for the given destination; see the three-argument
        /// overload for the selector restriction.
        /// </summary>
        public IMessageConsumer CreateConsumer(IDestination destination, string selector)
        {
            return CreateConsumer(destination, selector, false);
        }

        /// <summary>
        /// Creates a consumer reading from the MSMQ queue that the message
        /// converter maps <paramref name="destination"/> to.
        /// </summary>
        /// <param name="destination">Destination to consume from.</param>
        /// <param name="selector">Must be null; selectors are rejected.</param>
        /// <param name="noLocal">Not used by this implementation.</param>
        /// <exception cref="NotSupportedException">
        /// Thrown when <paramref name="selector"/> is not null.
        /// </exception>
        public IMessageConsumer CreateConsumer(IDestination destination, string selector, bool noLocal)
        {
            if(selector != null)
            {
                throw new NotSupportedException("Selectors are not supported by MSMQ");
            }
            MessageQueue queue = MessageConverter.ToMsmqDestination(destination);
            return new MessageConsumer(this, acknowledgementMode, queue);
        }

        /// <summary>
        /// Not supported; always throws <see cref="NotSupportedException"/>.
        /// </summary>
        public IMessageConsumer CreateDurableConsumer(ITopic destination, string name, string selector, bool noLocal)
        {
            throw new NotSupportedException("Durable Topic subscribers are not supported by MSMQ");
        }

        /// <summary>
        /// Not supported; always throws <see cref="NotSupportedException"/>.
        /// </summary>
        public void DeleteDurableConsumer(string name)
        {
            throw new NotSupportedException("Durable Topic subscribers are not supported by MSMQ");
        }

        /// <summary>
        /// Not implemented; always throws <see cref="NotImplementedException"/>.
        /// </summary>
        public IQueueBrowser CreateBrowser(IQueue queue)
        {
            throw new NotImplementedException();
        }

        /// <summary>
        /// Not implemented; always throws <see cref="NotImplementedException"/>.
        /// </summary>
        public IQueueBrowser CreateBrowser(IQueue queue, string selector)
        {
            throw new NotImplementedException();
        }

        /// <summary>
        /// Returns a new <see cref="Queue"/> built from the given name.
        /// </summary>
        public IQueue GetQueue(string name)
        {
            return new Queue(name);
        }

        /// <summary>
        /// Not supported; always throws <see cref="NotSupportedException"/>.
        /// </summary>
        public ITopic GetTopic(string name)
        {
            throw new NotSupportedException("Topics are not supported by MSMQ");
        }

        /// <summary>
        /// Not supported; always throws <see cref="NotSupportedException"/>.
        /// </summary>
        public ITemporaryQueue CreateTemporaryQueue()
        {
            throw new NotSupportedException("Tempoary Queues are not supported by MSMQ");
        }

        /// <summary>
        /// Not supported; always throws <see cref="NotSupportedException"/>.
        /// </summary>
        public ITemporaryTopic CreateTemporaryTopic()
        {
            throw new NotSupportedException("Tempoary Topics are not supported by MSMQ");
        }

        /// <summary>
        /// Delete a destination (Queue, Topic, Temp Queue, Temp Topic).
        /// Currently not implemented; always throws <see cref="NotImplementedException"/>.
        /// </summary>
        public void DeleteDestination(IDestination destination)
        {
            // TODO: Implement if possible.  If not possible, then change exception to NotSupportedException().
            throw new NotImplementedException();
        }

        /// <summary>
        /// Creates an empty <see cref="BaseMessage"/>.
        /// </summary>
        public IMessage CreateMessage()
        {
            BaseMessage answer = new BaseMessage();
            return answer;
        }

        /// <summary>
        /// Creates a <see cref="TextMessage"/> with its default constructor.
        /// </summary>
        public ITextMessage CreateTextMessage()
        {
            TextMessage answer = new TextMessage();
            return answer;
        }

        /// <summary>
        /// Creates a <see cref="TextMessage"/> constructed from the given text.
        /// </summary>
        public ITextMessage CreateTextMessage(string text)
        {
            TextMessage answer = new TextMessage(text);
            return answer;
        }

        /// <summary>
        /// Creates a new <see cref="MapMessage"/>.
        /// </summary>
        public IMapMessage CreateMapMessage()
        {
            return new MapMessage();
        }

        /// <summary>
        /// Creates a new <see cref="BytesMessage"/>.
        /// </summary>
        public IBytesMessage CreateBytesMessage()
        {
            return new BytesMessage();
        }

        /// <summary>
        /// Creates a <see cref="BytesMessage"/> whose Content is set to
        /// <paramref name="body"/>.
        /// </summary>
        public IBytesMessage CreateBytesMessage(byte[] body)
        {
            BytesMessage answer = new BytesMessage();
            answer.Content = body;
            return answer;
        }

        /// <summary>
        /// Creates a new <see cref="StreamMessage"/>.
        /// </summary>
        public IStreamMessage CreateStreamMessage()
        {
            return new StreamMessage();
        }

        /// <summary>
        /// Creates an <see cref="ObjectMessage"/> whose Body is set to
        /// <paramref name="body"/>.
        /// </summary>
        public IObjectMessage CreateObjectMessage(Object body)
        {
            ObjectMessage answer = new ObjectMessage();
            answer.Body = body;
            return answer;
        }

        /// <summary>
        /// Commits the session's MSMQ transaction.
        /// </summary>
        /// <exception cref="InvalidOperationException">
        /// Thrown when the session is not transacted.
        /// </exception>
        public void Commit()
        {
            if(!Transacted)
            {
                throw new InvalidOperationException("You cannot perform a Commit() on a non-transacted session. Acknowlegement mode is: " + acknowledgementMode);
            }
            messageQueueTransaction.Commit();
        }

        /// <summary>
        /// Aborts the session's MSMQ transaction.
        /// </summary>
        /// <exception cref="InvalidOperationException">
        /// Thrown when the session is not transacted.
        /// </exception>
        public void Rollback()
        {
            if(!Transacted)
            {
                // The message names Rollback(); it previously named Commit().
                throw new InvalidOperationException("You cannot perform a Rollback() on a non-transacted session. Acknowlegement mode is: " + acknowledgementMode);
            }
            messageQueueTransaction.Abort();
        }

        // Properties

        /// <summary>
        /// The connection this session was created with.
        /// </summary>
        public Connection Connection
        {
            get { return connection; }
        }

        /// <summary>
        /// The timeout for network requests. Starts out as
        /// NMSConstants.defaultRequestTimeout; an assigned value is kept and
        /// returned by later reads.
        /// </summary>
        public TimeSpan RequestTimeout
        {
            get { return requestTimeout; }
            set { requestTimeout = value; }
        }

        /// <summary>
        /// True when the acknowledgement mode is Transactional.
        /// </summary>
        public bool Transacted
        {
            get { return acknowledgementMode == AcknowledgementMode.Transactional; }
        }

        /// <summary>
        /// The acknowledgement mode this session was created with.
        /// </summary>
        public AcknowledgementMode AcknowledgementMode
        {
            get { return acknowledgementMode; }
        }

        /// <summary>
        /// The session's MSMQ transaction, or null when none was created.
        /// Reading the property calls Begin() on the transaction whenever its
        /// status is not Pending, so the returned transaction is always pending.
        /// </summary>
        public MessageQueueTransaction MessageQueueTransaction
        {
            get
            {
                if(null != messageQueueTransaction
                    && messageQueueTransaction.Status != MessageQueueTransactionStatus.Pending)
                {
                    messageQueueTransaction.Begin();
                }

                return messageQueueTransaction;
            }
            set { messageQueueTransaction = value; }
        }

        /// <summary>
        /// Converter used to map destinations to MSMQ queues.
        /// </summary>
        public IMessageConverter MessageConverter
        {
            get { return messageConverter; }
            set { messageConverter = value; }
        }

        /// <summary>
        /// Closes the session; equivalent to <see cref="Dispose"/>.
        /// </summary>
        public void Close()
        {
            Dispose();
        }
    }
}
