Added: 
activemq/activemq-dotnet/Apache.NMS.XMS/trunk/src/test/csharp/Commands/StreamMessage.cs
URL: 
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.XMS/trunk/src/test/csharp/Commands/StreamMessage.cs?rev=1723221&view=auto
==============================================================================
--- 
activemq/activemq-dotnet/Apache.NMS.XMS/trunk/src/test/csharp/Commands/StreamMessage.cs
 (added)
+++ 
activemq/activemq-dotnet/Apache.NMS.XMS/trunk/src/test/csharp/Commands/StreamMessage.cs
 Wed Jan  6 02:19:56 2016
@@ -0,0 +1,901 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.IO;
+using System.Collections;
+
+using Apache.NMS;
+using Apache.NMS.Util;
+
+namespace Apache.NMS.Commands
+{
+    public class StreamMessage : Message, IStreamMessage
+    {
+        // Reader over byteBuffer; created on demand by InitializeReading().
+        private EndianBinaryReader dataIn = null;
+        // Writer over byteBuffer; created on demand by InitializeWriting().
+        private EndianBinaryWriter dataOut = null;
+        // Backing stream; replaced whenever a reader or writer is created.
+        private MemoryStream byteBuffer = null;
+        // Bytes still unread in the byte-array field being consumed by
+        // ReadBytes(); -1 means no byte-array read is in progress.
+        private int bytesRemaining = -1;
+
+        /// <summary>
+        /// Reads the next field of the stream as a <c>bool</c>.
+        /// A field tagged <c>BOOLEAN_TYPE</c> is returned directly; a
+        /// <c>STRING_TYPE</c> field is converted with <c>Boolean.Parse</c>.
+        /// When the field cannot be converted, the buffer is rewound to the
+        /// start of the field before the exception is thrown.
+        /// </summary>
+        /// <returns>The boolean value of the field.</returns>
+        /// <exception cref="NMSException">The field is tagged <c>NULL</c>.</exception>
+        /// <exception cref="MessageFormatException">The field carries any other type tag.</exception>
+        public bool ReadBoolean()
+        {
+            InitializeReading();
+
+            try
+            {
+                // Remember where this field starts so a failed conversion
+                // leaves the stream positioned for another read attempt.
+                long startingPos = this.byteBuffer.Position;
+                try
+                {
+                    int type = this.dataIn.ReadByte();
+
+                    if(type == PrimitiveMap.BOOLEAN_TYPE)
+                    {
+                        return this.dataIn.ReadBoolean();
+                    }
+                    else if(type == PrimitiveMap.STRING_TYPE)
+                    {
+                        return Boolean.Parse(this.dataIn.ReadString16());
+                    }
+                    else if(type == PrimitiveMap.NULL)
+                    {
+                        this.byteBuffer.Seek(startingPos, SeekOrigin.Begin);
+                        throw new NMSException("Cannot convert Null type to a bool");
+                    }
+                    else
+                    {
+                        this.byteBuffer.Seek(startingPos, SeekOrigin.Begin);
+                        throw new MessageFormatException("Value is not a Boolean type.");
+                    }
+                }
+                catch(FormatException e)
+                {
+                    // The string field did not parse; rewind and report it.
+                    this.byteBuffer.Seek(startingPos, SeekOrigin.Begin);
+                    throw NMSExceptionSupport.CreateMessageFormatException(e);
+                }
+            }
+            catch(EndOfStreamException e)
+            {
+                throw NMSExceptionSupport.CreateMessageEOFException(e);
+            }
+            catch(IOException e)
+            {
+                throw NMSExceptionSupport.CreateMessageFormatException(e);
+            }
+        }
+
+        /// <summary>
+        /// Reads the next field of the stream as a <c>byte</c>.
+        /// A field tagged <c>BYTE_TYPE</c> is returned directly; a
+        /// <c>STRING_TYPE</c> field is converted with <c>Byte.Parse</c>.
+        /// When the field cannot be converted, the buffer is rewound to the
+        /// start of the field before the exception is thrown.
+        /// </summary>
+        /// <returns>The byte value of the field.</returns>
+        /// <exception cref="NMSException">The field is tagged <c>NULL</c>.</exception>
+        /// <exception cref="MessageFormatException">The field carries any other type tag.</exception>
+        public byte ReadByte()
+        {
+            InitializeReading();
+
+            try
+            {
+                // Start of the field, used to rewind on a failed conversion.
+                long startingPos = this.byteBuffer.Position;
+                try
+                {
+                    int type = this.dataIn.ReadByte();
+
+                    if(type == PrimitiveMap.BYTE_TYPE)
+                    {
+                        return this.dataIn.ReadByte();
+                    }
+                    else if(type == PrimitiveMap.STRING_TYPE)
+                    {
+                        return Byte.Parse(this.dataIn.ReadString16());
+                    }
+                    else if(type == PrimitiveMap.NULL)
+                    {
+                        this.byteBuffer.Seek(startingPos, SeekOrigin.Begin);
+                        throw new NMSException("Cannot convert Null type to a byte");
+                    }
+                    else
+                    {
+                        this.byteBuffer.Seek(startingPos, SeekOrigin.Begin);
+                        throw new MessageFormatException("Value is not a Byte type.");
+                    }
+                }
+                catch(FormatException e)
+                {
+                    // The string field did not parse; rewind and report it.
+                    this.byteBuffer.Seek(startingPos, SeekOrigin.Begin);
+                    throw NMSExceptionSupport.CreateMessageFormatException(e);
+                }
+            }
+            catch(EndOfStreamException e)
+            {
+                throw NMSExceptionSupport.CreateMessageEOFException(e);
+            }
+            catch(IOException e)
+            {
+                throw NMSExceptionSupport.CreateMessageFormatException(e);
+            }
+        }
+
+        /// <summary>
+        /// Reads the next field of the stream as a <c>char</c>. Only a field
+        /// tagged <c>CHAR_TYPE</c> is accepted; no string conversion is
+        /// attempted. On a rejected field the buffer is rewound to the start
+        /// of the field before the exception is thrown.
+        /// </summary>
+        /// <returns>The character value of the field.</returns>
+        /// <exception cref="NMSException">The field is tagged <c>NULL</c>.</exception>
+        /// <exception cref="MessageFormatException">The field carries any other type tag.</exception>
+        public char ReadChar()
+        {
+            InitializeReading();
+
+            try
+            {
+                // Start of the field, used to rewind on a rejected field.
+                long startingPos = this.byteBuffer.Position;
+                try
+                {
+                    int type = this.dataIn.ReadByte();
+
+                    if(type == PrimitiveMap.CHAR_TYPE)
+                    {
+                        return this.dataIn.ReadChar();
+                    }
+                    else if(type == PrimitiveMap.NULL)
+                    {
+                        this.byteBuffer.Seek(startingPos, SeekOrigin.Begin);
+                        throw new NMSException("Cannot convert Null type to a char");
+                    }
+                    else
+                    {
+                        this.byteBuffer.Seek(startingPos, SeekOrigin.Begin);
+                        throw new MessageFormatException("Value is not a Char type.");
+                    }
+                }
+                catch(FormatException e)
+                {
+                    this.byteBuffer.Seek(startingPos, SeekOrigin.Begin);
+                    throw NMSExceptionSupport.CreateMessageFormatException(e);
+                }
+            }
+            catch(EndOfStreamException e)
+            {
+                throw NMSExceptionSupport.CreateMessageEOFException(e);
+            }
+            catch(IOException e)
+            {
+                throw NMSExceptionSupport.CreateMessageFormatException(e);
+            }
+        }
+
+        /// <summary>
+        /// Reads the next field of the stream as a <c>short</c>.
+        /// Accepts fields tagged <c>SHORT_TYPE</c> or <c>BYTE_TYPE</c>
+        /// (widened), and <c>STRING_TYPE</c> fields converted with
+        /// <c>Int16.Parse</c>. When the field cannot be converted, the
+        /// buffer is rewound to the start of the field before the exception
+        /// is thrown.
+        /// </summary>
+        /// <returns>The 16-bit integer value of the field.</returns>
+        /// <exception cref="NMSException">The field is tagged <c>NULL</c>.</exception>
+        /// <exception cref="MessageFormatException">The field carries any other type tag.</exception>
+        public short ReadInt16()
+        {
+            InitializeReading();
+
+            try
+            {
+                // Start of the field, used to rewind on a failed conversion.
+                long startingPos = this.byteBuffer.Position;
+                try
+                {
+                    int type = this.dataIn.ReadByte();
+
+                    if(type == PrimitiveMap.SHORT_TYPE)
+                    {
+                        return this.dataIn.ReadInt16();
+                    }
+                    else if(type == PrimitiveMap.BYTE_TYPE)
+                    {
+                        return this.dataIn.ReadByte();
+                    }
+                    else if(type == PrimitiveMap.STRING_TYPE)
+                    {
+                        return Int16.Parse(this.dataIn.ReadString16());
+                    }
+                    else if(type == PrimitiveMap.NULL)
+                    {
+                        this.byteBuffer.Seek(startingPos, SeekOrigin.Begin);
+                        throw new NMSException("Cannot convert Null type to a short");
+                    }
+                    else
+                    {
+                        this.byteBuffer.Seek(startingPos, SeekOrigin.Begin);
+                        throw new MessageFormatException("Value is not a Int16 type.");
+                    }
+                }
+                catch(FormatException e)
+                {
+                    // The string field did not parse; rewind and report it.
+                    this.byteBuffer.Seek(startingPos, SeekOrigin.Begin);
+                    throw NMSExceptionSupport.CreateMessageFormatException(e);
+                }
+            }
+            catch(EndOfStreamException e)
+            {
+                throw NMSExceptionSupport.CreateMessageEOFException(e);
+            }
+            catch(IOException e)
+            {
+                throw NMSExceptionSupport.CreateMessageFormatException(e);
+            }
+        }
+
+        /// <summary>
+        /// Reads the next field of the stream as an <c>int</c>.
+        /// Accepts fields tagged <c>INTEGER_TYPE</c>, <c>SHORT_TYPE</c> or
+        /// <c>BYTE_TYPE</c> (widened), and <c>STRING_TYPE</c> fields
+        /// converted with <c>Int32.Parse</c>. When the field cannot be
+        /// converted, the buffer is rewound to the start of the field before
+        /// the exception is thrown.
+        /// </summary>
+        /// <returns>The 32-bit integer value of the field.</returns>
+        /// <exception cref="NMSException">The field is tagged <c>NULL</c>.</exception>
+        /// <exception cref="MessageFormatException">The field carries any other type tag.</exception>
+        public int ReadInt32()
+        {
+            InitializeReading();
+
+            try
+            {
+                // Start of the field, used to rewind on a failed conversion.
+                long startingPos = this.byteBuffer.Position;
+                try
+                {
+                    int type = this.dataIn.ReadByte();
+
+                    if(type == PrimitiveMap.INTEGER_TYPE)
+                    {
+                        return this.dataIn.ReadInt32();
+                    }
+                    else if(type == PrimitiveMap.SHORT_TYPE)
+                    {
+                        return this.dataIn.ReadInt16();
+                    }
+                    else if(type == PrimitiveMap.BYTE_TYPE)
+                    {
+                        return this.dataIn.ReadByte();
+                    }
+                    else if(type == PrimitiveMap.STRING_TYPE)
+                    {
+                        return Int32.Parse(this.dataIn.ReadString16());
+                    }
+                    else if(type == PrimitiveMap.NULL)
+                    {
+                        this.byteBuffer.Seek(startingPos, SeekOrigin.Begin);
+                        throw new NMSException("Cannot convert Null type to a int");
+                    }
+                    else
+                    {
+                        this.byteBuffer.Seek(startingPos, SeekOrigin.Begin);
+                        throw new MessageFormatException("Value is not a Int32 type.");
+                    }
+                }
+                catch(FormatException e)
+                {
+                    // The string field did not parse; rewind and report it.
+                    this.byteBuffer.Seek(startingPos, SeekOrigin.Begin);
+                    throw NMSExceptionSupport.CreateMessageFormatException(e);
+                }
+            }
+            catch(EndOfStreamException e)
+            {
+                throw NMSExceptionSupport.CreateMessageEOFException(e);
+            }
+            catch(IOException e)
+            {
+                throw NMSExceptionSupport.CreateMessageFormatException(e);
+            }
+        }
+
+        /// <summary>
+        /// Reads the next field of the stream as a <c>long</c>.
+        /// Accepts fields tagged <c>LONG_TYPE</c>, <c>INTEGER_TYPE</c>,
+        /// <c>SHORT_TYPE</c> or <c>BYTE_TYPE</c> (widened), and
+        /// <c>STRING_TYPE</c> fields converted with <c>Int64.Parse</c>.
+        /// When the field cannot be converted, the buffer is rewound to the
+        /// start of the field before the exception is thrown.
+        /// </summary>
+        /// <returns>The 64-bit integer value of the field.</returns>
+        /// <exception cref="NMSException">The field is tagged <c>NULL</c>.</exception>
+        /// <exception cref="MessageFormatException">The field carries any other type tag.</exception>
+        public long ReadInt64()
+        {
+            InitializeReading();
+
+            try
+            {
+                // Start of the field, used to rewind on a failed conversion.
+                long startingPos = this.byteBuffer.Position;
+                try
+                {
+                    int type = this.dataIn.ReadByte();
+
+                    if(type == PrimitiveMap.LONG_TYPE)
+                    {
+                        return this.dataIn.ReadInt64();
+                    }
+                    else if(type == PrimitiveMap.INTEGER_TYPE)
+                    {
+                        return this.dataIn.ReadInt32();
+                    }
+                    else if(type == PrimitiveMap.SHORT_TYPE)
+                    {
+                        return this.dataIn.ReadInt16();
+                    }
+                    else if(type == PrimitiveMap.BYTE_TYPE)
+                    {
+                        return this.dataIn.ReadByte();
+                    }
+                    else if(type == PrimitiveMap.STRING_TYPE)
+                    {
+                        return Int64.Parse(this.dataIn.ReadString16());
+                    }
+                    else if(type == PrimitiveMap.NULL)
+                    {
+                        this.byteBuffer.Seek(startingPos, SeekOrigin.Begin);
+                        throw new NMSException("Cannot convert Null type to a long");
+                    }
+                    else
+                    {
+                        this.byteBuffer.Seek(startingPos, SeekOrigin.Begin);
+                        throw new MessageFormatException("Value is not a Int64 type.");
+                    }
+                }
+                catch(FormatException e)
+                {
+                    // The string field did not parse; rewind and report it.
+                    this.byteBuffer.Seek(startingPos, SeekOrigin.Begin);
+                    throw NMSExceptionSupport.CreateMessageFormatException(e);
+                }
+            }
+            catch(EndOfStreamException e)
+            {
+                throw NMSExceptionSupport.CreateMessageEOFException(e);
+            }
+            catch(IOException e)
+            {
+                throw NMSExceptionSupport.CreateMessageFormatException(e);
+            }
+        }
+
+        /// <summary>
+        /// Reads the next field of the stream as a <c>float</c>.
+        /// A field tagged <c>FLOAT_TYPE</c> is returned directly; a
+        /// <c>STRING_TYPE</c> field is converted with <c>Single.Parse</c>.
+        /// When the field cannot be converted, the buffer is rewound to the
+        /// start of the field before the exception is thrown.
+        /// </summary>
+        /// <returns>The single-precision value of the field.</returns>
+        /// <exception cref="NMSException">The field is tagged <c>NULL</c>.</exception>
+        /// <exception cref="MessageFormatException">The field carries any other type tag.</exception>
+        public float ReadSingle()
+        {
+            InitializeReading();
+
+            try
+            {
+                // Start of the field, used to rewind on a failed conversion.
+                long startingPos = this.byteBuffer.Position;
+                try
+                {
+                    int type = this.dataIn.ReadByte();
+
+                    if(type == PrimitiveMap.FLOAT_TYPE)
+                    {
+                        return this.dataIn.ReadSingle();
+                    }
+                    else if(type == PrimitiveMap.STRING_TYPE)
+                    {
+                        return Single.Parse(this.dataIn.ReadString16());
+                    }
+                    else if(type == PrimitiveMap.NULL)
+                    {
+                        this.byteBuffer.Seek(startingPos, SeekOrigin.Begin);
+                        throw new NMSException("Cannot convert Null type to a float");
+                    }
+                    else
+                    {
+                        this.byteBuffer.Seek(startingPos, SeekOrigin.Begin);
+                        throw new MessageFormatException("Value is not a Single type.");
+                    }
+                }
+                catch(FormatException e)
+                {
+                    // The string field did not parse; rewind and report it.
+                    this.byteBuffer.Seek(startingPos, SeekOrigin.Begin);
+                    throw NMSExceptionSupport.CreateMessageFormatException(e);
+                }
+            }
+            catch(EndOfStreamException e)
+            {
+                throw NMSExceptionSupport.CreateMessageEOFException(e);
+            }
+            catch(IOException e)
+            {
+                throw NMSExceptionSupport.CreateMessageFormatException(e);
+            }
+        }
+
+        /// <summary>
+        /// Reads the next field of the stream as a <c>double</c>.
+        /// Accepts fields tagged <c>DOUBLE_TYPE</c> or <c>FLOAT_TYPE</c>
+        /// (widened), and <c>STRING_TYPE</c> fields converted with
+        /// <c>Double.Parse</c>. When the field cannot be converted, the
+        /// buffer is rewound to the start of the field before the exception
+        /// is thrown.
+        /// </summary>
+        /// <returns>The double-precision value of the field.</returns>
+        /// <exception cref="NMSException">The field is tagged <c>NULL</c>.</exception>
+        /// <exception cref="MessageFormatException">The field carries any other type tag.</exception>
+        public double ReadDouble()
+        {
+            InitializeReading();
+
+            try
+            {
+                // Start of the field, used to rewind on a failed conversion.
+                long startingPos = this.byteBuffer.Position;
+                try
+                {
+                    int type = this.dataIn.ReadByte();
+
+                    if(type == PrimitiveMap.DOUBLE_TYPE)
+                    {
+                        return this.dataIn.ReadDouble();
+                    }
+                    else if(type == PrimitiveMap.FLOAT_TYPE)
+                    {
+                        return this.dataIn.ReadSingle();
+                    }
+                    else if(type == PrimitiveMap.STRING_TYPE)
+                    {
+                        // Parse as double: going through Single would round
+                        // the value to float precision before widening it.
+                        return Double.Parse(this.dataIn.ReadString16());
+                    }
+                    else if(type == PrimitiveMap.NULL)
+                    {
+                        this.byteBuffer.Seek(startingPos, SeekOrigin.Begin);
+                        throw new NMSException("Cannot convert Null type to a double");
+                    }
+                    else
+                    {
+                        this.byteBuffer.Seek(startingPos, SeekOrigin.Begin);
+                        throw new MessageFormatException("Value is not a Double type.");
+                    }
+                }
+                catch(FormatException e)
+                {
+                    // The string field did not parse; rewind and report it.
+                    this.byteBuffer.Seek(startingPos, SeekOrigin.Begin);
+                    throw NMSExceptionSupport.CreateMessageFormatException(e);
+                }
+            }
+            catch(EndOfStreamException e)
+            {
+                throw NMSExceptionSupport.CreateMessageEOFException(e);
+            }
+            catch(IOException e)
+            {
+                throw NMSExceptionSupport.CreateMessageFormatException(e);
+            }
+        }
+
+        /// <summary>
+        /// Reads the next field of the stream as a string. String fields
+        /// (<c>STRING_TYPE</c>, <c>BIG_STRING_TYPE</c>) are returned as
+        /// stored; numeric, char, byte and boolean fields are read in their
+        /// own type and converted with <c>ToString()</c>. A field tagged
+        /// <c>NULL</c> yields <c>null</c>.
+        /// </summary>
+        /// <returns>The string form of the field, or <c>null</c> for a NULL field.</returns>
+        /// <exception cref="MessageFormatException">
+        /// The type tag is none of the handled types; the buffer is rewound
+        /// to the start of the field first.
+        /// </exception>
+        public string ReadString()
+        {
+            InitializeReading();
+
+            // Start of the field, used to rewind when the tag is rejected.
+            long startingPos = this.byteBuffer.Position;
+
+            try
+            {
+                int type = this.dataIn.ReadByte();
+
+                if(type == PrimitiveMap.BIG_STRING_TYPE)
+                {
+                    return this.dataIn.ReadString32();
+                }
+                else if(type == PrimitiveMap.STRING_TYPE)
+                {
+                    return this.dataIn.ReadString16();
+                }
+                else if(type == PrimitiveMap.LONG_TYPE)
+                {
+                    return this.dataIn.ReadInt64().ToString();
+                }
+                else if(type == PrimitiveMap.INTEGER_TYPE)
+                {
+                    return this.dataIn.ReadInt32().ToString();
+                }
+                else if(type == PrimitiveMap.SHORT_TYPE)
+                {
+                    return this.dataIn.ReadInt16().ToString();
+                }
+                else if(type == PrimitiveMap.FLOAT_TYPE)
+                {
+                    return this.dataIn.ReadSingle().ToString();
+                }
+                else if(type == PrimitiveMap.DOUBLE_TYPE)
+                {
+                    return this.dataIn.ReadDouble().ToString();
+                }
+                else if(type == PrimitiveMap.CHAR_TYPE)
+                {
+                    return this.dataIn.ReadChar().ToString();
+                }
+                else if(type == PrimitiveMap.BYTE_TYPE)
+                {
+                    return this.dataIn.ReadByte().ToString();
+                }
+                else if(type == PrimitiveMap.BOOLEAN_TYPE)
+                {
+                    return this.dataIn.ReadBoolean().ToString();
+                }
+                else if(type == PrimitiveMap.NULL)
+                {
+                    return null;
+                }
+                else
+                {
+                    this.byteBuffer.Seek(startingPos, SeekOrigin.Begin);
+                    throw new MessageFormatException("Value is not a known type.");
+                }
+            }
+            catch(FormatException e)
+            {
+                this.byteBuffer.Seek(startingPos, SeekOrigin.Begin);
+                throw NMSExceptionSupport.CreateMessageFormatException(e);
+            }
+            catch(EndOfStreamException e)
+            {
+                throw NMSExceptionSupport.CreateMessageEOFException(e);
+            }
+            catch(IOException e)
+            {
+                throw NMSExceptionSupport.CreateMessageFormatException(e);
+            }
+        }
+
+        /// <summary>
+        /// Copies the next chunk of a byte-array field into
+        /// <paramref name="value"/>. The first call on a field reads its
+        /// type tag and length; subsequent calls continue where the previous
+        /// one stopped, tracked through <c>bytesRemaining</c>.
+        /// </summary>
+        /// <param name="value">Destination buffer; must not be null.</param>
+        /// <returns>
+        /// The number of bytes placed in <paramref name="value"/>, or -1 when
+        /// the previous call already consumed the rest of the field.
+        /// </returns>
+        /// <exception cref="MessageFormatException">
+        /// The next field is not a byte array; the buffer is rewound to the
+        /// start of the field first.
+        /// </exception>
+        public int ReadBytes(byte[] value)
+        {
+            InitializeReading();
+
+            if(null == value)
+            {
+                throw new NullReferenceException("Passed Byte Array is null");
+            }
+
+            try
+            {
+                if(this.bytesRemaining == 0)
+                {
+                    // Field fully consumed by an earlier call: signal the end
+                    // and re-arm for the next byte-array field.
+                    this.bytesRemaining = -1;
+                    return -1;
+                }
+
+                if(this.bytesRemaining == -1)
+                {
+                    // No read in progress: consume the field header.
+                    long fieldStart = this.byteBuffer.Position;
+                    byte typeTag = this.dataIn.ReadByte();
+
+                    if(typeTag != PrimitiveMap.BYTE_ARRAY_TYPE)
+                    {
+                        this.byteBuffer.Seek(fieldStart, SeekOrigin.Begin);
+                        throw new MessageFormatException("Not a byte array");
+                    }
+
+                    this.bytesRemaining = this.dataIn.ReadInt32();
+                }
+
+                int capacity = value.Length;
+
+                if(capacity > this.bytesRemaining)
+                {
+                    // Caller's buffer is larger than what is left: hand over
+                    // the tail of the field.
+                    int copied = this.dataIn.Read(value, 0, this.bytesRemaining);
+                    this.bytesRemaining = 0;
+                    return copied;
+                }
+
+                // Caller's buffer is no larger than what is left: fill it.
+                this.bytesRemaining -= capacity;
+                this.dataIn.Read(value, 0, capacity);
+                return capacity;
+            }
+            catch(EndOfStreamException ex)
+            {
+                throw NMSExceptionSupport.CreateMessageEOFException(ex);
+            }
+            catch(IOException ex)
+            {
+                throw NMSExceptionSupport.CreateMessageFormatException(ex);
+            }
+        }
+
+        /// <summary>
+        /// Reads the next field of the stream and returns it in the type
+        /// indicated by its tag: a string, a boxed primitive, a
+        /// <c>byte[]</c> for a byte-array field, or <c>null</c> for a field
+        /// tagged <c>NULL</c>.
+        /// </summary>
+        /// <returns>The field value, or <c>null</c> for a NULL field.</returns>
+        /// <exception cref="MessageFormatException">
+        /// The type tag is none of the handled types; the buffer is rewound
+        /// to the start of the field first.
+        /// </exception>
+        public Object ReadObject()
+        {
+            InitializeReading();
+
+            // Start of the field, used to rewind when the tag is rejected.
+            long startingPos = this.byteBuffer.Position;
+
+            try
+            {
+                int type = this.dataIn.ReadByte();
+
+                if(type == PrimitiveMap.BIG_STRING_TYPE)
+                {
+                    return this.dataIn.ReadString32();
+                }
+                else if(type == PrimitiveMap.STRING_TYPE)
+                {
+                    return this.dataIn.ReadString16();
+                }
+                else if(type == PrimitiveMap.LONG_TYPE)
+                {
+                    return this.dataIn.ReadInt64();
+                }
+                else if(type == PrimitiveMap.INTEGER_TYPE)
+                {
+                    return this.dataIn.ReadInt32();
+                }
+                else if(type == PrimitiveMap.SHORT_TYPE)
+                {
+                    return this.dataIn.ReadInt16();
+                }
+                else if(type == PrimitiveMap.FLOAT_TYPE)
+                {
+                    return this.dataIn.ReadSingle();
+                }
+                else if(type == PrimitiveMap.DOUBLE_TYPE)
+                {
+                    return this.dataIn.ReadDouble();
+                }
+                else if(type == PrimitiveMap.CHAR_TYPE)
+                {
+                    return this.dataIn.ReadChar();
+                }
+                else if(type == PrimitiveMap.BYTE_TYPE)
+                {
+                    return this.dataIn.ReadByte();
+                }
+                else if(type == PrimitiveMap.BOOLEAN_TYPE)
+                {
+                    return this.dataIn.ReadBoolean();
+                }
+                else if(type == PrimitiveMap.BYTE_ARRAY_TYPE)
+                {
+                    // Byte arrays are stored as a 32-bit length followed by
+                    // the raw bytes.
+                    int length = this.dataIn.ReadInt32();
+                    byte[] data = new byte[length];
+                    this.dataIn.Read(data, 0, length);
+                    return data;
+                }
+                else if(type == PrimitiveMap.NULL)
+                {
+                    return null;
+                }
+                else
+                {
+                    this.byteBuffer.Seek(startingPos, SeekOrigin.Begin);
+                    throw new MessageFormatException("Value is not a known type.");
+                }
+            }
+            catch(FormatException e)
+            {
+                this.byteBuffer.Seek(startingPos, SeekOrigin.Begin);
+                throw NMSExceptionSupport.CreateMessageFormatException(e);
+            }
+            catch(EndOfStreamException e)
+            {
+                throw NMSExceptionSupport.CreateMessageEOFException(e);
+            }
+            catch(IOException e)
+            {
+                throw NMSExceptionSupport.CreateMessageFormatException(e);
+            }
+        }
+
+        /// <summary>
+        /// Appends a boolean field to the stream: the <c>BOOLEAN_TYPE</c>
+        /// tag followed by the value.
+        /// </summary>
+        /// <param name="value">The value to write.</param>
+        public void WriteBoolean(bool value)
+        {
+            InitializeWriting();
+            try
+            {
+                this.dataOut.Write(PrimitiveMap.BOOLEAN_TYPE);
+                this.dataOut.Write(value);
+            }
+            catch(IOException e)
+            {
+                // Throw the wrapped exception (as the Read* methods do)
+                // instead of discarding it and hiding the write failure.
+                throw NMSExceptionSupport.Create(e);
+            }
+        }
+
+        /// <summary>
+        /// Appends a byte field to the stream: the <c>BYTE_TYPE</c> tag
+        /// followed by the value.
+        /// </summary>
+        /// <param name="value">The value to write.</param>
+        public void WriteByte(byte value)
+        {
+            InitializeWriting();
+            try
+            {
+                this.dataOut.Write(PrimitiveMap.BYTE_TYPE);
+                this.dataOut.Write(value);
+            }
+            catch(IOException e)
+            {
+                // Throw the wrapped exception (as the Read* methods do)
+                // instead of discarding it and hiding the write failure.
+                throw NMSExceptionSupport.Create(e);
+            }
+        }
+
+        /// <summary>
+        /// Appends the whole of <paramref name="value"/> to the stream as a
+        /// byte-array field by delegating to the offset/length overload.
+        /// </summary>
+        /// <param name="value">The bytes to write.</param>
+        public void WriteBytes(byte[] value)
+        {
+            InitializeWriting();
+            int count = value.Length;
+            this.WriteBytes(value, 0, count);
+        }
+
+        /// <summary>
+        /// Appends a byte-array field to the stream: the
+        /// <c>BYTE_ARRAY_TYPE</c> tag, the 32-bit <paramref name="length"/>,
+        /// then <paramref name="length"/> bytes of <paramref name="value"/>
+        /// starting at <paramref name="offset"/>.
+        /// </summary>
+        /// <param name="value">Source buffer.</param>
+        /// <param name="offset">Index in <paramref name="value"/> of the first byte to write.</param>
+        /// <param name="length">Number of bytes to write.</param>
+        public void WriteBytes(byte[] value, int offset, int length)
+        {
+            InitializeWriting();
+            try
+            {
+                this.dataOut.Write(PrimitiveMap.BYTE_ARRAY_TYPE);
+                this.dataOut.Write((int) length);
+                this.dataOut.Write(value, offset, length);
+            }
+            catch(IOException e)
+            {
+                // Throw the wrapped exception (as the Read* methods do)
+                // instead of discarding it and hiding the write failure.
+                throw NMSExceptionSupport.Create(e);
+            }
+        }
+
+        /// <summary>
+        /// Appends a char field to the stream: the <c>CHAR_TYPE</c> tag
+        /// followed by the value.
+        /// </summary>
+        /// <param name="value">The value to write.</param>
+        public void WriteChar(char value)
+        {
+            InitializeWriting();
+            try
+            {
+                this.dataOut.Write(PrimitiveMap.CHAR_TYPE);
+                this.dataOut.Write(value);
+            }
+            catch(IOException e)
+            {
+                // Throw the wrapped exception (as the Read* methods do)
+                // instead of discarding it and hiding the write failure.
+                throw NMSExceptionSupport.Create(e);
+            }
+        }
+
+        /// <summary>
+        /// Appends a 16-bit integer field to the stream: the
+        /// <c>SHORT_TYPE</c> tag followed by the value.
+        /// </summary>
+        /// <param name="value">The value to write.</param>
+        public void WriteInt16(short value)
+        {
+            InitializeWriting();
+            try
+            {
+                this.dataOut.Write(PrimitiveMap.SHORT_TYPE);
+                this.dataOut.Write(value);
+            }
+            catch(IOException e)
+            {
+                // Throw the wrapped exception (as the Read* methods do)
+                // instead of discarding it and hiding the write failure.
+                throw NMSExceptionSupport.Create(e);
+            }
+        }
+
+        /// <summary>
+        /// Appends a 32-bit integer field to the stream: the
+        /// <c>INTEGER_TYPE</c> tag followed by the value.
+        /// </summary>
+        /// <param name="value">The value to write.</param>
+        public void WriteInt32(int value)
+        {
+            InitializeWriting();
+            try
+            {
+                this.dataOut.Write(PrimitiveMap.INTEGER_TYPE);
+                this.dataOut.Write(value);
+            }
+            catch(IOException e)
+            {
+                // Throw the wrapped exception (as the Read* methods do)
+                // instead of discarding it and hiding the write failure.
+                throw NMSExceptionSupport.Create(e);
+            }
+        }
+
+        /// <summary>
+        /// Appends a 64-bit integer field to the stream: the
+        /// <c>LONG_TYPE</c> tag followed by the value.
+        /// </summary>
+        /// <param name="value">The value to write.</param>
+        public void WriteInt64(long value)
+        {
+            InitializeWriting();
+            try
+            {
+                this.dataOut.Write(PrimitiveMap.LONG_TYPE);
+                this.dataOut.Write(value);
+            }
+            catch(IOException e)
+            {
+                // Throw the wrapped exception (as the Read* methods do)
+                // instead of discarding it and hiding the write failure.
+                throw NMSExceptionSupport.Create(e);
+            }
+        }
+
+        /// <summary>
+        /// Appends a single-precision field to the stream: the
+        /// <c>FLOAT_TYPE</c> tag followed by the value.
+        /// </summary>
+        /// <param name="value">The value to write.</param>
+        public void WriteSingle(float value)
+        {
+            InitializeWriting();
+            try
+            {
+                this.dataOut.Write(PrimitiveMap.FLOAT_TYPE);
+                this.dataOut.Write(value);
+            }
+            catch(IOException e)
+            {
+                // Throw the wrapped exception (as the Read* methods do)
+                // instead of discarding it and hiding the write failure.
+                throw NMSExceptionSupport.Create(e);
+            }
+        }
+
+        /// <summary>
+        /// Appends a double-precision field to the stream: the
+        /// <c>DOUBLE_TYPE</c> tag followed by the value.
+        /// </summary>
+        /// <param name="value">The value to write.</param>
+        public void WriteDouble(double value)
+        {
+            InitializeWriting();
+            try
+            {
+                this.dataOut.Write(PrimitiveMap.DOUBLE_TYPE);
+                this.dataOut.Write(value);
+            }
+            catch(IOException e)
+            {
+                // Throw the wrapped exception (as the Read* methods do)
+                // instead of discarding it and hiding the write failure.
+                throw NMSExceptionSupport.Create(e);
+            }
+        }
+
+        /// <summary>
+        /// Appends a string field to the stream. Strings longer than 8192
+        /// characters are written with the <c>BIG_STRING_TYPE</c> tag via
+        /// <c>WriteString32</c>; shorter ones use <c>STRING_TYPE</c> and
+        /// <c>WriteString16</c>.
+        /// </summary>
+        /// <param name="value">
+        /// The string to write. Its <c>Length</c> is read unconditionally,
+        /// so a null argument fails with a NullReferenceException.
+        /// </param>
+        public void WriteString(string value)
+        {
+            InitializeWriting();
+            try
+            {
+                if( value.Length > 8192 )
+                {
+                    this.dataOut.Write(PrimitiveMap.BIG_STRING_TYPE);
+                    this.dataOut.WriteString32(value);
+                }
+                else
+                {
+                    this.dataOut.Write(PrimitiveMap.STRING_TYPE);
+                    this.dataOut.WriteString16(value);
+                }
+            }
+            catch(IOException e)
+            {
+                // Throw the wrapped exception (as the Read* methods do)
+                // instead of discarding it and hiding the write failure.
+                throw NMSExceptionSupport.Create(e);
+            }
+        }
+
+        /// <summary>
+        /// Appends <paramref name="value"/> to the stream by dispatching on
+        /// its runtime type to the matching typed <c>Write*</c> method.
+        /// Supported types are byte, char, bool, short, int, long, float,
+        /// double, <c>byte[]</c> and string.
+        /// </summary>
+        /// <param name="value">The boxed primitive, byte array or string to write.</param>
+        /// <exception cref="MessageFormatException">
+        /// <paramref name="value"/> is null or of an unsupported type.
+        /// </exception>
+        public void WriteObject(Object value)
+        {
+            InitializeWriting();
+            if( value is System.Byte )
+            {
+                this.WriteByte( (byte) value );
+            }
+            else if( value is Char )
+            {
+                this.WriteChar( (char) value );
+            }
+            else if( value is Boolean )
+            {
+                this.WriteBoolean( (bool) value );
+            }
+            else if( value is Int16 )
+            {
+                this.WriteInt16( (short) value );
+            }
+            else if( value is Int32 )
+            {
+                this.WriteInt32( (int) value );
+            }
+            else if( value is Int64 )
+            {
+                this.WriteInt64( (long) value );
+            }
+            else if( value is Single )
+            {
+                this.WriteSingle( (float) value );
+            }
+            else if( value is Double )
+            {
+                this.WriteDouble( (double) value );
+            }
+            else if( value is byte[] )
+            {
+                this.WriteBytes( (byte[]) value );
+            }
+            else if( value is String )
+            {
+                this.WriteString( (string) value );
+            }
+            else
+            {
+                // A null value matches none of the "is" tests above and lands
+                // here; do not dereference it while building the message.
+                string typeName = (value == null) ? "null" : value.GetType().ToString();
+                throw new MessageFormatException("Cannot write non-primitive type:" + typeName);
+            }
+        }
+
+        /// <summary>
+        /// Copies this message. Any data still held by the writer is first
+        /// moved into <c>Content</c> via <c>StoreContent()</c>, then the base
+        /// class performs the copy.
+        /// </summary>
+        /// <returns>The object produced by <c>base.Clone()</c>.</returns>
+        public override Object Clone()
+        {
+            this.StoreContent();
+            Object copy = base.Clone();
+            return copy;
+        }
+
+        /// <summary>
+        /// Clears the message body through the base class, then drops the
+        /// reader, writer and backing buffer and resets the byte-array read
+        /// counter.
+        /// </summary>
+        public override void ClearBody()
+        {
+            base.ClearBody();
+            this.bytesRemaining = -1;
+            this.dataOut = null;
+            this.dataIn = null;
+            this.byteBuffer = null;
+        }
+
+        /// <summary>
+        /// Stores any written data into <c>Content</c>, discards the current
+        /// reader/writer state, and marks the body read-only so the next
+        /// read starts from a fresh reader.
+        /// </summary>
+        public void Reset()
+        {
+            this.StoreContent();
+            this.bytesRemaining = -1;
+            this.byteBuffer = null;
+            this.dataOut = null;
+            this.dataIn = null;
+            this.ReadOnlyBody = true;
+        }
+
+        /// <summary>
+        /// Prepares the message for reading: runs the write-only check and,
+        /// if no reader exists yet, creates one over a non-writable
+        /// <c>MemoryStream</c> wrapping <c>Content</c>.
+        /// </summary>
+        private void InitializeReading()
+        {
+            FailIfWriteOnlyBody();
+
+            if(this.dataIn != null)
+            {
+                // Reader already set up by an earlier call.
+                return;
+            }
+
+            this.byteBuffer = new MemoryStream(this.Content, false);
+            this.dataIn = new EndianBinaryReader(this.byteBuffer);
+        }
+
+        /// <summary>
+        /// Prepares the message for writing: runs the read-only check and,
+        /// if no writer exists yet, creates one over a new empty
+        /// <c>MemoryStream</c>.
+        /// </summary>
+        private void InitializeWriting()
+        {
+            FailIfReadOnlyBody();
+
+            if(this.dataOut != null)
+            {
+                // Writer already set up by an earlier call.
+                return;
+            }
+
+            this.byteBuffer = new MemoryStream();
+            this.dataOut = new EndianBinaryWriter(this.byteBuffer);
+        }
+
+        /// <summary>
+        /// If a writer is active, closes it, copies the buffered bytes into
+        /// <c>Content</c>, and releases the writer and its buffer. Does
+        /// nothing when no writer exists.
+        /// </summary>
+        private void StoreContent()
+        {
+            if(this.dataOut == null)
+            {
+                return;
+            }
+
+            this.dataOut.Close();
+
+            this.Content = this.byteBuffer.ToArray();
+            this.byteBuffer = null;
+            this.dataOut = null;
+        }
+
+    }
+}
+

