HavretGC commented on a change in pull request #4: [WIP] Failover implementation
URL: https://github.com/apache/activemq-nms-amqp/pull/4#discussion_r302456586
 
 

 ##########
 File path: src/NMS.AMQP/Provider/Failover/FailoverProvider.cs
 ##########
 @@ -0,0 +1,667 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.IO;
+using System.Threading.Tasks;
+using Apache.NMS.AMQP.Message;
+using Apache.NMS.AMQP.Meta;
+using Apache.NMS.AMQP.Util;
+using Apache.NMS.Util;
+
+namespace Apache.NMS.AMQP.Provider.Failover
+{
+    public class FailoverProvider : IProvider, IProviderListener
+    {
+        private readonly object SyncRoot = new object();
+
+        private static int UNDEFINED = -1;
+
+        public static int DEFAULT_INITIAL_RECONNECT_DELAY = 0;
+        public static long DEFAULT_RECONNECT_DELAY = 10;
+        public static double DEFAULT_RECONNECT_BACKOFF_MULTIPLIER = 2.0d;
+        public static long DEFAULT_MAX_RECONNECT_DELAY = 
(long)Math.Round(TimeSpan.FromSeconds(30).TotalMilliseconds);
+        public static int DEFAULT_STARTUP_MAX_RECONNECT_ATTEMPTS = UNDEFINED;
+        public static int DEFAULT_MAX_RECONNECT_ATTEMPTS = UNDEFINED;
+        public static bool DEFAULT_USE_RECONNECT_BACKOFF = true;
+        public static int DEFAULT_WARN_AFTER_RECONNECT_ATTEMPTS = 10;
+        public static bool DEFAULT_RANDOMIZE_ENABLED = false;
+        
+        private readonly ReconnectControls reconnectControl;
+        private readonly FailoverUriPool uris;
+
+        private AtomicBool closed = new AtomicBool();
+        private Atomic<bool> failed = new Atomic<bool>();
+
+        private long requestTimeout;
+        private long sendTimeout;
+
+        private Uri connectedUri;
+        private ConnectionInfo connectionInfo;
+        private IProvider provider;
+        private IProviderListener listener;
+
+        private List<FailoverRequest> requests = new List<FailoverRequest>();
+
+        internal IProvider ActiveProvider => provider;
+
+        public FailoverProvider(IEnumerable<Uri> uris)
+        {
+            this.uris = new FailoverUriPool(uris);
+            reconnectControl = new ReconnectControls(this);
+        }
+
+        public long InitialReconnectDelay { get; set; } = 
DEFAULT_INITIAL_RECONNECT_DELAY;
+        public long ReconnectDelay { get; set; } = DEFAULT_RECONNECT_DELAY;
+        public bool UseReconnectBackOff { get; set; } = 
DEFAULT_USE_RECONNECT_BACKOFF;
+        public double ReconnectBackOffMultiplier { get; set; } = 
DEFAULT_RECONNECT_BACKOFF_MULTIPLIER;
+        public long MaxReconnectDelay { get; set; } = 
DEFAULT_MAX_RECONNECT_DELAY;
+        public int StartupMaxReconnectAttempts { get; set; } = 
DEFAULT_STARTUP_MAX_RECONNECT_ATTEMPTS;
+        public int MaxReconnectAttempts { get; set; } = 
DEFAULT_MAX_RECONNECT_ATTEMPTS;
+        public int WarnAfterReconnectAttempts { get; set; } = 
DEFAULT_WARN_AFTER_RECONNECT_ATTEMPTS;
+        public Uri RemoteUri => connectedUri;
+
+        public void Start()
+        {
+            CheckClosed();
+
+            if (listener == null)
+            {
+                throw new IllegalStateException("No ProviderListener 
registered.");
+            }
+        }
+
+        public Task Connect(ConnectionInfo info)
+        {
+            CheckClosed();
+
+            requestTimeout = info.requestTimeout;
+            sendTimeout = info.SendTimeout;
+
+            connectionInfo = info;
+            Tracer.Debug("Initiating initial connection attempt task");
+            return TriggerReconnectionAttempt();
+        }
+
+        private Task TriggerReconnectionAttempt()
+        {
+            if (closed)
+                return Task.CompletedTask;
+
+            return reconnectControl.ScheduleReconnect(Reconnect);
+
+            async Task Reconnect()
+            {
+                IProvider provider = null;
+                Exception failure = null;
+                long reconnectAttempts = reconnectControl.RecordNextAttempt();
+
+                try
+                {
+                    if (uris.Any())
+                    {
+                        for (int i = 0; i < uris.Size(); i++)
+                        {
+                            var target = uris.GetNext();
+                            if (target == null)
+                            {
+                                Tracer.Debug("Failover URI collection 
unexpectedly modified during connection attempt.");
+                                continue;
+                            }
+
+                            try
+                            {
+                                Tracer.Debug($"Connection 
attempt:[{reconnectAttempts}] to: {target.Scheme}://{target.Host}:{target.Port} 
in-progress");
+                                provider = ProviderFactory.Create(target);
+                                await provider.Connect(connectionInfo);
+                                await InitializeNewConnection(provider);
+                                return;
+                            }
+                            catch (Exception e)
+                            {
+                                Tracer.Info($"Connection 
attempt:[{reconnectAttempts}] to: {target.Scheme}://{target.Host}:{target.Port} 
failed");
+                                failure = e;
+                                try
+                                {
+                                    provider?.Close();
+                                }
+                                catch { }
+                                finally
+                                {
+                                    provider = null;
+                                }
+                            }
+                        }
+                    }
+                    else
+                    {
+                        Tracer.Debug("No remote URI available to connect to in 
failover list");
+                        // TODO Handle this one.
+                        failure = new IOException("No remote URI available for 
reconnection during connection attempt: " + reconnectAttempts);
+                    }
+                }
+                catch (Exception unknownFailure)
+                {
+                    Tracer.Warn($"Connection attempt:[{reconnectAttempts}] 
failed abnormally.");
+                    failure = failure ?? unknownFailure;
+                }
+                finally
+                {
+                    if (provider == null)
+                    {
+                        Tracer.Debug($"Connection 
attempt:[{reconnectControl.ReconnectAttempts}] failed error: 
{failure.Message}");
+                        if (!reconnectControl.IsReconnectAllowed(failure))
+                        {
+                            ReportReconnectFailure(failure);
+                        }
+                        else
+                        {
+                            await 
reconnectControl.ScheduleReconnect(Reconnect);
+                        }
+                    }
+                }
+            }
+        }
+
+        private void ReportReconnectFailure(Exception lastFailure)
+        {
+            Tracer.Error($"Failed to connect after: 
{reconnectControl.ReconnectAttempts} attempt(s)");
+            if (failed.CompareAndSet(false, true))
+            {
+                if (lastFailure == null)
+                {
+                    lastFailure = new IOException($"Failed to connect after: 
{reconnectControl.ReconnectAttempts} attempt(s)");
+                }
+
+                var exception = NMSExceptionSupport.Create(lastFailure);
+                listener.OnConnectionFailure(exception);
+                throw exception;
+            }
+        }
+
+        private async Task InitializeNewConnection(IProvider provider)
+        {
+            if (closed)
+            {
+                try
+                {
+                    provider.Close();
+                }
+                catch (Exception e)
+                {
+                    Tracer.Debug($"Ignoring failure to close failed provider: 
{provider} {e.Message}");
+                }
+                return;
+            }
+
+            this.provider = provider;
+            this.provider.SetProviderListener(this);
+            this.connectedUri = provider.RemoteUri;
+
+            if (reconnectControl.IsRecoveryRequired())
+            {
+                Tracer.Debug($"Signalling connection recovery: {provider}");
+
+                // Allow listener to recover its resources
+                await listener.OnConnectionRecovery(provider);
+
+                // Restart consumers, send pull commands, etc.
+                await listener.OnConnectionRecovered(provider);
+
+                // Let the client know that connection has restored.
+                listener.OnConnectionRestored(connectedUri);
+
+                // If we try to run pending requests right after the 
connection is reestablished 
+                // it will result in timeout on the first send request
+                await Task.Delay(50);
+
+                foreach (FailoverRequest request in GetPendingRequests())
+                {
+                    await request.Run();
+                }
+
+                reconnectControl.ConnectionEstablished();
+            }
+            else
+            {
+                listener.OnConnectionEstablished(connectedUri);
+                reconnectControl.ConnectionEstablished();
+            }
+        }
+
+        public override string ToString()
+        {
+            return "FailoverProvider: " + (connectedUri == null ? 
"unconnected" : connectedUri.ToString());
+        }
+
+        public void Close()
+        {
+            if (closed.CompareAndSet(false, true))
+            {
+                try
+                {
+                    provider.Close();
 
 Review comment:
   Done

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to