Github user tabish121 commented on a diff in the pull request:

    https://github.com/apache/activemq-nms-amqp/pull/2#discussion_r204161653
  
    --- Diff: src/main/csharp/Connection.cs ---
    @@ -0,0 +1,1158 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +using System;
    +using System.Collections;
    +using System.Collections.Generic;
    +using System.Collections.Specialized;
    +using System.Collections.Concurrent;
    +using System.Linq;
    +using System.Text;
    +using System.Threading;
    +using System.Threading.Tasks;
    +using Apache.NMS;
    +using Amqp;
    +using Amqp.Framing;
    +using NMS.AMQP.Util;
    +using NMS.AMQP.Transport;
    +using System.Reflection;
    +using Apache.NMS.Util;
    +
    +namespace NMS.AMQP
    +{
    +    using Message.Factory;
    +    enum ConnectionState
    +    {
    +        UNKNOWN = -1,
    +        INITIAL = 0,
    +        CONNECTING = 1,
    +        CONNECTED = 2,
    +        CLOSING = 3,
    +        CLOSED = 4,
    +
    +    }
    +
    +    /// <summary>
    +    /// NMS.AMQP.Connection facilitates management and creates the 
underlying Amqp.Connection protocol engine object.
    +    /// NMS.AMQP.Connection is also the NMS.AMQP.Session Factory.
    +    /// </summary>
    +    class Connection : NMSResource<ConnectionInfo>, Apache.NMS.IConnection
    +    {
    +        public static readonly string MESSAGE_OBJECT_SERIALIZATION_PROP = 
PropertyUtil.CreateProperty("Message.Serialization", 
ConnectionFactory.ConnectionPropertyPrefix);
    +        
    +        private IRedeliveryPolicy redeliveryPolicy;
    +        private Amqp.IConnection impl;
    +        private ProviderCreateConnection implCreate;
    +        private ConnectionInfo connInfo;
    +        private readonly IdGenerator clientIdGenerator;
    +        private Atomic<bool> clientIdCanSet = new Atomic<bool>(true);
    +        private Atomic<bool> closing = new Atomic<bool>(false);
    +        private Atomic<ConnectionState> state = new 
Atomic<ConnectionState>(ConnectionState.INITIAL);
    +        private CountDownLatch latch;
    +        private ConcurrentDictionary<Id, Session> sessions = new 
ConcurrentDictionary<Id, Session>();
    +        private IdGenerator sesIdGen = null;
    +        private IdGenerator tempTopicIdGen = null;
    +        private IdGenerator tempQueueIdGen = null;
    +        private StringDictionary properties;
    +        private TemporaryLinkCache temporaryLinks = null;
    +        private IProviderTransportContext transportContext = null;
    +        private DispatchExecutor exceptionExecutor = null;
    +
    +        #region Contructor
    +
    +        internal Connection(Uri addr, IdGenerator clientIdGenerator)
    +        {
    +            connInfo = new ConnectionInfo();
    +            connInfo.remoteHost = addr;
    +            Info = connInfo;
    +            this.clientIdGenerator = clientIdGenerator;
    +            latch = new CountDownLatch(1);
    +            temporaryLinks = new TemporaryLinkCache(this);
    +        }
    +        
    +        #endregion
    +
    +        #region Internal Properties
    +
    +        internal Amqp.IConnection InnerConnection { get { return 
this.impl; } }
    +
    +        internal IdGenerator SessionIdGenerator
    +        {
    +            get
    +            {
    +                IdGenerator sig = sesIdGen;
    +                lock (this)
    +                {
    +                    if (sig == null)
    +                    {
    +                        sig = new NestedIdGenerator("ID:ses", connInfo.Id, 
true);
    +                        sesIdGen = sig;
    +                    }
    +                }
    +                return sig;
    +            }
    +        }
    +
    +        internal IdGenerator TemporaryTopicGenerator
    +        {
    +            get
    +            {
    +                IdGenerator ttg = tempTopicIdGen;
    +                lock (this)
    +                {
    +                    if (ttg == null)
    +                    {
    +                        ttg = new NestedIdGenerator("ID:nms-temp-topic", 
Info.Id, true);
    +                        tempTopicIdGen = ttg;
    +                    }
    +                }
    +                return ttg;
    +            }
    +        }
    +
    +        internal IdGenerator TemporaryQueueGenerator
    +        {
    +            get
    +            {
    +                IdGenerator tqg = tempQueueIdGen;
    +                lock (this)
    +                {
    +                    if (tqg == null)
    +                    {
    +                        tqg = new NestedIdGenerator("ID:nms-temp-queue", 
Info.Id, true);
    +                        tempQueueIdGen = tqg;
    +                    }
    +                }
    +                return tqg;
    +            }
    +        }
    +
    +        internal bool IsConnected
    +        {
    +            get
    +            {
    +                return this.state.Value.Equals(ConnectionState.CONNECTED);
    +            }
    +        }
    +
    +        internal bool IsClosed
    +        {
    +            get
    +            {
    +                return this.state.Value.Equals(ConnectionState.CLOSED);
    +            }
    +        }
    +
    +        internal ushort MaxChannel
    +        {
    +            get { return connInfo.channelMax; }
    +        }
    +
    +        internal MessageTransformation TransformFactory
    +        {
    +            get
    +            {
    +                return 
MessageFactory<ConnectionInfo>.Instance(this).GetTransformFactory();
    +            }
    +        }
    +
    +        internal IMessageFactory MessageFactory
    +        {
    +            get
    +            {
    +                return MessageFactory<ConnectionInfo>.Instance(this);
    +            }
    +        }
    +
    +        internal string TopicPrefix
    +        {
    +            get { return connInfo.TopicPrefix; }
    +        }
    +
    +        internal string QueuePrefix
    +        {
    +            get { return connInfo.QueuePrefix; }
    +        }
    +
    +        internal bool IsAnonymousRelay
    +        {
    +            get { return connInfo.IsAnonymousRelay; }
    +        }
    +
    +        internal bool IsDelayedDelivery
    +        {
    +            get { return connInfo.IsDelayedDelivery; }
    +        }
    +        
    +        #endregion
    +
    +        #region Internal Methods
    +
    +        internal ITemporaryTopic CreateTemporaryTopic()
    +        {
    +            TemporaryTopic temporaryTopic = new TemporaryTopic(this);
    +
    +            CreateTemporaryLink(temporaryTopic);
    +
    +            return temporaryTopic;
    +        }
    +
    +        internal ITemporaryQueue CreateTemporaryQueue()
    +        {
    +            TemporaryQueue temporaryQueue = new TemporaryQueue(this);
    +
    +            CreateTemporaryLink(temporaryQueue);
    +
    +            return temporaryQueue;
    +        }
    +
    +        private void CreateTemporaryLink(TemporaryDestination 
temporaryDestination)
    +        {
    +            TemporaryLink link = new TemporaryLink(temporaryLinks.Session, 
temporaryDestination);
    +
    +            link.Attach();
    +
    +            temporaryLinks.AddLink(temporaryDestination, link);
    +
    +        }
    +
    +        /// <summary>
    +        /// Unsubscribes Durable Consumers on the connection
    +        /// </summary>
    +        /// <param name="name">The subscription name.</param>
    +        internal void Unsubscribe(string name)
    +        {
    +            // check for any active consumers on the subscription name.
    +            foreach (Session session in GetSessions())
    +            {
    +                if (session.ContainsSubscriptionName(name))
    +                {
    +                    throw new IllegalStateException("Cannot unsubscribe 
from Durable Consumer while consuming messages.");
    +                }
    +            }
    +            // unsubscribe using an instance of RemoveSubscriptionLink.
    +            RemoveSubscriptionLink removeLink = new 
RemoveSubscriptionLink(this.temporaryLinks.Session, name);
    +            removeLink.Unsubscribe();
    +        }
    +
    +        internal bool ContainsSubscriptionName(string name)
    +        {
    +            foreach (Session session in GetSessions())
    +            {
    +                if (session.ContainsSubscriptionName(name))
    +                {
    +                    return true;
    +                }
    +            }
    +            return false;
    +        }
    +
    +        internal void Configure(ConnectionFactory cf)
    +        {
    +            Amqp.ConnectionFactory cfImpl = cf.Factory as 
Amqp.ConnectionFactory;
    +            
    +            // get properties from connection factory
    +            StringDictionary properties = cf.ConnectionProperties;
    +
    +            // apply connection properties to connection factory and 
connection info.
    +            PropertyUtil.SetProperties(cfImpl.AMQP, properties, 
ConnectionFactory.ConnectionPropertyPrefix);
    +            PropertyUtil.SetProperties(connInfo, properties, 
ConnectionFactory.ConnectionPropertyPrefix);
    +            
    +            // create copy of transport context
    +            this.transportContext = cf.Context.Copy();
    +
    +            // Store raw properties for future objects
    +            this.properties = PropertyUtil.Clone(properties);
    +            
    +            // Create Connection builder delegate.
    +            this.implCreate = 
this.transportContext.CreateConnectionBuilder();
    +        }
    +
    +        internal StringDictionary Properties
    +        {
    +            get { return PropertyUtil.Merge(this.properties, 
PropertyUtil.GetProperties(this.connInfo)); }
    +        }
    +
    +        internal void Remove(TemporaryDestination destination)
    +        {
    +            temporaryLinks.RemoveLink(destination);
    +        }
    +
    +        internal void DestroyTemporaryDestination(TemporaryDestination 
destination)
    +        {
    +            ThrowIfClosed();
    +            foreach(Session session in GetSessions())
    +            {
    +                if (session.IsDestinationInUse(destination))
    +                {
    +                    throw new IllegalStateException("Cannot delete 
Temporary Destination, {0}, while consuming messages.");
    +                }
    +            }
    +            try
    +            {
    +                TemporaryLink link = 
temporaryLinks.RemoveLink(destination);
    +                if(link != null && !link.IsClosed)
    +                {
    +                    link.Close();
    +                }
    +            }
    +            catch (Exception e)
    +            {
    +                throw ExceptionSupport.Wrap(e);
    +            }
    +        }
    +
    +        internal void Remove(Session ses)
    +        {
    +            Session result = null;
    +            if(!sessions.TryRemove(ses.Id, out result))
    +            {
    +                Tracer.WarnFormat("Could not disassociate Session {0} with 
Connection {1}.", ses.Id, ClientId);
    +            }
    +        }
    +
    +        private Session[] GetSessions()
    +        {   
    +            return sessions.Values.ToArray();
    +        }
    +
    +        private void CheckIfClosed()
    +        {
    +            if (this.state.Value.Equals(ConnectionState.CLOSED))
    +            {
    +                throw new IllegalStateException("Operation invalid on 
closed connection.");
    +            }
    +        }
    +
    +        private void ProcessCapabilities(Open openResponse)
    +        {
    +            if(openResponse.OfferedCapabilities != null || 
openResponse.OfferedCapabilities.Length > 0)
    +            {
    +                foreach(Amqp.Types.Symbol symbol in 
openResponse.OfferedCapabilities)
    +                {
    +                    if 
(SymbolUtil.OPEN_CAPABILITY_ANONYMOUS_RELAY.Equals(symbol))
    +                    {
    +                        connInfo.IsAnonymousRelay = true;
    +                    }
    +                    else if 
(SymbolUtil.OPEN_CAPABILITY_DELAYED_DELIVERY.Equals(symbol))
    +                    {
    +                        connInfo.IsDelayedDelivery = true;
    +                    }
    +                    else
    +                    {
    +                        connInfo.AddCapability(symbol);
    +                    }
    +                }
    +            }
    +        }
    +
    +        private void ProcessRemoteConnectionProperties(Open openResponse)
    +        {
    +            if (openResponse.Properties != null && 
openResponse.Properties.Count > 0)
    +            {
    +                foreach(object key in openResponse.Properties.Keys)
    +                {
    +                    string keyString = key.ToString();
    +                    string valueString = 
openResponse.Properties[key]?.ToString();
    +                    this.connInfo.RemotePeerProperies.Add(keyString, 
valueString);
    +                }
    +            }
    +        }
    +
    +        private void OpenResponse(Amqp.IConnection conn, Open openResp)
    +        {
    +            Tracer.InfoFormat("Connection {0}, Open {0}", conn.ToString(), 
openResp.ToString());
    +            Tracer.DebugFormat("Open Response : \n Hostname = {0},\n 
ContainerId = {1},\n MaxChannel = {2},\n MaxFrame = {3}\n", openResp.HostName, 
openResp.ContainerId, openResp.ChannelMax, openResp.MaxFrameSize);
    +            Tracer.DebugFormat("Open Response Descriptor : \n Descriptor 
Name = {0},\n Descriptor Code = {1}\n", openResp.Descriptor.Name, 
openResp.Descriptor.Code);
    +            ProcessCapabilities(openResp);
    +            ProcessRemoteConnectionProperties(openResp);
    +            if (SymbolUtil.CheckAndCompareFields(openResp.Properties, 
SymbolUtil.CONNECTION_ESTABLISH_FAILED, SymbolUtil.BOOLEAN_TRUE))
    +            {
    +                Tracer.InfoFormat("Open response contains {0} property the 
connection {1} will soon be closed.", SymbolUtil.CONNECTION_ESTABLISH_FAILED, 
this.ClientId);
    +            }
    +            else
    +            {
    +                object value = 
SymbolUtil.GetFromFields(openResp.Properties, 
SymbolUtil.CONNECTION_PROPERTY_TOPIC_PREFIX);
    +                if(value != null && value is string)
    +                {
    +                    this.connInfo.TopicPrefix = value as string;
    +                }
    +                value = SymbolUtil.GetFromFields(openResp.Properties, 
SymbolUtil.CONNECTION_PROPERTY_QUEUE_PREFIX);
    +                if (value != null && value is string)
    +                {
    +                    this.connInfo.QueuePrefix = value as string;
    +                }
    +                this.latch?.countDown();
    +            }
    +        }
    +
    +        private Open CreateOpenFrame(ConnectionInfo connInfo)
    +        {
    +            Open frame = new Open();
    +            frame.ContainerId = connInfo.clientId;
    +            frame.ChannelMax = connInfo.channelMax;
    +            frame.MaxFrameSize = Convert.ToUInt32(connInfo.maxFrameSize);
    +            frame.HostName = connInfo.remoteHost.Host;
    +            frame.IdleTimeOut = Convert.ToUInt32(connInfo.idleTimout);
    +            frame.DesiredCapabilities = new Amqp.Types.Symbol[] {
    +                SymbolUtil.OPEN_CAPABILITY_SOLE_CONNECTION_FOR_CONTAINER,
    +                SymbolUtil.OPEN_CAPABILITY_DELAYED_DELIVERY
    --- End diff --
    
    If allowing anonymous producers the client should be adding the 
ANONYMOUS-RELAY capability here as well


---

Reply via email to