Added: 
activemq/activemq-dotnet/Apache.NMS.XMS/trunk/src/test/csharp/Commands/TempDestination.cs
URL: 
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.XMS/trunk/src/test/csharp/Commands/TempDestination.cs?rev=1723221&view=auto
==============================================================================
--- 
activemq/activemq-dotnet/Apache.NMS.XMS/trunk/src/test/csharp/Commands/TempDestination.cs
 (added)
+++ 
activemq/activemq-dotnet/Apache.NMS.XMS/trunk/src/test/csharp/Commands/TempDestination.cs
 Wed Jan  6 02:19:56 2016
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+
+namespace Apache.NMS.Commands
+{
+    /// <summary>
+    /// Abstract base class for the temporary destination commands.
+    /// </summary>
+    public abstract class TempDestination : Destination
+    {
+        /// <summary>
+        /// Default constructor; delegates to the base class default constructor.
+        /// </summary>
+        public TempDestination()
+            : base()
+        {
+        }
+
+        /// <summary>
+        /// Constructor that forwards <paramref name="name"/> to the base class.
+        /// </summary>
+        /// <param name="name">Name handed to the base constructor.</param>
+        public TempDestination(String name)
+            : base(name)
+        {
+        }
+
+        /// <summary>
+        /// Destination type; left abstract so each concrete subclass reports its own.
+        /// </summary>
+        public abstract override DestinationType DestinationType
+        {
+            get;
+        }
+
+        /// <summary>
+        /// Base implementation that creates nothing.
+        /// </summary>
+        /// <param name="name">Ignored by this implementation.</param>
+        /// <returns>Always null.</returns>
+        public override Destination CreateDestination(String name)
+        {
+            return null;
+        }
+
+        /// <summary>
+        /// Returns the copy produced by the base class Clone(), cast to
+        /// <see cref="TempDestination"/>. This class adds no state of its own,
+        /// so there is no extra deep-copy work; revisit if fields are added.
+        /// </summary>
+        public override Object Clone()
+        {
+            return (TempDestination) base.Clone();
+        }
+
+        /// <summary>
+        /// Deleting is not supported here.
+        /// </summary>
+        /// <exception cref="NotSupportedException">Always thrown.</exception>
+        public void Delete()
+        {
+            // NOTE(review): the message mentions Stomp - presumably carried over
+            // from another provider; confirm before rewording.
+            throw new NotSupportedException("Stomp Cannot Delete Temporary Destinations");
+        }
+    }
+}
+

