Github user tabish121 commented on a diff in the pull request: https://github.com/apache/activemq-nms-amqp/pull/2#discussion_r204161653 --- Diff: src/main/csharp/Connection.cs --- @@ -0,0 +1,1158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +using System; +using System.Collections; +using System.Collections.Generic; +using System.Collections.Specialized; +using System.Collections.Concurrent; +using System.Linq; +using System.Text; +using System.Threading; +using System.Threading.Tasks; +using Apache.NMS; +using Amqp; +using Amqp.Framing; +using NMS.AMQP.Util; +using NMS.AMQP.Transport; +using System.Reflection; +using Apache.NMS.Util; + +namespace NMS.AMQP +{ + using Message.Factory; + enum ConnectionState + { + UNKNOWN = -1, + INITIAL = 0, + CONNECTING = 1, + CONNECTED = 2, + CLOSING = 3, + CLOSED = 4, + + } + + /// <summary> + /// NMS.AMQP.Connection facilitates management and creates the underlying Amqp.Connection protocol engine object. + /// NMS.AMQP.Connection is also the NMS.AMQP.Session Factory. + /// </summary> + class Connection : NMSResource<ConnectionInfo>, Apache.NMS.IConnection + { + public static readonly string MESSAGE_OBJECT_SERIALIZATION_PROP = PropertyUtil.CreateProperty("Message.Serialization", ConnectionFactory.ConnectionPropertyPrefix); + + private IRedeliveryPolicy redeliveryPolicy; + private Amqp.IConnection impl; + private ProviderCreateConnection implCreate; + private ConnectionInfo connInfo; + private readonly IdGenerator clientIdGenerator; + private Atomic<bool> clientIdCanSet = new Atomic<bool>(true); + private Atomic<bool> closing = new Atomic<bool>(false); + private Atomic<ConnectionState> state = new Atomic<ConnectionState>(ConnectionState.INITIAL); + private CountDownLatch latch; + private ConcurrentDictionary<Id, Session> sessions = new ConcurrentDictionary<Id, Session>(); + private IdGenerator sesIdGen = null; + private IdGenerator tempTopicIdGen = null; + private IdGenerator tempQueueIdGen = null; + private StringDictionary properties; + private TemporaryLinkCache temporaryLinks = null; + private IProviderTransportContext transportContext = null; + private DispatchExecutor exceptionExecutor = null; + + #region Contructor + + internal Connection(Uri addr, IdGenerator clientIdGenerator) + { + connInfo = new ConnectionInfo(); + connInfo.remoteHost = addr; + Info = connInfo; + this.clientIdGenerator = clientIdGenerator; + latch = new CountDownLatch(1); + temporaryLinks = new TemporaryLinkCache(this); + } + + #endregion + + #region Internal Properties + + internal Amqp.IConnection InnerConnection { get { return this.impl; } } + + internal IdGenerator SessionIdGenerator + { + get + { + IdGenerator sig = sesIdGen; + lock (this) + { + if (sig == null) + { + sig = new NestedIdGenerator("ID:ses", connInfo.Id, true); + sesIdGen = sig; + } + } + return sig; + } + } + + internal IdGenerator TemporaryTopicGenerator + { + get + { + IdGenerator ttg = tempTopicIdGen; + lock (this) + { + if (ttg == null) + { + ttg = new NestedIdGenerator("ID:nms-temp-topic", Info.Id, true); + tempTopicIdGen = ttg; + } + } + return ttg; + } + } + + internal IdGenerator TemporaryQueueGenerator + { + get + { + IdGenerator tqg = tempQueueIdGen; + lock (this) + { + if (tqg == null) + { + tqg = new NestedIdGenerator("ID:nms-temp-queue", Info.Id, true); + tempQueueIdGen = tqg; + } + } + return tqg; + } + } + + internal bool IsConnected + { + get + { + return this.state.Value.Equals(ConnectionState.CONNECTED); + } + } + + internal bool IsClosed + { + get + { + return this.state.Value.Equals(ConnectionState.CLOSED); + } + } + + internal ushort MaxChannel + { + get { return connInfo.channelMax; } + } + + internal MessageTransformation TransformFactory + { + get + { + return MessageFactory<ConnectionInfo>.Instance(this).GetTransformFactory(); + } + } + + internal IMessageFactory MessageFactory + { + get + { + return MessageFactory<ConnectionInfo>.Instance(this); + } + } + + internal string TopicPrefix + { + get { return connInfo.TopicPrefix; } + } + + internal string QueuePrefix + { + get { return connInfo.QueuePrefix; } + } + + internal bool IsAnonymousRelay + { + get { return connInfo.IsAnonymousRelay; } + } + + internal bool IsDelayedDelivery + { + get { return connInfo.IsDelayedDelivery; } + } + + #endregion + + #region Internal Methods + + internal ITemporaryTopic CreateTemporaryTopic() + { + TemporaryTopic temporaryTopic = new TemporaryTopic(this); + + CreateTemporaryLink(temporaryTopic); + + return temporaryTopic; + } + + internal ITemporaryQueue CreateTemporaryQueue() + { + TemporaryQueue temporaryQueue = new TemporaryQueue(this); + + CreateTemporaryLink(temporaryQueue); + + return temporaryQueue; + } + + private void CreateTemporaryLink(TemporaryDestination temporaryDestination) + { + TemporaryLink link = new TemporaryLink(temporaryLinks.Session, temporaryDestination); + + link.Attach(); + + temporaryLinks.AddLink(temporaryDestination, link); + + } + + /// <summary> + /// Unsubscribes Durable Consumers on the connection + /// </summary> + /// <param name="name">The subscription name.</param> + internal void Unsubscribe(string name) + { + // check for any active consumers on the subscription name. + foreach (Session session in GetSessions()) + { + if (session.ContainsSubscriptionName(name)) + { + throw new IllegalStateException("Cannot unsubscribe from Durable Consumer while consuming messages."); + } + } + // unsubscribe using an instance of RemoveSubscriptionLink. + RemoveSubscriptionLink removeLink = new RemoveSubscriptionLink(this.temporaryLinks.Session, name); + removeLink.Unsubscribe(); + } + + internal bool ContainsSubscriptionName(string name) + { + foreach (Session session in GetSessions()) + { + if (session.ContainsSubscriptionName(name)) + { + return true; + } + } + return false; + } + + internal void Configure(ConnectionFactory cf) + { + Amqp.ConnectionFactory cfImpl = cf.Factory as Amqp.ConnectionFactory; + + // get properties from connection factory + StringDictionary properties = cf.ConnectionProperties; + + // apply connection properties to connection factory and connection info. + PropertyUtil.SetProperties(cfImpl.AMQP, properties, ConnectionFactory.ConnectionPropertyPrefix); + PropertyUtil.SetProperties(connInfo, properties, ConnectionFactory.ConnectionPropertyPrefix); + + // create copy of transport context + this.transportContext = cf.Context.Copy(); + + // Store raw properties for future objects + this.properties = PropertyUtil.Clone(properties); + + // Create Connection builder delegate. + this.implCreate = this.transportContext.CreateConnectionBuilder(); + } + + internal StringDictionary Properties + { + get { return PropertyUtil.Merge(this.properties, PropertyUtil.GetProperties(this.connInfo)); } + } + + internal void Remove(TemporaryDestination destination) + { + temporaryLinks.RemoveLink(destination); + } + + internal void DestroyTemporaryDestination(TemporaryDestination destination) + { + ThrowIfClosed(); + foreach(Session session in GetSessions()) + { + if (session.IsDestinationInUse(destination)) + { + throw new IllegalStateException("Cannot delete Temporary Destination, {0}, while consuming messages."); + } + } + try + { + TemporaryLink link = temporaryLinks.RemoveLink(destination); + if(link != null && !link.IsClosed) + { + link.Close(); + } + } + catch (Exception e) + { + throw ExceptionSupport.Wrap(e); + } + } + + internal void Remove(Session ses) + { + Session result = null; + if(!sessions.TryRemove(ses.Id, out result)) + { + Tracer.WarnFormat("Could not disassociate Session {0} with Connection {1}.", ses.Id, ClientId); + } + } + + private Session[] GetSessions() + { + return sessions.Values.ToArray(); + } + + private void CheckIfClosed() + { + if (this.state.Value.Equals(ConnectionState.CLOSED)) + { + throw new IllegalStateException("Operation invalid on closed connection."); + } + } + + private void ProcessCapabilities(Open openResponse) + { + if(openResponse.OfferedCapabilities != null || openResponse.OfferedCapabilities.Length > 0) + { + foreach(Amqp.Types.Symbol symbol in openResponse.OfferedCapabilities) + { + if (SymbolUtil.OPEN_CAPABILITY_ANONYMOUS_RELAY.Equals(symbol)) + { + connInfo.IsAnonymousRelay = true; + } + else if (SymbolUtil.OPEN_CAPABILITY_DELAYED_DELIVERY.Equals(symbol)) + { + connInfo.IsDelayedDelivery = true; + } + else + { + connInfo.AddCapability(symbol); + } + } + } + } + + private void ProcessRemoteConnectionProperties(Open openResponse) + { + if (openResponse.Properties != null && openResponse.Properties.Count > 0) + { + foreach(object key in openResponse.Properties.Keys) + { + string keyString = key.ToString(); + string valueString = openResponse.Properties[key]?.ToString(); + this.connInfo.RemotePeerProperies.Add(keyString, valueString); + } + } + } + + private void OpenResponse(Amqp.IConnection conn, Open openResp) + { + Tracer.InfoFormat("Connection {0}, Open {0}", conn.ToString(), openResp.ToString()); + Tracer.DebugFormat("Open Response : \n Hostname = {0},\n ContainerId = {1},\n MaxChannel = {2},\n MaxFrame = {3}\n", openResp.HostName, openResp.ContainerId, openResp.ChannelMax, openResp.MaxFrameSize); + Tracer.DebugFormat("Open Response Descriptor : \n Descriptor Name = {0},\n Descriptor Code = {1}\n", openResp.Descriptor.Name, openResp.Descriptor.Code); + ProcessCapabilities(openResp); + ProcessRemoteConnectionProperties(openResp); + if (SymbolUtil.CheckAndCompareFields(openResp.Properties, SymbolUtil.CONNECTION_ESTABLISH_FAILED, SymbolUtil.BOOLEAN_TRUE)) + { + Tracer.InfoFormat("Open response contains {0} property the connection {1} will soon be closed.", SymbolUtil.CONNECTION_ESTABLISH_FAILED, this.ClientId); + } + else + { + object value = SymbolUtil.GetFromFields(openResp.Properties, SymbolUtil.CONNECTION_PROPERTY_TOPIC_PREFIX); + if(value != null && value is string) + { + this.connInfo.TopicPrefix = value as string; + } + value = SymbolUtil.GetFromFields(openResp.Properties, SymbolUtil.CONNECTION_PROPERTY_QUEUE_PREFIX); + if (value != null && value is string) + { + this.connInfo.QueuePrefix = value as string; + } + this.latch?.countDown(); + } + } + + private Open CreateOpenFrame(ConnectionInfo connInfo) + { + Open frame = new Open(); + frame.ContainerId = connInfo.clientId; + frame.ChannelMax = connInfo.channelMax; + frame.MaxFrameSize = Convert.ToUInt32(connInfo.maxFrameSize); + frame.HostName = connInfo.remoteHost.Host; + frame.IdleTimeOut = Convert.ToUInt32(connInfo.idleTimout); + frame.DesiredCapabilities = new Amqp.Types.Symbol[] { + SymbolUtil.OPEN_CAPABILITY_SOLE_CONNECTION_FOR_CONTAINER, + SymbolUtil.OPEN_CAPABILITY_DELAYED_DELIVERY --- End diff -- If allowing anonymous producers the client should be adding the ANONYMOUS-RELAY capability here as well
---