Github user tabish121 commented on a diff in the pull request:
https://github.com/apache/activemq-nms-amqp/pull/2#discussion_r204161653
--- Diff: src/main/csharp/Connection.cs ---
@@ -0,0 +1,1158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using System;
+using System.Collections;
+using System.Collections.Generic;
+using System.Collections.Specialized;
+using System.Collections.Concurrent;
+using System.Linq;
+using System.Text;
+using System.Threading;
+using System.Threading.Tasks;
+using Apache.NMS;
+using Amqp;
+using Amqp.Framing;
+using NMS.AMQP.Util;
+using NMS.AMQP.Transport;
+using System.Reflection;
+using Apache.NMS.Util;
+
+namespace NMS.AMQP
+{
+ using Message.Factory;
+ enum ConnectionState
+ {
+ UNKNOWN = -1,
+ INITIAL = 0,
+ CONNECTING = 1,
+ CONNECTED = 2,
+ CLOSING = 3,
+ CLOSED = 4,
+
+ }
+
+ /// <summary>
+ /// NMS.AMQP.Connection facilitates management and creates the
underlying Amqp.Connection protocol engine object.
+ /// NMS.AMQP.Connection is also the NMS.AMQP.Session Factory.
+ /// </summary>
+ class Connection : NMSResource<ConnectionInfo>, Apache.NMS.IConnection
+ {
+ public static readonly string MESSAGE_OBJECT_SERIALIZATION_PROP =
PropertyUtil.CreateProperty("Message.Serialization",
ConnectionFactory.ConnectionPropertyPrefix);
+
+ private IRedeliveryPolicy redeliveryPolicy;
+ private Amqp.IConnection impl;
+ private ProviderCreateConnection implCreate;
+ private ConnectionInfo connInfo;
+ private readonly IdGenerator clientIdGenerator;
+ private Atomic<bool> clientIdCanSet = new Atomic<bool>(true);
+ private Atomic<bool> closing = new Atomic<bool>(false);
+ private Atomic<ConnectionState> state = new
Atomic<ConnectionState>(ConnectionState.INITIAL);
+ private CountDownLatch latch;
+ private ConcurrentDictionary<Id, Session> sessions = new
ConcurrentDictionary<Id, Session>();
+ private IdGenerator sesIdGen = null;
+ private IdGenerator tempTopicIdGen = null;
+ private IdGenerator tempQueueIdGen = null;
+ private StringDictionary properties;
+ private TemporaryLinkCache temporaryLinks = null;
+ private IProviderTransportContext transportContext = null;
+ private DispatchExecutor exceptionExecutor = null;
+
+ #region Contructor
+
+ internal Connection(Uri addr, IdGenerator clientIdGenerator)
+ {
+ connInfo = new ConnectionInfo();
+ connInfo.remoteHost = addr;
+ Info = connInfo;
+ this.clientIdGenerator = clientIdGenerator;
+ latch = new CountDownLatch(1);
+ temporaryLinks = new TemporaryLinkCache(this);
+ }
+
+ #endregion
+
+ #region Internal Properties
+
+ internal Amqp.IConnection InnerConnection { get { return
this.impl; } }
+
+ internal IdGenerator SessionIdGenerator
+ {
+ get
+ {
+ IdGenerator sig = sesIdGen;
+ lock (this)
+ {
+ if (sig == null)
+ {
+ sig = new NestedIdGenerator("ID:ses", connInfo.Id,
true);
+ sesIdGen = sig;
+ }
+ }
+ return sig;
+ }
+ }
+
+ internal IdGenerator TemporaryTopicGenerator
+ {
+ get
+ {
+ IdGenerator ttg = tempTopicIdGen;
+ lock (this)
+ {
+ if (ttg == null)
+ {
+ ttg = new NestedIdGenerator("ID:nms-temp-topic",
Info.Id, true);
+ tempTopicIdGen = ttg;
+ }
+ }
+ return ttg;
+ }
+ }
+
+ internal IdGenerator TemporaryQueueGenerator
+ {
+ get
+ {
+ IdGenerator tqg = tempQueueIdGen;
+ lock (this)
+ {
+ if (tqg == null)
+ {
+ tqg = new NestedIdGenerator("ID:nms-temp-queue",
Info.Id, true);
+ tempQueueIdGen = tqg;
+ }
+ }
+ return tqg;
+ }
+ }
+
+ internal bool IsConnected
+ {
+ get
+ {
+ return this.state.Value.Equals(ConnectionState.CONNECTED);
+ }
+ }
+
+ internal bool IsClosed
+ {
+ get
+ {
+ return this.state.Value.Equals(ConnectionState.CLOSED);
+ }
+ }
+
+ internal ushort MaxChannel
+ {
+ get { return connInfo.channelMax; }
+ }
+
+ internal MessageTransformation TransformFactory
+ {
+ get
+ {
+ return
MessageFactory<ConnectionInfo>.Instance(this).GetTransformFactory();
+ }
+ }
+
+ internal IMessageFactory MessageFactory
+ {
+ get
+ {
+ return MessageFactory<ConnectionInfo>.Instance(this);
+ }
+ }
+
+ internal string TopicPrefix
+ {
+ get { return connInfo.TopicPrefix; }
+ }
+
+ internal string QueuePrefix
+ {
+ get { return connInfo.QueuePrefix; }
+ }
+
+ internal bool IsAnonymousRelay
+ {
+ get { return connInfo.IsAnonymousRelay; }
+ }
+
+ internal bool IsDelayedDelivery
+ {
+ get { return connInfo.IsDelayedDelivery; }
+ }
+
+ #endregion
+
+ #region Internal Methods
+
+ internal ITemporaryTopic CreateTemporaryTopic()
+ {
+ TemporaryTopic temporaryTopic = new TemporaryTopic(this);
+
+ CreateTemporaryLink(temporaryTopic);
+
+ return temporaryTopic;
+ }
+
+ internal ITemporaryQueue CreateTemporaryQueue()
+ {
+ TemporaryQueue temporaryQueue = new TemporaryQueue(this);
+
+ CreateTemporaryLink(temporaryQueue);
+
+ return temporaryQueue;
+ }
+
+ private void CreateTemporaryLink(TemporaryDestination
temporaryDestination)
+ {
+ TemporaryLink link = new TemporaryLink(temporaryLinks.Session,
temporaryDestination);
+
+ link.Attach();
+
+ temporaryLinks.AddLink(temporaryDestination, link);
+
+ }
+
+ /// <summary>
+ /// Unsubscribes Durable Consumers on the connection
+ /// </summary>
+ /// <param name="name">The subscription name.</param>
+ internal void Unsubscribe(string name)
+ {
+ // check for any active consumers on the subscription name.
+ foreach (Session session in GetSessions())
+ {
+ if (session.ContainsSubscriptionName(name))
+ {
+ throw new IllegalStateException("Cannot unsubscribe
from Durable Consumer while consuming messages.");
+ }
+ }
+ // unsubscribe using an instance of RemoveSubscriptionLink.
+ RemoveSubscriptionLink removeLink = new
RemoveSubscriptionLink(this.temporaryLinks.Session, name);
+ removeLink.Unsubscribe();
+ }
+
+ internal bool ContainsSubscriptionName(string name)
+ {
+ foreach (Session session in GetSessions())
+ {
+ if (session.ContainsSubscriptionName(name))
+ {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ internal void Configure(ConnectionFactory cf)
+ {
+ Amqp.ConnectionFactory cfImpl = cf.Factory as
Amqp.ConnectionFactory;
+
+ // get properties from connection factory
+ StringDictionary properties = cf.ConnectionProperties;
+
+ // apply connection properties to connection factory and
connection info.
+ PropertyUtil.SetProperties(cfImpl.AMQP, properties,
ConnectionFactory.ConnectionPropertyPrefix);
+ PropertyUtil.SetProperties(connInfo, properties,
ConnectionFactory.ConnectionPropertyPrefix);
+
+ // create copy of transport context
+ this.transportContext = cf.Context.Copy();
+
+ // Store raw properties for future objects
+ this.properties = PropertyUtil.Clone(properties);
+
+ // Create Connection builder delegate.
+ this.implCreate =
this.transportContext.CreateConnectionBuilder();
+ }
+
+ internal StringDictionary Properties
+ {
+ get { return PropertyUtil.Merge(this.properties,
PropertyUtil.GetProperties(this.connInfo)); }
+ }
+
+ internal void Remove(TemporaryDestination destination)
+ {
+ temporaryLinks.RemoveLink(destination);
+ }
+
+ internal void DestroyTemporaryDestination(TemporaryDestination
destination)
+ {
+ ThrowIfClosed();
+ foreach(Session session in GetSessions())
+ {
+ if (session.IsDestinationInUse(destination))
+ {
+ throw new IllegalStateException("Cannot delete
Temporary Destination, {0}, while consuming messages.");
+ }
+ }
+ try
+ {
+ TemporaryLink link =
temporaryLinks.RemoveLink(destination);
+ if(link != null && !link.IsClosed)
+ {
+ link.Close();
+ }
+ }
+ catch (Exception e)
+ {
+ throw ExceptionSupport.Wrap(e);
+ }
+ }
+
+ internal void Remove(Session ses)
+ {
+ Session result = null;
+ if(!sessions.TryRemove(ses.Id, out result))
+ {
+ Tracer.WarnFormat("Could not disassociate Session {0} with
Connection {1}.", ses.Id, ClientId);
+ }
+ }
+
+ private Session[] GetSessions()
+ {
+ return sessions.Values.ToArray();
+ }
+
+ private void CheckIfClosed()
+ {
+ if (this.state.Value.Equals(ConnectionState.CLOSED))
+ {
+ throw new IllegalStateException("Operation invalid on
closed connection.");
+ }
+ }
+
+ private void ProcessCapabilities(Open openResponse)
+ {
+ if(openResponse.OfferedCapabilities != null ||
openResponse.OfferedCapabilities.Length > 0)
+ {
+ foreach(Amqp.Types.Symbol symbol in
openResponse.OfferedCapabilities)
+ {
+ if
(SymbolUtil.OPEN_CAPABILITY_ANONYMOUS_RELAY.Equals(symbol))
+ {
+ connInfo.IsAnonymousRelay = true;
+ }
+ else if
(SymbolUtil.OPEN_CAPABILITY_DELAYED_DELIVERY.Equals(symbol))
+ {
+ connInfo.IsDelayedDelivery = true;
+ }
+ else
+ {
+ connInfo.AddCapability(symbol);
+ }
+ }
+ }
+ }
+
+ private void ProcessRemoteConnectionProperties(Open openResponse)
+ {
+ if (openResponse.Properties != null &&
openResponse.Properties.Count > 0)
+ {
+ foreach(object key in openResponse.Properties.Keys)
+ {
+ string keyString = key.ToString();
+ string valueString =
openResponse.Properties[key]?.ToString();
+ this.connInfo.RemotePeerProperies.Add(keyString,
valueString);
+ }
+ }
+ }
+
+ private void OpenResponse(Amqp.IConnection conn, Open openResp)
+ {
+ Tracer.InfoFormat("Connection {0}, Open {0}", conn.ToString(),
openResp.ToString());
+ Tracer.DebugFormat("Open Response : \n Hostname = {0},\n
ContainerId = {1},\n MaxChannel = {2},\n MaxFrame = {3}\n", openResp.HostName,
openResp.ContainerId, openResp.ChannelMax, openResp.MaxFrameSize);
+ Tracer.DebugFormat("Open Response Descriptor : \n Descriptor
Name = {0},\n Descriptor Code = {1}\n", openResp.Descriptor.Name,
openResp.Descriptor.Code);
+ ProcessCapabilities(openResp);
+ ProcessRemoteConnectionProperties(openResp);
+ if (SymbolUtil.CheckAndCompareFields(openResp.Properties,
SymbolUtil.CONNECTION_ESTABLISH_FAILED, SymbolUtil.BOOLEAN_TRUE))
+ {
+ Tracer.InfoFormat("Open response contains {0} property the
connection {1} will soon be closed.", SymbolUtil.CONNECTION_ESTABLISH_FAILED,
this.ClientId);
+ }
+ else
+ {
+ object value =
SymbolUtil.GetFromFields(openResp.Properties,
SymbolUtil.CONNECTION_PROPERTY_TOPIC_PREFIX);
+ if(value != null && value is string)
+ {
+ this.connInfo.TopicPrefix = value as string;
+ }
+ value = SymbolUtil.GetFromFields(openResp.Properties,
SymbolUtil.CONNECTION_PROPERTY_QUEUE_PREFIX);
+ if (value != null && value is string)
+ {
+ this.connInfo.QueuePrefix = value as string;
+ }
+ this.latch?.countDown();
+ }
+ }
+
+ private Open CreateOpenFrame(ConnectionInfo connInfo)
+ {
+ Open frame = new Open();
+ frame.ContainerId = connInfo.clientId;
+ frame.ChannelMax = connInfo.channelMax;
+ frame.MaxFrameSize = Convert.ToUInt32(connInfo.maxFrameSize);
+ frame.HostName = connInfo.remoteHost.Host;
+ frame.IdleTimeOut = Convert.ToUInt32(connInfo.idleTimout);
+ frame.DesiredCapabilities = new Amqp.Types.Symbol[] {
+ SymbolUtil.OPEN_CAPABILITY_SOLE_CONNECTION_FOR_CONTAINER,
+ SymbolUtil.OPEN_CAPABILITY_DELAYED_DELIVERY
--- End diff --
If allowing anonymous producers the client should be adding the
ANONYMOUS-RELAY capability here as well
---