Added: 
activemq/activemq-dotnet/Apache.NMS.XMS/trunk/src/test/csharp/Commands/TempQueue.cs
URL: 
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.XMS/trunk/src/test/csharp/Commands/TempQueue.cs?rev=1723221&view=auto
==============================================================================
--- 
activemq/activemq-dotnet/Apache.NMS.XMS/trunk/src/test/csharp/Commands/TempQueue.cs
 (added)
+++ 
activemq/activemq-dotnet/Apache.NMS.XMS/trunk/src/test/csharp/Commands/TempQueue.cs
 Wed Jan  6 02:19:56 2016
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using System;
+
+namespace Apache.NMS.Commands
+{
+    /// <summary>
+    /// Temporary queue destination command.
+    /// </summary>
+    public class TempQueue : TempDestination, ITemporaryQueue
+    {
+        /// <summary>
+        /// Default constructor.
+        /// </summary>
+        public TempQueue()
+        {
+        }
+
+        /// <summary>
+        /// Constructor that forwards <paramref name="name"/> to the base class.
+        /// </summary>
+        /// <param name="name">Name handed to the base constructor.</param>
+        public TempQueue(String name) : base(name)
+        {
+        }
+
+        /// <summary>
+        /// Always <see cref="Apache.NMS.DestinationType.TemporaryQueue"/>.
+        /// </summary>
+        public override DestinationType DestinationType
+        {
+            get { return DestinationType.TemporaryQueue; }
+        }
+
+        /// <summary>
+        /// The queue name, which is the physical name of this destination.
+        /// </summary>
+        public String QueueName
+        {
+            get
+            {
+                return PhysicalName;
+            }
+        }
+
+        /// <summary>
+        /// Method form of <see cref="QueueName"/>.
+        /// </summary>
+        /// <returns>The physical name of this destination.</returns>
+        public String GetQueueName()
+        {
+            return PhysicalName;
+        }
+
+        /// <summary>
+        /// Returns the TEMPORARY_QUEUE type constant.
+        /// </summary>
+        public override int GetDestinationType()
+        {
+            return TEMPORARY_QUEUE;
+        }
+
+        /// <summary>
+        /// Creates a new <see cref="TempQueue"/> with the given name.
+        /// </summary>
+        /// <param name="name">Name of the new temporary queue.</param>
+        public override Destination CreateDestination(String name)
+        {
+            return new TempQueue(name);
+        }
+
+        /// <summary>
+        /// Returns the copy produced by the base class Clone(), cast to
+        /// <see cref="TempQueue"/>. No fields are declared here, so nothing
+        /// further is copied; revisit if fields are added.
+        /// </summary>
+        public override Object Clone()
+        {
+            return (TempQueue) base.Clone();
+        }
+    }
+}
+

Added: 
activemq/activemq-dotnet/Apache.NMS.XMS/trunk/src/test/csharp/Commands/TempTopic.cs
URL: 
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.XMS/trunk/src/test/csharp/Commands/TempTopic.cs?rev=1723221&view=auto
==============================================================================
--- 
activemq/activemq-dotnet/Apache.NMS.XMS/trunk/src/test/csharp/Commands/TempTopic.cs
 (added)
+++ 
activemq/activemq-dotnet/Apache.NMS.XMS/trunk/src/test/csharp/Commands/TempTopic.cs
 Wed Jan  6 02:19:56 2016
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using System;
+
+namespace Apache.NMS.Commands
+{
+
+    /// <summary>
+    /// Temporary topic destination command.
+    /// </summary>
+    public class TempTopic : TempDestination, ITemporaryTopic
+    {
+        /// <summary>
+        /// Default constructor.
+        /// </summary>
+        public TempTopic()
+        {
+        }
+
+        /// <summary>
+        /// Constructor that forwards <paramref name="name"/> to the base class.
+        /// </summary>
+        /// <param name="name">Name handed to the base constructor.</param>
+        public TempTopic(String name)
+            : base(name)
+        {
+        }
+
+        /// <summary>
+        /// Always <see cref="Apache.NMS.DestinationType.TemporaryTopic"/>.
+        /// </summary>
+        public override DestinationType DestinationType
+        {
+            get
+            {
+                return DestinationType.TemporaryTopic;
+            }
+        }
+
+        /// <summary>
+        /// The topic name, which is the physical name of this destination.
+        /// </summary>
+        public String TopicName
+        {
+            get
+            {
+                return PhysicalName;
+            }
+        }
+
+        /// <summary>
+        /// Method form of <see cref="TopicName"/>.
+        /// </summary>
+        /// <returns>The physical name of this destination.</returns>
+        public String GetTopicName()
+        {
+            return PhysicalName;
+        }
+
+        /// <summary>
+        /// Returns the TEMPORARY_TOPIC type constant.
+        /// </summary>
+        public override int GetDestinationType()
+        {
+            return TEMPORARY_TOPIC;
+        }
+
+        /// <summary>
+        /// Creates a new <see cref="TempTopic"/> with the given name.
+        /// </summary>
+        /// <param name="name">Name of the new temporary topic.</param>
+        public override Destination CreateDestination(String name)
+        {
+            return new TempTopic(name);
+        }
+
+        /// <summary>
+        /// Returns the copy produced by the base class Clone(), cast to
+        /// <see cref="TempTopic"/>. No fields are declared here, so nothing
+        /// further is copied; revisit if fields are added.
+        /// </summary>
+        public override Object Clone()
+        {
+            return (TempTopic) base.Clone();
+        }
+    }
+}
+

Added: 
activemq/activemq-dotnet/Apache.NMS.XMS/trunk/src/test/csharp/Commands/TextMessage.cs
URL: 
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.XMS/trunk/src/test/csharp/Commands/TextMessage.cs?rev=1723221&view=auto
==============================================================================
--- 
activemq/activemq-dotnet/Apache.NMS.XMS/trunk/src/test/csharp/Commands/TextMessage.cs
 (added)
+++ 
activemq/activemq-dotnet/Apache.NMS.XMS/trunk/src/test/csharp/Commands/TextMessage.cs
 Wed Jan  6 02:19:56 2016
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.IO;
+
+using Apache.NMS;
+using Apache.NMS.Util;
+
+namespace Apache.NMS.Commands
+{
+    /// <summary>
+    /// Message command whose body is a single string.
+    /// </summary>
+    public class TextMessage : Message, ITextMessage
+    {
+        private String text = null;
+
+        /// <summary>
+        /// Creates a message with no text.
+        /// </summary>
+        public TextMessage()
+        {
+        }
+
+        /// <summary>
+        /// Creates a message and assigns <paramref name="text"/> through the
+        /// <see cref="Text"/> property.
+        /// </summary>
+        /// <param name="text">Initial body text.</param>
+        public TextMessage(String text)
+        {
+            this.Text = text;
+        }
+
+        /// <summary>
+        /// Body text. Setting it first checks that the body is writable and
+        /// also resets the raw content to null.
+        /// </summary>
+        public string Text
+        {
+            get
+            {
+                return this.text;
+            }
+            set
+            {
+                FailIfReadOnlyBody();
+                this.text = value;
+                this.Content = null;
+            }
+        }
+
+        /// <summary>
+        /// Clears the body through the base class and forgets the text.
+        /// </summary>
+        public override void ClearBody()
+        {
+            base.ClearBody();
+            this.text = null;
+        }
+
+        /// <summary>
+        /// Appends the text to the base description. Text longer than 63
+        /// characters is abbreviated to its first 45 and last 12 characters
+        /// joined by "..."; a missing text is shown as "null".
+        /// </summary>
+        public override string ToString()
+        {
+            string shown = this.Text;
+
+            if(shown == null)
+            {
+                shown = "null";
+            }
+            else if(shown.Length > 63)
+            {
+                string head = shown.Substring(0, 45);
+                string tail = shown.Substring(shown.Length - 12);
+                shown = head + "..." + tail;
+            }
+
+            return base.ToString() + " Text = " + shown;
+        }
+    }
+}
+

Added: 
activemq/activemq-dotnet/Apache.NMS.XMS/trunk/src/test/csharp/Commands/Topic.cs
URL: 
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.XMS/trunk/src/test/csharp/Commands/Topic.cs?rev=1723221&view=auto
==============================================================================
--- 
activemq/activemq-dotnet/Apache.NMS.XMS/trunk/src/test/csharp/Commands/Topic.cs 
(added)
+++ 
activemq/activemq-dotnet/Apache.NMS.XMS/trunk/src/test/csharp/Commands/Topic.cs 
Wed Jan  6 02:19:56 2016
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using System;
+
+namespace Apache.NMS.Commands
+{
+
+    /// <summary>
+    /// Topic destination command.
+    /// </summary>
+    public class Topic : Destination, ITopic
+    {
+        /// <summary>
+        /// Default constructor.
+        /// </summary>
+        public Topic()
+        {
+        }
+
+        /// <summary>
+        /// Constructor that forwards <paramref name="name"/> to the base class.
+        /// </summary>
+        /// <param name="name">Name handed to the base constructor.</param>
+        public Topic(String name)
+            : base(name)
+        {
+        }
+
+        /// <summary>
+        /// Always <see cref="Apache.NMS.DestinationType.Topic"/>.
+        /// </summary>
+        public override DestinationType DestinationType
+        {
+            get { return DestinationType.Topic; }
+        }
+
+        /// <summary>
+        /// The topic name, which is the physical name of this destination.
+        /// </summary>
+        public String TopicName
+        {
+            get
+            {
+                return PhysicalName;
+            }
+        }
+
+        /// <summary>
+        /// Returns the TOPIC type constant.
+        /// </summary>
+        public override int GetDestinationType()
+        {
+            return TOPIC;
+        }
+
+        /// <summary>
+        /// Creates a new <see cref="Topic"/> with the given name.
+        /// </summary>
+        /// <param name="name">Name of the new topic.</param>
+        public override Destination CreateDestination(String name)
+        {
+            return new Topic(name);
+        }
+
+        /// <summary>
+        /// Returns the copy produced by the base class Clone(), cast to
+        /// <see cref="Topic"/>. No fields are declared here, so nothing
+        /// further is copied; revisit if fields are added.
+        /// </summary>
+        public override Object Clone()
+        {
+            return (Topic) base.Clone();
+        }
+    }
+}
+

Added: 
activemq/activemq-dotnet/Apache.NMS.XMS/trunk/src/test/csharp/ConnectionTest.cs
URL: 
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.XMS/trunk/src/test/csharp/ConnectionTest.cs?rev=1723221&view=auto
==============================================================================
--- 
activemq/activemq-dotnet/Apache.NMS.XMS/trunk/src/test/csharp/ConnectionTest.cs 
(added)
+++ 
activemq/activemq-dotnet/Apache.NMS.XMS/trunk/src/test/csharp/ConnectionTest.cs 
Wed Jan  6 02:19:56 2016
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using System;
+using NUnit.Framework;
+
+namespace Apache.NMS.Test
+{
+       //[TestFixture]
+       public class ConnectionTest : NMSTest
+       {
+               IConnection startedConnection = null;
+               IConnection stoppedConnection = null;
+
+               /// <summary>
+               /// Creates the fixture and hands the test support object to the base class.
+               /// </summary>
+               /// <param name="testSupport">Test support object forwarded to the base constructor.</param>
+               protected ConnectionTest(NMSTestSupport testSupport) : base(testSupport)
+               {
+               }
+
+               /// <summary>
+               /// Runs the base set-up, then creates the two connections shared by the
+               /// tests: one that is started immediately and one that is left stopped.
+               /// </summary>
+               //[SetUp]
+               public override void SetUp()
+               {
+                       base.SetUp();
+
+                       this.startedConnection = CreateConnection(null);
+                       this.startedConnection.Start();
+
+                       // Deliberately not started; individual tests start it when needed.
+                       this.stoppedConnection = CreateConnection(null);
+               }
+
+               /// <summary>
+               /// Closes both shared connections and then runs the base tear-down.
+               /// Each step is attempted even when an earlier one throws, and a
+               /// connection that was never created is skipped.
+               /// </summary>
+               //[TearDown]
+               public override void TearDown()
+               {
+                       try
+                       {
+                               if(startedConnection != null)
+                               {
+                                       startedConnection.Close();
+                               }
+                       }
+                       finally
+                       {
+                               try
+                               {
+                                       if(stoppedConnection != null)
+                                       {
+                                               stoppedConnection.Close();
+                                       }
+                               }
+                               finally
+                               {
+                                       base.TearDown();
+                               }
+                       }
+               }
+
+               /// <summary>
+               /// Verify that it is possible to create multiple connections to the broker.
+               /// There was a bug in the connection factory which set the clientId member which made
+               /// it impossible to create an additional connection.
+               /// </summary>
+               //[Test]
+               public virtual void TestTwoConnections()
+               {
+                       using(IConnection connection1 = CreateConnection(null))
+                       {
+                               connection1.Start();
+                               using(IConnection connection2 = CreateConnection(null))
+                               {
+                                       // with the bug present we'll get an exception in connection2.start()
+                                       connection2.Start();
+                               }
+                       }
+               }
+
+               /// <summary>
+               /// Creates a consumer on a cleared queue, stops the connection and then
+               /// leaves the session and connection scopes, optionally disposing the
+               /// consumer explicitly first.
+               /// </summary>
+               /// <param name="disposeConsumer">Whether the consumer is disposed explicitly.</param>
+               /// <param name="testQueueRef">Reference passed to GetClearDestination for the test queue.</param>
+               //[Test]
+               public virtual void TestCreateAndDisposeWithConsumer(
+                       //[Values(true, false)]
+                       bool disposeConsumer, string testQueueRef)
+               {
+                       using(IConnection connection = CreateConnection("DisposalTestConnection"))
+                       {
+                               connection.Start();
+
+                               using(ISession session = connection.CreateSession())
+                               {
+                                       IDestination queue = GetClearDestination(session, DestinationType.Queue, testQueueRef);
+                                       IMessageConsumer queueConsumer = session.CreateConsumer(queue);
+
+                                       connection.Stop();
+
+                                       if(disposeConsumer)
+                                       {
+                                               queueConsumer.Dispose();
+                                       }
+                               }
+                       }
+               }
+
+               /// <summary>
+               /// Creates a producer on a cleared queue, stops the connection and then
+               /// leaves the session and connection scopes, optionally disposing the
+               /// producer explicitly first.
+               /// </summary>
+               /// <param name="disposeProducer">Whether the producer is disposed explicitly.</param>
+               /// <param name="testQueueRef">Reference passed to GetClearDestination for the test queue.</param>
+               //[Test]
+               public virtual void TestCreateAndDisposeWithProducer(
+                       //[Values(true, false)]
+                       bool disposeProducer, string testQueueRef)
+               {
+                       using(IConnection connection = CreateConnection("DisposalTestConnection"))
+                       {
+                               connection.Start();
+
+                               using(ISession session = connection.CreateSession())
+                               {
+                                       IDestination queue = GetClearDestination(session, DestinationType.Queue, testQueueRef);
+                                       IMessageProducer queueProducer = session.CreateProducer(queue);
+
+                                       connection.Stop();
+
+                                       if(disposeProducer)
+                                       {
+                                               queueProducer.Dispose();
+                                       }
+                               }
+                       }
+               }
+
+               /// <summary>
+               /// Sends one message before the connection is started and checks that
+               /// exactly one message is delivered once the connection starts.
+               /// </summary>
+               /// <param name="deliveryMode">Delivery mode used for the sent message.</param>
+               /// <param name="destinationType">Kind of destination to test against.</param>
+               /// <param name="testDestinationRef">Reference passed to GetClearDestination for the test destination.</param>
+               //[Test]
+               public virtual void TestStartAfterSend(
+                       //[Values(MsgDeliveryMode.Persistent, MsgDeliveryMode.NonPersistent)]
+                       MsgDeliveryMode deliveryMode,
+                       //[Values(DestinationType.Queue, DestinationType.Topic)]
+                       DestinationType destinationType, string testDestinationRef)
+               {
+                       using(IConnection connection = CreateConnection(GetTestClientId()))
+                       {
+                               ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
+                               IDestination destination = GetClearDestination(session, destinationType, testDestinationRef);
+                               IMessageConsumer consumer = session.CreateConsumer(destination);
+
+                               // Send the messages
+                               SendMessages(session, destination, deliveryMode, 1);
+
+                               // Start the connection after the message was sent.
+                               connection.Start();
+
+                               // Make sure only 1 message was delivered.
+                               Assert.IsNotNull(consumer.Receive(TimeSpan.FromMilliseconds(1000)));
+                               Assert.IsNull(consumer.ReceiveNoWait());
+                       }
+               }
+
+               /// <summary>
+               /// Tests if the consumer receives the messages that were sent before the
+               /// connection was started.
+               /// </summary>
+               /// <param name="testTopicRef">Reference passed to GetClearDestination for the test topic.</param>
+               //[Test]
+               public virtual void TestStoppedConsumerHoldsMessagesTillStarted(string testTopicRef)
+               {
+                       ISession startedSession = startedConnection.CreateSession(AcknowledgementMode.AutoAcknowledge);
+                       ISession stoppedSession = stoppedConnection.CreateSession(AcknowledgementMode.AutoAcknowledge);
+
+                       // Setup the consumers.
+                       IDestination topic = GetClearDestination(startedSession, DestinationType.Topic, testTopicRef);
+                       IMessageConsumer startedConsumer = startedSession.CreateConsumer(topic);
+                       IMessageConsumer stoppedConsumer = stoppedSession.CreateConsumer(topic);
+
+                       // Send the message.
+                       IMessageProducer producer = startedSession.CreateProducer(topic);
+                       ITextMessage message = startedSession.CreateTextMessage("Hello");
+                       producer.Send(message);
+
+                       // The consumer on the started connection gets the message right away.
+                       IMessage m = startedConsumer.Receive(TimeSpan.FromMilliseconds(1000));
+                       Assert.IsNotNull(m);
+
+                       // The consumer on the stopped connection must not see it yet.
+                       m = stoppedConsumer.Receive(TimeSpan.FromMilliseconds(1000));
+                       Assert.IsNull(m);
+
+                       // Once its connection starts, the held message is delivered.
+                       stoppedConnection.Start();
+                       m = stoppedConsumer.Receive(TimeSpan.FromMilliseconds(5000));
+                       Assert.IsNotNull(m);
+
+                       startedSession.Close();
+                       stoppedSession.Close();
+               }
+
+               /// <summary>
+               /// Tests if the consumer is able to receive messages even when the
+               /// connection restarts multiple times.
+               /// </summary>
+               /// <param name="testTopicRef">Reference passed on to TestStoppedConsumerHoldsMessagesTillStarted.</param>
+               //[Test]
+               public virtual void TestMultipleConnectionStops(string testTopicRef)
+               {
+                       // Three rounds in total; the connection is stopped again between rounds
+                       // because each round leaves it started.
+                       for(int round = 0; round < 3; round++)
+                       {
+                               if(round > 0)
+                               {
+                                       stoppedConnection.Stop();
+                               }
+
+                               TestStoppedConsumerHoldsMessagesTillStarted(testTopicRef);
+                       }
+               }
+       }
+}

Added: 
activemq/activemq-dotnet/Apache.NMS.XMS/trunk/src/test/csharp/ConsumerTest.cs
URL: 
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.XMS/trunk/src/test/csharp/ConsumerTest.cs?rev=1723221&view=auto
==============================================================================
--- 
activemq/activemq-dotnet/Apache.NMS.XMS/trunk/src/test/csharp/ConsumerTest.cs 
(added)
+++ 
activemq/activemq-dotnet/Apache.NMS.XMS/trunk/src/test/csharp/ConsumerTest.cs 
Wed Jan  6 02:19:56 2016
@@ -0,0 +1,573 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using System;
+using System.Threading;
+using NUnit.Framework;
+
+namespace Apache.NMS.Test
+{
+       //[TestFixture]
+       public class ConsumerTest : NMSTest
+       {
+               protected const int COUNT = 25;
+               protected const string VALUE_NAME = "value";
+
+               private bool dontAck;
+
+               /// <summary>
+               /// Creates the fixture and hands the test support object to the base class.
+               /// </summary>
+               /// <param name="testSupport">Test support object forwarded to the base constructor.</param>
+               protected ConsumerTest(NMSTestSupport testSupport) : base(testSupport)
+               {
+               }
+
+// The .NET CF does not have the ability to interrupt threads, so this test is impossible.
+#if !NETCF
+               /// <summary>
+               /// Checks that IMessageConsumer.Receive() without a timeout blocks on an
+               /// empty temporary queue: a helper thread calls Receive() and must still
+               /// be running after three seconds, at which point it is interrupted.
+               /// </summary>
+               /// <param name="ackMode">Acknowledgement mode used to create the session.</param>
+               //[Test]
+               public virtual void TestNoTimeoutConsumer(
+                       //[Values(AcknowledgementMode.AutoAcknowledge, AcknowledgementMode.ClientAcknowledge,
+                       //      AcknowledgementMode.DupsOkAcknowledge, AcknowledgementMode.Transactional)]
+                       AcknowledgementMode ackMode)
+               {
+                       // Launch a thread to perform IMessageConsumer.Receive().
+                       // If it doesn't fail in less than three seconds, no exception was thrown.
+                       Thread receiveThread = new Thread(new ThreadStart(TimeoutConsumerThreadProc));
+                       using(IConnection connection = CreateConnection())
+                       {
+                               connection.Start();
+                               using(ISession session = connection.CreateSession(ackMode))
+                               {
+                                       ITemporaryQueue queue = session.CreateTemporaryQueue();
+                                       using(this.timeoutConsumer = session.CreateConsumer(queue))
+                                       {
+                                               receiveThread.Start();
+                                               if(receiveThread.Join(3000))
+                                               {
+                                                       Assert.Fail("IMessageConsumer.Receive() returned without blocking.  Test failed.");
+                                               }
+                                               else
+                                               {
+                                                       // Kill the thread - otherwise it'll sit in Receive() until a message arrives.
+                                                       receiveThread.Interrupt();
+                                               }
+                                       }
+                               }
+                       }
+               }
+
+               protected IMessageConsumer timeoutConsumer;
+
+               /// <summary>
+               /// Thread body used by the blocking-receive tests: calls Receive() on
+               /// <c>timeoutConsumer</c> with no timeout. Being interrupted while blocked
+               /// is the expected outcome; any other exception is reported as a failure.
+               /// </summary>
+               protected void TimeoutConsumerThreadProc()
+               {
+                       try
+                       {
+                               timeoutConsumer.Receive();
+                       }
+                       catch(ThreadInterruptedException)
+                       {
+                               // The test succeeded!  We were still blocked when we were interrupted.
+                       }
+                       catch(Exception e)
+                       {
+                               // Any other exception (including ArgumentOutOfRangeException) means the test failed.
+                               Assert.Fail("Test failed with exception: " + e.Message);
+                       }
+               }
+
+               /// <summary>
+               /// Checks that closing a consumer releases a thread blocked in
+               /// IMessageConsumer.Receive(): the helper thread must still be blocked
+               /// after three seconds and must finish within ten seconds of Close().
+               /// </summary>
+               /// <param name="ackMode">Acknowledgement mode used to create the session.</param>
+               //[Test]
+               public virtual void TestSyncReceiveConsumerClose(
+                       //[Values(AcknowledgementMode.AutoAcknowledge, AcknowledgementMode.ClientAcknowledge,
+                       //      AcknowledgementMode.DupsOkAcknowledge, AcknowledgementMode.Transactional)]
+                       AcknowledgementMode ackMode)
+               {
+                       // Launch a thread to perform IMessageConsumer.Receive().
+                       // If it doesn't fail in less than three seconds, no exception was thrown.
+                       Thread receiveThread = new Thread(new ThreadStart(TimeoutConsumerThreadProc));
+                       using(IConnection connection = CreateConnection())
+                       {
+                               connection.Start();
+                               using(ISession session = connection.CreateSession(ackMode))
+                               {
+                                       ITemporaryQueue queue = session.CreateTemporaryQueue();
+                                       using(this.timeoutConsumer = session.CreateConsumer(queue))
+                                       {
+                                               receiveThread.Start();
+                                               if(receiveThread.Join(3000))
+                                               {
+                                                       Assert.Fail("IMessageConsumer.Receive() returned without blocking.  Test failed.");
+                                               }
+                                               else
+                                               {
+                                                       // Closing the consumer is expected to release the blocked Receive().
+                                                       this.timeoutConsumer.Close();
+                                                       receiveThread.Join(10000);
+                                                       if(receiveThread.IsAlive)
+                                                       {
+                                                               // Kill the thread - otherwise it'll sit in Receive() until a message arrives.
+                                                               receiveThread.Interrupt();
+                                                               Assert.Fail("IMessageConsumer.Receive() thread is still alive, Close should have killed it.");
+                                                       }
+                                               }
+                                       }
+                               }
+                       }
+               }
+
+               /// <summary>
+               /// Bundle of objects handed to a worker thread as its single argument.
+               /// </summary>
+               internal class ThreadArg
+               {
+                       // Connection the worker thread creates its own session on.
+                       internal IConnection connection = null;
+
+                       // Session the worker thread uses to create the message it sends.
+                       internal ISession session = null;
+
+                       // Destination the worker thread sends to.
+                       internal IDestination destination = null;
+               }
+
+               /// <summary>
+               /// Thread body that waits five seconds and then sends a "Hello World"
+               /// text message to the destination described by the argument. Any
+               /// exception is reported as a test failure.
+               /// </summary>
+               /// <param name="arg">A <see cref="ThreadArg"/> carrying the connection, session and destination.</param>
+               protected void DelayedProducerThreadProc(Object arg)
+               {
+                       try
+                       {
+                               ThreadArg args = arg as ThreadArg;
+
+                               using(ISession session = args.connection.CreateSession())
+                               using(IMessageProducer producer = session.CreateProducer(args.destination))
+                               {
+                                       // Give the consumer time to enter the receive.
+                                       Thread.Sleep(5000);
+
+                                       // The message is created from the session supplied in the argument,
+                                       // not from the session opened above.
+                                       producer.Send(args.session.CreateTextMessage("Hello World"));
+                               }
+                       }
+                       catch(Exception e)
+                       {
+                               // Some other exception occurred.
+                               Assert.Fail("Test failed with exception: " + e.Message);
+                       }
+               }
+
+               //[Test]
+               // Sends COUNT text messages by re-using a single message instance, optionally
+               // clearing its body and properties after every send, and verifies that each
+               // message is received with the text and VALUE_NAME property it carried when
+               // it was sent.
+               //
+               // ackMode - acknowledgement mode of the session; a Transactional session is
+               //           committed once after sending and once after receiving.
+               // doClear - when true, ClearBody() and ClearProperties() are called on the
+               //           shared message after each send.
+               public virtual void TestDoChangeSentMessage(
+                       //[Values(AcknowledgementMode.AutoAcknowledge, AcknowledgementMode.ClientAcknowledge,
+                       //      AcknowledgementMode.DupsOkAcknowledge, AcknowledgementMode.Transactional)]
+                       AcknowledgementMode ackMode,
+                       //[Values(true, false)]
+                       bool doClear)
+               {
+                       using(IConnection connection = CreateConnection())
+                       {
+                               connection.Start();
+                               using(ISession session = connection.CreateSession(ackMode))
+                               {
+                                       ITemporaryQueue queue = session.CreateTemporaryQueue();
+                                       using(IMessageConsumer consumer = session.CreateConsumer(queue))
+                                       using(IMessageProducer producer = session.CreateProducer(queue))
+                                       {
+                                               ITextMessage message = session.CreateTextMessage();
+
+                                               string prefix = "ConsumerTest - TestDoChangeSentMessage: ";
+
+                                               for(int i = 0; i < COUNT; i++)
+                                               {
+                                                       message.Properties[VALUE_NAME] = i;
+                                                       message.Text = prefix + Convert.ToString(i);
+
+                                                       producer.Send(message);
+
+                                                       if(doClear)
+                                                       {
+                                                               message.ClearBody();
+                                                               message.ClearProperties();
+                                                       }
+                                               }
+
+                                               if(ackMode == AcknowledgementMode.Transactional)
+                                               {
+                                                       session.Commit();
+                                               }
+
+                                               for(int i = 0; i < COUNT; i++)
+                                               {
+                                                       ITextMessage msg = consumer.Receive(TimeSpan.FromMilliseconds(2000)) as ITextMessage;
+
+                                                       // Fail with a clear assertion instead of a NullReferenceException
+                                                       // when the receive times out or yields a non-text message.
+                                                       Assert.IsNotNull(msg, "Message " + Convert.ToString(i) + " was not received as a text message.");
+
+                                                       // NUnit convention: expected value first, actual value second.
+                                                       Assert.AreEqual(prefix + Convert.ToString(i), msg.Text);
+                                                       Assert.AreEqual(i, msg.Properties.GetInt(VALUE_NAME));
+                                               }
+
+                                               if(ackMode == AcknowledgementMode.Transactional)
+                                               {
+                                                       session.Commit();
+                                               }
+                                       }
+                               }
+                       }
+               }
+
+               //[Test]
+               // Verifies that a consumer already blocked in Receive() gets a message that
+               // is sent later: DelayedProducerThreadProc runs on a second thread and sends
+               // after a delay while this thread waits up to one minute in Receive().
+               //
+               // ackMode - acknowledgement mode used for the consuming session.
+               public virtual void TestConsumerReceiveBeforeMessageDispatched(
+                       //[Values(AcknowledgementMode.AutoAcknowledge, AcknowledgementMode.ClientAcknowledge,
+                       //      AcknowledgementMode.DupsOkAcknowledge, AcknowledgementMode.Transactional)]
+                       AcknowledgementMode ackMode)
+               {
+                       // Launch a thread to perform a delayed send.
+                       Thread sendThread = new Thread(DelayedProducerThreadProc);
+                       using(IConnection connection = CreateConnection())
+                       {
+                               connection.Start();
+                               using(ISession session = connection.CreateSession(ackMode))
+                               {
+                                       ITemporaryQueue queue = session.CreateTemporaryQueue();
+                                       using(IMessageConsumer consumer = session.CreateConsumer(queue))
+                                       {
+                                               ThreadArg arg = new ThreadArg();
+
+                                               arg.connection = connection;
+                                               arg.session = session;
+                                               arg.destination = queue;
+
+                                               sendThread.Start(arg);
+                                               try
+                                               {
+                                                       IMessage message = consumer.Receive(TimeSpan.FromMinutes(1));
+                                                       Assert.IsNotNull(message);
+                                               }
+                                               finally
+                                               {
+                                                       // Let the sender finish closing its own producer and session
+                                                       // before the consumer, session and connection it borrows are
+                                                       // disposed by the enclosing using blocks. The wait is bounded
+                                                       // so a stuck sender cannot hang the test run.
+                                                       sendThread.Join(TimeSpan.FromSeconds(30));
+                                               }
+                                       }
+                               }
+                       }
+               }
+
+               //[Test]
+               // Verifies that a consumer on a connection that was never started receives
+               // nothing: one message is sent and a one second Receive() must return null.
+               //
+               // deliveryMode       - delivery mode handed to SendMessages.
+               // destinationType    - kind of destination to obtain from GetClearDestination.
+               // testDestinationRef - destination reference handed to GetClearDestination.
+               public virtual void TestDontStart(
+                       //[Values(MsgDeliveryMode.NonPersistent)]
+                       MsgDeliveryMode deliveryMode,
+                       //[Values(DestinationType.Queue, DestinationType.Topic)]
+                       DestinationType destinationType, string testDestinationRef)
+               {
+                       using(IConnection unstartedConnection = CreateConnection())
+                       {
+                               // Deliberately no Start() call on the connection.
+                               ISession idleSession = unstartedConnection.CreateSession();
+                               IDestination target = GetClearDestination(idleSession, destinationType, testDestinationRef);
+                               IMessageConsumer idleConsumer = idleSession.CreateConsumer(target);
+
+                               // Put a single message on the destination.
+                               SendMessages(idleSession, target, deliveryMode, 1);
+
+                               // Nothing may be handed to the consumer.
+                               IMessage unexpected = idleConsumer.Receive(TimeSpan.FromMilliseconds(1000));
+                               Assert.IsNull(unexpected);
+                       }
+               }
+
+               //[Test]
+               // Exercises commit and rollback on one transacted session that both sends
+               // and receives: nothing is delivered before the send is committed, exactly
+               // one message arrives afterwards, a rollback redelivers it flagged as
+               // redelivered, and a final commit stops any further delivery.
+               //
+               // deliveryMode       - delivery mode assigned to the producer.
+               // destinationType    - kind of destination to obtain from GetClearDestination.
+               // testDestinationRef - destination reference handed to GetClearDestination.
+               public void TestSendReceiveTransacted(
+                       //[Values(MsgDeliveryMode.NonPersistent, MsgDeliveryMode.Persistent)]
+                       MsgDeliveryMode deliveryMode,
+                       //[Values(DestinationType.Queue, DestinationType.Topic, DestinationType.TemporaryQueue, DestinationType.TemporaryTopic)]
+                       DestinationType destinationType, string testDestinationRef)
+               {
+                       using(IConnection connection = CreateConnection())
+                       {
+                               connection.Start();
+
+                               // Producer and consumer share a single transacted session.
+                               ISession txSession = connection.CreateSession(AcknowledgementMode.Transactional);
+                               IDestination target = GetClearDestination(txSession, destinationType, testDestinationRef);
+                               IMessageConsumer receiver = txSession.CreateConsumer(target);
+                               IMessageProducer sender = txSession.CreateProducer(target);
+
+                               sender.DeliveryMode = deliveryMode;
+                               ITextMessage outgoing = txSession.CreateTextMessage("Test");
+                               sender.Send(outgoing);
+
+                               // The message must not be delivered until the send is committed.
+                               Thread.Sleep(1000);
+                               IMessage premature = receiver.ReceiveNoWait();
+                               Assert.IsNull(premature);
+                               txSession.Commit();
+
+                               // Exactly one, not yet redelivered, message is expected now.
+                               IMessage delivered = receiver.Receive(TimeSpan.FromMilliseconds(1000));
+                               Assert.IsNotNull(delivered);
+                               Assert.IsFalse(delivered.NMSRedelivered);
+                               Assert.IsNull(receiver.ReceiveNoWait());
+
+                               // The message should be redelivered if rollback is used.
+                               txSession.Rollback();
+
+                               // Again exactly one message, this time flagged as redelivered.
+                               delivered = receiver.Receive(TimeSpan.FromMilliseconds(2000));
+                               Assert.IsNotNull(delivered);
+                               Assert.IsTrue(delivered.NMSRedelivered);
+                               Assert.IsNull(receiver.ReceiveNoWait());
+
+                               // Once the receive is committed the message must not come back.
+                               txSession.Commit();
+                               Thread.Sleep(1000);
+                               Assert.IsNull(receiver.ReceiveNoWait());
+                       }
+               }
+
+               //[Test]
+               // Verifies that a message acknowledged in a ClientAcknowledge session is
+               // not delivered again to a new session created after the first is closed.
+               //
+               // testQueueRef - queue reference handed to GetClearDestination.
+               public virtual void TestAckedMessageAreConsumed(string testQueueRef)
+               {
+                       using(IConnection connection = CreateConnection())
+                       {
+                               connection.Start();
+
+                               ISession firstSession = connection.CreateSession(AcknowledgementMode.ClientAcknowledge);
+                               IDestination queue = GetClearDestination(firstSession, DestinationType.Queue, testQueueRef);
+                               IMessageProducer sender = firstSession.CreateProducer(queue);
+                               ITextMessage hello = firstSession.CreateTextMessage("Hello");
+                               sender.Send(hello);
+
+                               // Receive the message and acknowledge it.
+                               IMessageConsumer firstConsumer = firstSession.CreateConsumer(queue);
+                               IMessage received = firstConsumer.Receive(TimeSpan.FromMilliseconds(1000));
+                               Assert.IsNotNull(received);
+                               received.Acknowledge();
+
+                               // Replace the session with a fresh one.
+                               firstSession.Close();
+                               ISession secondSession = connection.CreateSession(AcknowledgementMode.ClientAcknowledge);
+
+                               // The acknowledged message must not show up again.
+                               IMessageConsumer secondConsumer = secondSession.CreateConsumer(queue);
+                               IMessage leftover = secondConsumer.Receive(TimeSpan.FromMilliseconds(1000));
+                               Assert.IsNull(leftover);
+
+                               secondSession.Close();
+                       }
+               }
+
+               //[Test]
+               // Sends three messages, receives all three in a ClientAcknowledge session
+               // and acknowledges only the last one, then verifies that a new session
+               // receives nothing from the queue.
+               //
+               // testQueueRef - queue reference handed to GetClearDestination.
+               public virtual void TestLastMessageAcked(string testQueueRef)
+               {
+                       using(IConnection connection = CreateConnection())
+                       {
+                               connection.Start();
+
+                               ISession firstSession = connection.CreateSession(AcknowledgementMode.ClientAcknowledge);
+                               IDestination queue = GetClearDestination(firstSession, DestinationType.Queue, testQueueRef);
+                               IMessageProducer sender = firstSession.CreateProducer(queue);
+
+                               string[] bodies = new string[] { "Hello", "Hello2", "Hello3" };
+                               foreach(string body in bodies)
+                               {
+                                       sender.Send(firstSession.CreateTextMessage(body));
+                               }
+
+                               // Receive every message; only the last one is acknowledged.
+                               IMessageConsumer firstConsumer = firstSession.CreateConsumer(queue);
+                               IMessage lastReceived = null;
+                               for(int received = 0; received < bodies.Length; received++)
+                               {
+                                       lastReceived = firstConsumer.Receive(TimeSpan.FromMilliseconds(1000));
+                                       Assert.IsNotNull(lastReceived);
+                               }
+                               lastReceived.Acknowledge();
+
+                               // Replace the session with a fresh one.
+                               firstSession.Close();
+                               ISession secondSession = connection.CreateSession(AcknowledgementMode.ClientAcknowledge);
+
+                               // None of the three messages may be delivered again.
+                               IMessageConsumer secondConsumer = secondSession.CreateConsumer(queue);
+                               IMessage leftover = secondConsumer.Receive(TimeSpan.FromMilliseconds(1000));
+                               Assert.IsNull(leftover);
+
+                               secondSession.Close();
+                       }
+               }
+
+               //[Test]
+               // Verifies that a message received but not acknowledged in a
+               // ClientAcknowledge session is delivered again to a new session created
+               // after the first one is closed.
+               //
+               // testQueueRef - queue reference handed to GetClearDestination.
+               public virtual void TestUnAckedMessageAreNotConsumedOnSessionClose(string testQueueRef)
+               {
+                       using(IConnection connection = CreateConnection())
+                       {
+                               connection.Start();
+
+                               ISession firstSession = connection.CreateSession(AcknowledgementMode.ClientAcknowledge);
+                               IDestination queue = GetClearDestination(firstSession, DestinationType.Queue, testQueueRef);
+                               IMessageProducer sender = firstSession.CreateProducer(queue);
+                               ITextMessage hello = firstSession.CreateTextMessage("Hello");
+                               sender.Send(hello);
+
+                               // Receive the message but intentionally leave it unacknowledged.
+                               IMessageConsumer firstConsumer = firstSession.CreateConsumer(queue);
+                               IMessage unacked = firstConsumer.Receive(TimeSpan.FromMilliseconds(1000));
+                               Assert.IsNotNull(unacked);
+
+                               // Closing the session should cause the unacknowledged message to be re-delivered.
+                               firstSession.Close();
+                               ISession secondSession = connection.CreateSession(AcknowledgementMode.ClientAcknowledge);
+
+                               // The message must arrive again; this time it is acknowledged.
+                               IMessageConsumer secondConsumer = secondSession.CreateConsumer(queue);
+                               IMessage redelivered = secondConsumer.Receive(TimeSpan.FromMilliseconds(2000));
+                               Assert.IsNotNull(redelivered);
+                               redelivered.Acknowledge();
+
+                               secondSession.Close();
+                       }
+               }
+
+               //[Test]
+               // Verifies that a message handled by the asynchronous OnMessage listener in
+               // a ClientAcknowledge session is not delivered again to a new session
+               // created after the first one is closed.
+               //
+               // testQueueRef - queue reference handed to GetClearDestination.
+               public virtual void TestAsyncAckedMessageAreConsumed(string testQueueRef)
+               {
+                       using(IConnection connection = CreateConnection())
+                       {
+                               connection.Start();
+
+                               ISession firstSession = connection.CreateSession(AcknowledgementMode.ClientAcknowledge);
+                               IDestination queue = GetClearDestination(firstSession, DestinationType.Queue, testQueueRef);
+                               IMessageProducer sender = firstSession.CreateProducer(queue);
+                               ITextMessage hello = firstSession.CreateTextMessage("Hello");
+                               sender.Send(hello);
+
+                               // Hand the message to the OnMessage listener.
+                               IMessageConsumer asyncConsumer = firstSession.CreateConsumer(queue);
+                               asyncConsumer.Listener += OnMessage;
+
+                               // Leave time for the listener to be invoked.
+                               Thread.Sleep(5000);
+
+                               // Replace the session with a fresh one.
+                               firstSession.Close();
+                               ISession secondSession = connection.CreateSession(AcknowledgementMode.ClientAcknowledge);
+
+                               // Nothing may be left on the queue for a synchronous receive.
+                               IMessageConsumer syncConsumer = secondSession.CreateConsumer(queue);
+                               IMessage leftover = syncConsumer.Receive(TimeSpan.FromMilliseconds(1000));
+                               Assert.IsNull(leftover);
+
+                               secondSession.Close();
+                       }
+               }
+
+               //[Test]
+               // Verifies that a message which the asynchronous OnMessage listener does not
+               // acknowledge (dontAck is set) is still available to a new ClientAcknowledge
+               // session after the first session has been closed.
+               //
+               // testQueueRef - queue reference handed to GetClearDestination.
+               public virtual void TestAsyncUnAckedMessageAreNotConsumedOnSessionClose(string testQueueRef)
+               {
+                       // Remember the flag so it can be restored afterwards; otherwise
+                       // OnMessage would keep skipping acknowledgement for any later use
+                       // on this fixture instance.
+                       bool previousDontAck = dontAck;
+
+                       try
+                       {
+                               using(IConnection connection = CreateConnection())
+                               {
+                                       connection.Start();
+                                       // Don't acknowledge the message in the OnMessage() callback.
+                                       dontAck = true;
+                                       ISession session = connection.CreateSession(AcknowledgementMode.ClientAcknowledge);
+                                       IDestination destination = GetClearDestination(session, DestinationType.Queue, testQueueRef);
+                                       IMessageProducer producer = session.CreateProducer(destination);
+                                       producer.Send(session.CreateTextMessage("Hello"));
+
+                                       // Consume the message...
+                                       using(IMessageConsumer consumer = session.CreateConsumer(destination))
+                                       {
+                                               consumer.Listener += new MessageListener(OnMessage);
+                                               // Don't ack the message.
+                                       }
+
+                                       // Reset the session. This should cause the unacked message to be
+                                       // redelivered.
+                                       session.Close();
+
+                                       Thread.Sleep(5000);
+                                       session = connection.CreateSession(AcknowledgementMode.ClientAcknowledge);
+                                       // Attempt to consume the message...
+                                       using(IMessageConsumer consumer = session.CreateConsumer(destination))
+                                       {
+                                               IMessage msg = consumer.Receive(TimeSpan.FromMilliseconds(2000));
+                                               Assert.IsNotNull(msg);
+                                               msg.Acknowledge();
+                                       }
+
+                                       session.Close();
+                               }
+                       }
+                       finally
+                       {
+                               dontAck = previousDontAck;
+                       }
+               }
+
+               //[Test]
+               // Checks that the OnMessage listener can be attached to a consumer, detached
+               // and attached again before the consumer is closed.
+               public virtual void TestAddRemoveAsnycMessageListener()
+               {
+                       using(IConnection connection = CreateConnection())
+                       {
+                               connection.Start();
+
+                               ISession ackSession = connection.CreateSession(AcknowledgementMode.ClientAcknowledge);
+                               ITemporaryTopic tempTopic = ackSession.CreateTemporaryTopic();
+                               IMessageConsumer topicConsumer = ackSession.CreateConsumer(tempTopic);
+
+                               // Attach, detach, then attach once more.
+                               topicConsumer.Listener += OnMessage;
+                               topicConsumer.Listener -= OnMessage;
+                               topicConsumer.Listener += OnMessage;
+
+                               topicConsumer.Close();
+                       }
+               }
+
+               // Message listener shared by the asynchronous tests: asserts that the
+               // message is not null and, unless dontAck is set, acknowledges it.
+               //
+               // message - the message passed to the listener.
+               public void OnMessage(IMessage message)
+               {
+                       Assert.IsNotNull(message);
+
+                       if(dontAck)
+                       {
+                               // The current test wants the message left unacknowledged.
+                               return;
+                       }
+
+                       try
+                       {
+                               message.Acknowledge();
+                       }
+                       catch(Exception)
+                       {
+                               // Any exception raised by Acknowledge() is ignored here.
+                       }
+               }
+
+               //[Test]
+               // Sends one text message to a temporary queue and verifies that polling
+               // with ReceiveNoWait() (up to RETRIES attempts, 100 ms apart) eventually
+               // returns it.
+               //
+               // ackMode      - acknowledgement mode of the session; a Transactional session
+               //                is committed after the send and after the receive.
+               // deliveryMode - delivery mode assigned to the producer.
+               public virtual void TestReceiveNoWait(
+                       //[Values(AcknowledgementMode.AutoAcknowledge, AcknowledgementMode.ClientAcknowledge,
+                       //      AcknowledgementMode.DupsOkAcknowledge, AcknowledgementMode.Transactional)]
+                       AcknowledgementMode ackMode,
+                       //[Values(MsgDeliveryMode.NonPersistent, MsgDeliveryMode.Persistent)]
+                       MsgDeliveryMode deliveryMode)
+               {
+                       const int RETRIES = 20;
+
+                       using(IConnection connection = CreateConnection())
+                       {
+                               connection.Start();
+                               using(ISession session = connection.CreateSession(ackMode))
+                               {
+                                       IDestination destination = session.CreateTemporaryQueue();
+
+                                       using(IMessageProducer producer = session.CreateProducer(destination))
+                                       {
+                                               producer.DeliveryMode = deliveryMode;
+                                               ITextMessage outgoing = session.CreateTextMessage("TEST");
+                                               producer.Send(outgoing);
+
+                                               if(AcknowledgementMode.Transactional == ackMode)
+                                               {
+                                                       session.Commit();
+                                               }
+                                       }
+
+                                       using(IMessageConsumer consumer = session.CreateConsumer(destination))
+                                       {
+                                               IMessage message = null;
+
+                                               for(int attempt = 0; attempt < RETRIES; ++attempt)
+                                               {
+                                                       message = consumer.ReceiveNoWait();
+                                                       if(message != null)
+                                                       {
+                                                               // Got it: no need to pause before leaving the loop.
+                                                               break;
+                                                       }
+
+                                                       // Nothing available yet; wait briefly before polling again.
+                                                       Thread.Sleep(100);
+                                               }
+
+                                               Assert.IsNotNull(message);
+                                               message.Acknowledge();
+
+                                               if(AcknowledgementMode.Transactional == ackMode)
+                                               {
+                                                       session.Commit();
+                                               }
+                                       }
+                               }
+                       }
+               }
+
+#endif
+
+    }
+}

Added: 
activemq/activemq-dotnet/Apache.NMS.XMS/trunk/src/test/csharp/DurableTest.cs
URL: 
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.XMS/trunk/src/test/csharp/DurableTest.cs?rev=1723221&view=auto
==============================================================================
--- 
activemq/activemq-dotnet/Apache.NMS.XMS/trunk/src/test/csharp/DurableTest.cs 
(added)
+++ 
activemq/activemq-dotnet/Apache.NMS.XMS/trunk/src/test/csharp/DurableTest.cs 
Wed Jan  6 02:19:56 2016
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using System;
+using System.Threading;
+using Apache.NMS.Util;
+using NUnit.Framework;
+
+namespace Apache.NMS.Test
+{
+       //[TestFixture]
+       public class DurableTest : NMSTest
+       {
+               protected static string DURABLE_SELECTOR = "2 > 1"; // NOTE(review): presumably an always-true message selector; not referenced by the tests visible here - confirm usage.
+
+               protected string TEST_CLIENT_AND_CONSUMER_ID; // Assigned in SetUp(); passed to CreateConnection and as the second argument of CreateDurableConsumer.
+               protected string SEND_CLIENT_ID; // Assigned in SetUp() from GetTestClientId().
+
+               // Creates the test, forwarding the supplied NMSTestSupport to the NMSTest base class.
+               protected DurableTest(NMSTestSupport testSupport) : base(testSupport)
+               {
+               }
+
+               //[SetUp]
+               // Runs the base class set-up, then obtains the two client identifiers used
+               // by the tests from GetTestClientId().
+               public override void SetUp()
+               {
+                       base.SetUp();
+
+                       // The consumer-side identifier is requested first, the sender-side one second.
+                       this.TEST_CLIENT_AND_CONSUMER_ID = GetTestClientId();
+                       this.SEND_CLIENT_ID = GetTestClientId();
+               }
+
+               //[Test]
+               // Creates a durable consumer and disposes it, sends a persistent message
+               // while it is disposed, re-creates the durable consumer under the same
+               // name and verifies that the message is received with the expected text,
+               // NMSType and "test" property. The durable consumer is unregistered in
+               // the finally block whatever the outcome.
+               //
+               // ackMode      - acknowledgement mode of both sessions; Transactional sessions
+               //                are committed after the send and after the receive.
+               // testTopicRef - topic reference handed to GetClearDestination.
+               public virtual void TestSendWhileClosed(
+                       //[Values(AcknowledgementMode.AutoAcknowledge, AcknowledgementMode.ClientAcknowledge,
+                       //      AcknowledgementMode.DupsOkAcknowledge, AcknowledgementMode.Transactional)]
+                       AcknowledgementMode ackMode, string testTopicRef)
+               {
+                       try
+                       {
+                               using(IConnection connection = CreateConnection(TEST_CLIENT_AND_CONSUMER_ID))
+                               {
+                                       connection.Start();
+
+                                       using(ISession session = connection.CreateSession(ackMode))
+                                       {
+                                               ITopic topic = (ITopic)GetClearDestination(session, DestinationType.Topic, testTopicRef);
+                                               IMessageProducer producer = session.CreateProducer(topic);
+
+                                               producer.DeliveryMode = MsgDeliveryMode.Persistent;
+
+                                               // Create the durable consumer, then dispose it before sending.
+                                               ISession consumeSession = connection.CreateSession(ackMode);
+                                               IMessageConsumer consumer = consumeSession.CreateDurableConsumer(topic, TEST_CLIENT_AND_CONSUMER_ID, null, false);
+                                               Thread.Sleep(1000);
+                                               consumer.Dispose();
+                                               consumer = null;
+
+                                               ITextMessage message = session.CreateTextMessage("DurableTest-TestSendWhileClosed");
+                                               message.Properties.SetString("test", "test");
+                                               message.NMSType = "test";
+                                               producer.Send(message);
+                                               if(AcknowledgementMode.Transactional == ackMode)
+                                               {
+                                                       session.Commit();
+                                               }
+
+                                               Thread.Sleep(1000);
+                                               consumer = consumeSession.CreateDurableConsumer(topic, TEST_CLIENT_AND_CONSUMER_ID, null, false);
+                                               ITextMessage msg = consumer.Receive(TimeSpan.FromMilliseconds(1000)) as ITextMessage;
+
+                                               // Check for a missing message before touching it, so that a timeout
+                                               // is reported as an assertion failure and not as a
+                                               // NullReferenceException message.
+                                               Assert.IsNotNull(msg, "Message sent while the durable consumer was disposed was not received.");
+                                               msg.Acknowledge();
+                                               if(AcknowledgementMode.Transactional == ackMode)
+                                               {
+                                                       consumeSession.Commit();
+                                               }
+
+                                               // NUnit convention: expected value first, actual value second.
+                                               Assert.AreEqual("DurableTest-TestSendWhileClosed", msg.Text);
+                                               Assert.AreEqual("test", msg.NMSType);
+                                               Assert.AreEqual("test", msg.Properties.GetString("test"));
+                                       }
+                               }
+                       }
+                       catch(Exception ex)
+                       {
+                               Assert.Fail(ex.Message);
+                       }
+                       finally
+                       {
+                               // Pause to allow Stomp to unregister at the broker.
+                               Thread.Sleep(500);
+
+                               UnregisterDurableConsumer(TEST_CLIENT_AND_CONSUMER_ID, TEST_CLIENT_AND_CONSUMER_ID);
+                       }
+               }
+               
+               //[Test]
+               public void TestDurableConsumerSelectorChange(
+                       //[Values(AcknowledgementMode.AutoAcknowledge, 
AcknowledgementMode.ClientAcknowledge,
+                       //      AcknowledgementMode.DupsOkAcknowledge, 
AcknowledgementMode.Transactional)]
+                       AcknowledgementMode ackMode, string testTopicRef)
+               {
+                       try
+                       {
+                               using(IConnection connection = 
CreateConnection(TEST_CLIENT_AND_CONSUMER_ID))
+                               {
+                                       connection.Start();
+                                       using(ISession session = 
connection.CreateSession(ackMode))
+                                       {
+                                               ITopic topic = 
(ITopic)GetClearDestination(session, DestinationType.Topic, testTopicRef);
+                                               IMessageProducer producer = 
session.CreateProducer(topic);
+                                               IMessageConsumer consumer = 
session.CreateDurableConsumer(topic, TEST_CLIENT_AND_CONSUMER_ID, 
"color='red'", false);
+
+                                               producer.DeliveryMode = 
MsgDeliveryMode.Persistent;
+
+                                               // Send the messages
+                                               ITextMessage sendMessage = 
session.CreateTextMessage("1st");
+                                               sendMessage.Properties["color"] 
= "red";
+                                               producer.Send(sendMessage);
+                                               
if(AcknowledgementMode.Transactional == ackMode)
+                                               {
+                                                       session.Commit();
+                                               }
+
+                                               ITextMessage receiveMsg = 
consumer.Receive(receiveTimeout) as ITextMessage;
+                                               Assert.IsNotNull(receiveMsg, 
"Failed to retrieve 1st durable message.");
+                                               Assert.AreEqual("1st", 
receiveMsg.Text);
+                                               
Assert.AreEqual(MsgDeliveryMode.Persistent, receiveMsg.NMSDeliveryMode, 
"NMSDeliveryMode does not match");
+                                               receiveMsg.Acknowledge();
+                                               
if(AcknowledgementMode.Transactional == ackMode)
+                                               {
+                                                       session.Commit();
+                                               }
+
+                                               // Change the subscription, 
allowing some time for the Broker to purge the
+                                               // consumers resources.
+                                               consumer.Dispose();
+                        Thread.Sleep(1000);
+                                               
+                                               consumer = 
session.CreateDurableConsumer(topic, TEST_CLIENT_AND_CONSUMER_ID, 
"color='blue'", false);
+
+                                               sendMessage = 
session.CreateTextMessage("2nd");
+                                               sendMessage.Properties["color"] 
= "red";
+                                               producer.Send(sendMessage);
+                                               sendMessage = 
session.CreateTextMessage("3rd");
+                                               sendMessage.Properties["color"] 
= "blue";
+                                               producer.Send(sendMessage);
+                                               
if(AcknowledgementMode.Transactional == ackMode)
+                                               {
+                                                       session.Commit();
+                                               }
+
+                                               // Selector should skip the 2nd 
message.
+                                               receiveMsg = 
consumer.Receive(receiveTimeout) as ITextMessage;
+                                               Assert.IsNotNull(receiveMsg, 
"Failed to retrieve durable message.");
+                                               Assert.AreEqual("3rd", 
receiveMsg.Text, "Retrieved the wrong durable message.");
+                                               
Assert.AreEqual(MsgDeliveryMode.Persistent, receiveMsg.NMSDeliveryMode, 
"NMSDeliveryMode does not match");
+                                               receiveMsg.Acknowledge();
+                                               
if(AcknowledgementMode.Transactional == ackMode)
+                                               {
+                                                       session.Commit();
+                                               }
+
+                                               // Make sure there are no 
pending messages.
+                                               
Assert.IsNull(consumer.ReceiveNoWait(), "Wrong number of messages in durable 
subscription.");
+                                       }
+                               }
+                       }
+                       catch(Exception ex)
+                       {
+                               Assert.Fail(ex.Message);
+                       }
+                       finally
+                       {
+                // Pause to allow Stomp to unregister at the broker.
+                Thread.Sleep(500);
+
+                               
UnregisterDurableConsumer(TEST_CLIENT_AND_CONSUMER_ID, 
TEST_CLIENT_AND_CONSUMER_ID);
+                       }
+               }
+
+               /// <summary>
+               /// Calls RegisterDurableConsumer for the given topic, runs the durable
+               /// consumer scenario against it, and always calls UnregisterDurableConsumer
+               /// afterwards. In transactional mode the scenario is run twice.
+               /// </summary>
+               /// <param name="ackMode">Acknowledgement mode passed on to the consuming session.</param>
+               /// <param name="testDurableTopicName">Name of the topic used for the durable subscription.</param>
+               //[Test]
+               public void TestDurableConsumer(
+                       //[Values(AcknowledgementMode.AutoAcknowledge, AcknowledgementMode.ClientAcknowledge,
+                       //      AcknowledgementMode.DupsOkAcknowledge, AcknowledgementMode.Transactional)]
+                       AcknowledgementMode ackMode, string testDurableTopicName)
+               {
+                       // A transactional session gets a second pass over the same scenario.
+                       int passCount = (ackMode == AcknowledgementMode.Transactional) ? 2 : 1;
+
+                       try
+                       {
+                               RegisterDurableConsumer(TEST_CLIENT_AND_CONSUMER_ID, testDurableTopicName, TEST_CLIENT_AND_CONSUMER_ID, null, false);
+
+                               for(int pass = 0; pass < passCount; ++pass)
+                               {
+                                       RunTestDurableConsumer(testDurableTopicName, ackMode);
+                               }
+                       }
+                       finally
+                       {
+                               // Pause to allow Stomp to unregister at the broker.
+                               Thread.Sleep(500);
+
+                               UnregisterDurableConsumer(TEST_CLIENT_AND_CONSUMER_ID, TEST_CLIENT_AND_CONSUMER_ID);
+                       }
+               }
+
+               /// <summary>
+               /// Publishes two durable messages to the topic, then opens a durable consumer
+               /// as TEST_CLIENT_AND_CONSUMER_ID and expects to receive and acknowledge both.
+               /// A transactional session is committed after both have been acknowledged.
+               /// </summary>
+               /// <param name="topicName">Name of the topic to publish to and consume from.</param>
+               /// <param name="ackMode">Acknowledgement mode used to create the consuming session.</param>
+               protected void RunTestDurableConsumer(string topicName, AcknowledgementMode ackMode)
+               {
+                       // Failure text for each expected message, in delivery order.
+                       string[] missingMessageErrors =
+                       {
+                               "Did not receive first durable message.",
+                               "Did not receive second durable message."
+                       };
+
+                       // Publish one message per expected delivery before the consumer connects.
+                       for(int sent = 0; sent < missingMessageErrors.Length; ++sent)
+                       {
+                               SendDurableMessage(topicName);
+                       }
+
+                       using(IConnection connection = CreateConnection(TEST_CLIENT_AND_CONSUMER_ID))
+                       {
+                               connection.Start();
+
+                               using(ISession session = connection.CreateSession(ackMode))
+                               using(IMessageConsumer consumer = session.CreateDurableConsumer(SessionUtil.GetTopic(session, topicName), TEST_CLIENT_AND_CONSUMER_ID, null, false))
+                               {
+                                       foreach(string missingMessageError in missingMessageErrors)
+                                       {
+                                               IMessage received = consumer.Receive(receiveTimeout);
+                                               Assert.IsNotNull(received, missingMessageError);
+                                               received.Acknowledge();
+                                       }
+
+                                       if(ackMode == AcknowledgementMode.Transactional)
+                                       {
+                                               session.Commit();
+                                       }
+                               }
+                       }
+               }
+
+               /// <summary>
+               /// Opens a short-lived connection as SEND_CLIENT_ID and publishes a single
+               /// "Durable Hello" text message to the named topic with persistent delivery.
+               /// </summary>
+               /// <param name="topicName">Name of the topic to publish to.</param>
+               protected void SendDurableMessage(string topicName)
+               {
+                       using(IConnection connection = CreateConnection(SEND_CLIENT_ID))
+                       {
+                               connection.Start();
+
+                               using(ISession session = connection.CreateSession())
+                               using(IMessageProducer producer = session.CreateProducer(SessionUtil.GetTopic(session, topicName)))
+                               {
+                                       ITextMessage durableHello = session.CreateTextMessage("Durable Hello");
+
+                                       producer.DeliveryMode = MsgDeliveryMode.Persistent;
+                                       producer.Send(durableHello);
+                               }
+                       }
+               }
+       }
+}

Added: 
activemq/activemq-dotnet/Apache.NMS.XMS/trunk/src/test/csharp/EndianBinaryReaderTest.cs
URL: 
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.XMS/trunk/src/test/csharp/EndianBinaryReaderTest.cs?rev=1723221&view=auto
==============================================================================
--- 
activemq/activemq-dotnet/Apache.NMS.XMS/trunk/src/test/csharp/EndianBinaryReaderTest.cs
 (added)
+++ 
activemq/activemq-dotnet/Apache.NMS.XMS/trunk/src/test/csharp/EndianBinaryReaderTest.cs
 Wed Jan  6 02:19:56 2016
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using System.IO;
+using Apache.NMS.Util;
+using NUnit.Framework;
+
+namespace Apache.NMS.Test
+{
+       /// <summary>
+       /// Tests for EndianBinaryReader.ReadString16 and ReadString32. Each input
+       /// buffer starts with a length prefix (two bytes for the 16 variant, four
+       /// bytes for the 32 variant) holding the payload byte count, followed by the
+       /// encoded characters. An embedded NUL character is encoded as 0xC0 0x80.
+       /// </summary>
+       [TestFixture]
+       public class EndianBinaryReaderTest
+       {
+               /// <summary>
+               /// Asserts that the decoded characters match the expected ones exactly.
+               /// The length is checked first so that a truncated or over-long result
+               /// is reported as an assertion failure instead of an index error or a
+               /// silent pass.
+               /// </summary>
+               private static void AssertSameChars(char[] expect, char[] result)
+               {
+                       Assert.AreEqual(expect.Length, result.Length, "Decoded string has the wrong length.");
+
+                       for(int i = 0; i < expect.Length; ++i)
+                       {
+                               Assert.AreEqual(expect[i], result[i], "Character mismatch at index " + i);
+                       }
+               }
+
+               /// <summary>
+               /// Decodes <paramref name="input"/> with ReadString16 and verifies that the
+               /// result is exactly <paramref name="expect"/>.
+               /// </summary>
+               /// <param name="input">Length-prefixed encoded bytes handed to the reader.</param>
+               /// <param name="expect">Characters the decoded string must consist of.</param>
+               public void readString16Helper(byte[] input, char[] expect)
+               {
+                       MemoryStream stream = new MemoryStream(input);
+                       EndianBinaryReader reader = new EndianBinaryReader(stream);
+
+                       char[] result = reader.ReadString16().ToCharArray();
+
+                       AssertSameChars(expect, result);
+               }
+
+               [Test]
+               public void testReadString16_1byteUTF8encoding()
+               {
+                       // Test data with 1-byte UTF8 encoding.
+                       char[] expect = { '\u0000', '\u000B', '\u0048', '\u0065', '\u006C', '\u006C', '\u006F', '\u0020', '\u0057', '\u006F', '\u0072', '\u006C', '\u0064' };
+                       byte[] input = { 0x00, 0x0E, 0xC0, 0x80, 0x0B, 0x48, 0x65, 0x6C, 0x6C, 0x6F, 0x20, 0x57, 0x6F, 0x72, 0x6C, 0x64 };
+
+                       readString16Helper(input, expect);
+               }
+
+               [Test]
+               public void testReadString16_2byteUTF8encoding()
+               {
+                       // Test data with 2-byte UTF8 encoding.
+                       char[] expect = { '\u0000', '\u00C2', '\u00A9', '\u00C3', '\u00A6' };
+                       byte[] input = { 0x00, 0x0A, 0xC0, 0x80, 0xC3, 0x82, 0xC2, 0xA9, 0xC3, 0x83, 0xC2, 0xA6 };
+
+                       readString16Helper(input, expect);
+               }
+
+               [Test]
+               public void testReadString16_1byteAnd2byteEmbeddedNULLs()
+               {
+                       // Test data with 1-byte and 2-byte encoding with embedded NULL's.
+                       char[] expect = { '\u0000', '\u0004', '\u00C2', '\u00A9', '\u00C3', '\u0000', '\u00A6' };
+                       byte[] input = { 0x00, 0x0D, 0xC0, 0x80, 0x04, 0xC3, 0x82, 0xC2, 0xA9, 0xC3, 0x83, 0xC0, 0x80, 0xC2, 0xA6 };
+
+                       readString16Helper(input, expect);
+               }
+
+               [Test]
+               [ExpectedException(typeof(IOException))]
+               public void testReadString16_UTF8Missing2ndByte()
+               {
+                       // Test with bad UTF-8 encoding, missing 2nd byte of two byte value
+                       byte[] input = { 0x00, 0x0D, 0xC0, 0x80, 0x04, 0xC3, 0x82, 0xC2, 0xC2, 0xC3, 0x83, 0xC0, 0x80, 0xC2, 0xA6 };
+
+                       MemoryStream stream = new MemoryStream(input);
+                       EndianBinaryReader reader = new EndianBinaryReader(stream);
+
+                       // Expected to throw IOException (see the attribute above).
+                       reader.ReadString16();
+               }
+
+               [Test]
+               [ExpectedException(typeof(IOException))]
+               public void testReadString16_3byteEncodingMissingLastByte()
+               {
+                       // Test with three byte encode that's missing a last byte.
+                       byte[] input = { 0x00, 0x02, 0xE8, 0xA8 };
+
+                       MemoryStream stream = new MemoryStream(input);
+                       EndianBinaryReader reader = new EndianBinaryReader(stream);
+
+                       // Expected to throw IOException (see the attribute above).
+                       reader.ReadString16();
+               }
+
+               /// <summary>
+               /// Decodes <paramref name="input"/> with ReadString32 and verifies that the
+               /// result is exactly <paramref name="expect"/>.
+               /// </summary>
+               /// <param name="input">Length-prefixed encoded bytes handed to the reader.</param>
+               /// <param name="expect">Characters the decoded string must consist of.</param>
+               public void readString32Helper(byte[] input, char[] expect)
+               {
+                       MemoryStream stream = new MemoryStream(input);
+                       EndianBinaryReader reader = new EndianBinaryReader(stream);
+
+                       char[] result = reader.ReadString32().ToCharArray();
+
+                       AssertSameChars(expect, result);
+               }
+
+               [Test]
+               public void testReadString32_1byteUTF8encoding()
+               {
+                       // Test data with 1-byte UTF8 encoding.
+                       char[] expect = { '\u0000', '\u000B', '\u0048', '\u0065', '\u006C', '\u006C', '\u006F', '\u0020', '\u0057', '\u006F', '\u0072', '\u006C', '\u0064' };
+                       byte[] input = { 0x00, 0x00, 0x00, 0x0E, 0xC0, 0x80, 0x0B, 0x48, 0x65, 0x6C, 0x6C, 0x6F, 0x20, 0x57, 0x6F, 0x72, 0x6C, 0x64 };
+
+                       readString32Helper(input, expect);
+               }
+
+               [Test]
+               public void testReadString32_2byteUTF8encoding()
+               {
+                       // Test data with 2-byte UTF8 encoding.
+                       char[] expect = { '\u0000', '\u00C2', '\u00A9', '\u00C3', '\u00A6' };
+                       byte[] input = { 0x00, 0x00, 0x00, 0x0A, 0xC0, 0x80, 0xC3, 0x82, 0xC2, 0xA9, 0xC3, 0x83, 0xC2, 0xA6 };
+
+                       readString32Helper(input, expect);
+               }
+
+               [Test]
+               public void testReadString32_1byteAnd2byteEmbeddedNULLs()
+               {
+                       // Test data with 1-byte and 2-byte encoding with embedded NULL's.
+                       char[] expect = { '\u0000', '\u0004', '\u00C2', '\u00A9', '\u00C3', '\u0000', '\u00A6' };
+                       byte[] input = { 0x00, 0x00, 0x00, 0x0D, 0xC0, 0x80, 0x04, 0xC3, 0x82, 0xC2, 0xA9, 0xC3, 0x83, 0xC0, 0x80, 0xC2, 0xA6 };
+
+                       readString32Helper(input, expect);
+               }
+
+               [Test]
+               [ExpectedException(typeof(IOException))]
+               public void testReadString32_UTF8Missing2ndByte()
+               {
+                       // Test with bad UTF-8 encoding, missing 2nd byte of two byte value
+                       byte[] input = { 0x00, 0x00, 0x00, 0x0D, 0xC0, 0x80, 0x04, 0xC3, 0x82, 0xC2, 0xC2, 0xC3, 0x83, 0xC0, 0x80, 0xC2, 0xA6 };
+
+                       MemoryStream stream = new MemoryStream(input);
+                       EndianBinaryReader reader = new EndianBinaryReader(stream);
+
+                       // Expected to throw IOException (see the attribute above).
+                       reader.ReadString32();
+               }
+
+               [Test]
+               [ExpectedException(typeof(IOException))]
+               public void testReadString32_3byteEncodingMissingLastByte()
+               {
+                       // Test with three byte encode that's missing a last byte.
+                       byte[] input = { 0x00, 0x00, 0x00, 0x02, 0xE8, 0xA8 };
+
+                       MemoryStream stream = new MemoryStream(input);
+                       EndianBinaryReader reader = new EndianBinaryReader(stream);
+
+                       // Expected to throw IOException (see the attribute above).
+                       reader.ReadString32();
+               }
+       }
+}


Reply via email to