This is an automated email from the ASF dual-hosted git repository.

smolnar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/knox.git


The following commit(s) were added to refs/heads/master by this push:
     new 416943fa6 KNOX-3146: Failover feature for SSE HA (#1049)
416943fa6 is described below

commit 416943fa62240921bee256e06070181b866d11e7
Author: hanicz <[email protected]>
AuthorDate: Tue Aug 19 16:12:30 2025 +0200

    KNOX-3146: Failover feature for SSE HA (#1049)
---
 gateway-provider-ha/pom.xml                        |   9 +
 .../gateway/ha/config/CommonHaConfigurations.java  | 127 ++++++
 .../knox/gateway/ha/config/HaConfigurations.java   |  61 +++
 .../{LBHaDispatch.java => CommonHaDispatch.java}   | 153 +++++--
 .../ha/dispatch/ConfigurableHADispatch.java        | 202 ++-------
 .../knox/gateway/ha/dispatch/SSEHaCallback.java    |  63 +++
 .../knox/gateway/ha/dispatch/SSEHaDispatch.java    | 123 +++---
 .../StickySessionCookieRemovedRequest.java         |  51 +++
 .../ha/dispatch/i18n/HaDispatchMessages.java       |   5 +
 .../gateway/ha/dispatch/SSEHaDispatchTest.java     | 451 +++++++++++++++++++--
 .../hdfs/dispatch/AbstractHdfsHaDispatch.java      |  12 +-
 gateway-spi/pom.xml                                |  10 -
 .../apache/knox/gateway/SpiGatewayMessages.java    |   3 +
 .../knox/gateway/dispatch/AsyncDispatch.java       |  21 +
 .../gateway/dispatch/ConfigurableDispatch.java     |   2 +-
 .../apache/knox/gateway/dispatch/SyncDispatch.java |  21 +
 .../org/apache/knox/gateway/sse/SSECallback.java   |  80 ++++
 .../org/apache/knox/gateway/sse/SSEDispatch.java   |  53 +--
 .../apache/knox/gateway/sse/SSEDispatchTest.java   |   2 +
 pom.xml                                            |  10 +
 20 files changed, 1093 insertions(+), 366 deletions(-)

diff --git a/gateway-provider-ha/pom.xml b/gateway-provider-ha/pom.xml
index ebcb12739..e49ac5307 100644
--- a/gateway-provider-ha/pom.xml
+++ b/gateway-provider-ha/pom.xml
@@ -110,6 +110,15 @@
             <artifactId>commons-lang3</artifactId>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.httpcomponents</groupId>
+            <artifactId>httpasyncclient</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.httpcomponents</groupId>
+            <artifactId>httpcore-nio</artifactId>
+        </dependency>
+
         <dependency>
             <groupId>org.apache.curator</groupId>
             <artifactId>curator-test</artifactId>
diff --git 
a/gateway-provider-ha/src/main/java/org/apache/knox/gateway/ha/config/CommonHaConfigurations.java
 
b/gateway-provider-ha/src/main/java/org/apache/knox/gateway/ha/config/CommonHaConfigurations.java
new file mode 100644
index 000000000..0a367c9d8
--- /dev/null
+++ 
b/gateway-provider-ha/src/main/java/org/apache/knox/gateway/ha/config/CommonHaConfigurations.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.knox.gateway.ha.config;
+
+import org.apache.knox.gateway.ha.provider.HaProvider;
+import org.apache.knox.gateway.ha.provider.impl.HaServiceConfigConstants;
+
+import java.util.Collections;
+import java.util.List;
+
+public class CommonHaConfigurations implements HaConfigurations {
+
+    private HaProvider haProvider;
+    private boolean loadBalancingEnabled = 
HaServiceConfigConstants.DEFAULT_LOAD_BALANCING_ENABLED;
+    private boolean stickySessionsEnabled = 
HaServiceConfigConstants.DEFAULT_STICKY_SESSIONS_ENABLED;
+    private String stickySessionCookieName = 
HaServiceConfigConstants.DEFAULT_STICKY_SESSION_COOKIE_NAME;
+    private List<String> disableLoadBalancingForUserAgents = 
Collections.singletonList(HaServiceConfigConstants.DEFAULT_DISABLE_LB_USER_AGENTS);
+    private int maxFailoverAttempts = 
HaServiceConfigConstants.DEFAULT_MAX_FAILOVER_ATTEMPTS;
+    private int failoverSleep = 
HaServiceConfigConstants.DEFAULT_FAILOVER_SLEEP;
+    private boolean failoverNonIdempotentRequestEnabled = 
HaServiceConfigConstants.DEFAULT_FAILOVER_NON_IDEMPOTENT;
+    private boolean noFallbackEnabled = 
HaServiceConfigConstants.DEFAULT_NO_FALLBACK_ENABLED;
+
+    @Override
+    public HaProvider getHaProvider() {
+        return haProvider;
+    }
+
+    @Override
+    public void setLoadBalancingEnabled(boolean enabled) {
+        this.loadBalancingEnabled = enabled;
+    }
+
+    @Override
+    public void setHaProvider(HaProvider haProvider) {
+        this.haProvider = haProvider;
+    }
+
+    @Override
+    public boolean isStickySessionEnabled() {
+        return stickySessionsEnabled;
+    }
+
+    @Override
+    public void setStickySessionsEnabled(boolean enabled) {
+        this.stickySessionsEnabled = enabled;
+    }
+
+    @Override
+    public String getStickySessionCookieName() {
+        return stickySessionCookieName;
+    }
+
+    @Override
+    public void setStickySessionCookieName(String stickySessionCookieName) {
+        this.stickySessionCookieName = stickySessionCookieName;
+    }
+
+    @Override
+    public boolean isLoadBalancingEnabled() {
+        return loadBalancingEnabled;
+    }
+
+    @Override
+    public List<String> getDisableLoadBalancingForUserAgents() {
+        return disableLoadBalancingForUserAgents;
+    }
+
+    @Override
+    public void setDisableLoadBalancingForUserAgents(List<String> 
disableLoadBalancingForUserAgents) {
+        this.disableLoadBalancingForUserAgents = 
disableLoadBalancingForUserAgents;
+    }
+
+    @Override
+    public int getMaxFailoverAttempts() {
+        return maxFailoverAttempts;
+    }
+
+    @Override
+    public void setMaxFailoverAttempts(int maxFailoverAttempts) {
+        this.maxFailoverAttempts = maxFailoverAttempts;
+    }
+
+    @Override
+    public int getFailoverSleep() {
+        return failoverSleep;
+    }
+
+    @Override
+    public void setFailoverSleep(int failoverSleep) {
+        this.failoverSleep = failoverSleep;
+    }
+
+    @Override
+    public void setFailoverNonIdempotentRequestEnabled(boolean enabled) {
+        this.failoverNonIdempotentRequestEnabled = enabled;
+    }
+
+    @Override
+    public boolean isFailoverNonIdempotentRequestEnabled() {
+        return failoverNonIdempotentRequestEnabled;
+    }
+
+    @Override
+    public void setNoFallbackEnabled(boolean enabled) {
+        this.noFallbackEnabled = enabled;
+    }
+
+    @Override
+    public boolean isNoFallbackEnabled() {
+        return noFallbackEnabled;
+    }
+}
diff --git 
a/gateway-provider-ha/src/main/java/org/apache/knox/gateway/ha/config/HaConfigurations.java
 
b/gateway-provider-ha/src/main/java/org/apache/knox/gateway/ha/config/HaConfigurations.java
new file mode 100644
index 000000000..2e5cadcd9
--- /dev/null
+++ 
b/gateway-provider-ha/src/main/java/org/apache/knox/gateway/ha/config/HaConfigurations.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.knox.gateway.ha.config;
+
+import org.apache.knox.gateway.ha.provider.HaProvider;
+
+import java.util.List;
+
+public interface HaConfigurations {
+
+    HaProvider getHaProvider();
+
+    void setLoadBalancingEnabled(boolean enabled);
+
+    void setHaProvider(HaProvider haProvider);
+
+    boolean isStickySessionEnabled();
+
+    void setStickySessionsEnabled(boolean enabled);
+
+    String getStickySessionCookieName();
+
+    void setStickySessionCookieName(String stickySessionCookieName);
+
+    boolean isLoadBalancingEnabled();
+
+    List<String> getDisableLoadBalancingForUserAgents();
+
+    void setDisableLoadBalancingForUserAgents(List<String> 
disableLoadBalancingForUserAgents);
+
+    int getMaxFailoverAttempts();
+
+    void setMaxFailoverAttempts(int maxFailoverAttempts);
+
+    int getFailoverSleep();
+
+    void setFailoverSleep(int failoverSleep);
+
+    void setFailoverNonIdempotentRequestEnabled(boolean enabled);
+
+    boolean isFailoverNonIdempotentRequestEnabled();
+
+    void setNoFallbackEnabled(boolean enabled);
+
+    boolean isNoFallbackEnabled();
+}
diff --git 
a/gateway-provider-ha/src/main/java/org/apache/knox/gateway/ha/dispatch/LBHaDispatch.java
 
b/gateway-provider-ha/src/main/java/org/apache/knox/gateway/ha/dispatch/CommonHaDispatch.java
similarity index 54%
rename from 
gateway-provider-ha/src/main/java/org/apache/knox/gateway/ha/dispatch/LBHaDispatch.java
rename to 
gateway-provider-ha/src/main/java/org/apache/knox/gateway/ha/dispatch/CommonHaDispatch.java
index 433e9bfe5..2918c099e 100644
--- 
a/gateway-provider-ha/src/main/java/org/apache/knox/gateway/ha/dispatch/LBHaDispatch.java
+++ 
b/gateway-provider-ha/src/main/java/org/apache/knox/gateway/ha/dispatch/CommonHaDispatch.java
@@ -22,14 +22,16 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.http.client.methods.HttpRequestBase;
 import org.apache.http.client.methods.HttpUriRequest;
 import org.apache.http.client.utils.URIBuilder;
+import org.apache.knox.gateway.filter.AbstractGatewayFilter;
 import org.apache.knox.gateway.ha.dispatch.i18n.HaDispatchMessages;
-import org.apache.knox.gateway.ha.provider.HaProvider;
+import org.apache.knox.gateway.ha.config.HaConfigurations;
 import org.apache.knox.gateway.ha.provider.HaServiceConfig;
 import org.apache.knox.gateway.i18n.messages.MessagesFactory;
 
 import javax.servlet.http.Cookie;
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
+import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.Arrays;
@@ -38,70 +40,65 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 
-public interface LBHaDispatch {
+public interface CommonHaDispatch {
 
     HaDispatchMessages LOG = MessagesFactory.get(HaDispatchMessages.class);
     Map<String, String> urlToHashLookup = new HashMap<>();
     Map<String, String> hashToUrlLookup = new HashMap<>();
-
-    boolean isStickySessionEnabled();
-
-    void setStickySessionsEnabled(boolean enabled);
-
-    String getStickySessionCookieName();
-
-    void setStickySessionCookieName(String stickySessionCookieName);
-
-    HaProvider getHaProvider();
+    String FAILOVER_COUNTER_ATTRIBUTE = "dispatch.ha.failover.counter";
+    List<String> nonIdempotentRequests = Arrays.asList("POST", "PATCH", 
"CONNECT");
 
     String getServiceRole();
 
-    void setLoadBalancingEnabled(boolean enabled);
-
-    boolean isLoadBalancingEnabled();
+    URI getDispatchUrl(HttpServletRequest request);
 
-    List<String> getDisableLoadBalancingForUserAgents();
-
-    void setDisableLoadBalancingForUserAgents(List<String> 
disableLoadBalancingForUserAgents);
+    HaConfigurations getHaConfigurations();
 
     AtomicReference<String> getActiveURL();
 
     void setActiveURL(String url);
 
-    default void initializeLBHaDispatch(HaServiceConfig serviceConfig) {
-        setLoadBalancingEnabled(serviceConfig.isLoadBalancingEnabled());
-        setStickySessionsEnabled(isLoadBalancingEnabled() && 
serviceConfig.isStickySessionEnabled());
+    default void initializeCommonHaDispatch(HaServiceConfig serviceConfig) {
+        
getHaConfigurations().setLoadBalancingEnabled(serviceConfig.isLoadBalancingEnabled());
+        
getHaConfigurations().setStickySessionsEnabled(getHaConfigurations().isLoadBalancingEnabled()
 && serviceConfig.isStickySessionEnabled());
 
-        if (isStickySessionEnabled()) {
-            
setStickySessionCookieName(serviceConfig.getStickySessionCookieName());
+        if (getHaConfigurations().isStickySessionEnabled()) {
+            
getHaConfigurations().setStickySessionCookieName(serviceConfig.getStickySessionCookieName());
         }
 
         if 
(StringUtils.isNotBlank(serviceConfig.getStickySessionDisabledUserAgents())) {
-            
setDisableLoadBalancingForUserAgents(Arrays.asList(serviceConfig.getStickySessionDisabledUserAgents()
+            
getHaConfigurations().setDisableLoadBalancingForUserAgents(Arrays.asList(serviceConfig.getStickySessionDisabledUserAgents()
                     .trim()
                     .split("\\s*,\\s*")));
         }
         setupUrlHashLookup();
 
         /* setup the active URL for non-LB case */
-        setActiveURL(getHaProvider().getActiveURL(getServiceRole()));
+        
setActiveURL(getHaConfigurations().getHaProvider().getActiveURL(getServiceRole()));
 
         // Suffix the cookie name by the service to make it unique
         // The cookie path is NOT unique since Knox is stripping the service 
name.
-        setStickySessionCookieName(getStickySessionCookieName() + '-' + 
getServiceRole());
+        
getHaConfigurations().setStickySessionCookieName(getHaConfigurations().getStickySessionCookieName()
 + '-' + getServiceRole());
+
+        // Set the failover parameters
+        
getHaConfigurations().setMaxFailoverAttempts(serviceConfig.getMaxFailoverAttempts());
+        
getHaConfigurations().setFailoverSleep(serviceConfig.getFailoverSleep());
+        
getHaConfigurations().setFailoverNonIdempotentRequestEnabled(serviceConfig.isFailoverNonIdempotentRequestEnabled());
+        
getHaConfigurations().setNoFallbackEnabled(getHaConfigurations().isStickySessionEnabled()
 && serviceConfig.isNoFallbackEnabled());
     }
 
     default void setKnoxHaCookie(final HttpUriRequest outboundRequest, final 
HttpServletRequest inboundRequest,
                                  final HttpServletResponse outboundResponse, 
boolean sslEnabled) {
-        if (isStickySessionEnabled()) {
+        if (getHaConfigurations().isStickySessionEnabled()) {
             List<Cookie> serviceHaCookies = Collections.emptyList();
             if (inboundRequest.getCookies() != null) {
                 serviceHaCookies = Arrays
                         .stream(inboundRequest.getCookies())
-                        .filter(cookie -> 
getStickySessionCookieName().equals(cookie.getName()))
+                        .filter(cookie -> 
getHaConfigurations().getStickySessionCookieName().equals(cookie.getName()))
                         .collect(Collectors.toList());
             }
 
@@ -114,12 +111,12 @@ public interface LBHaDispatch {
                  * we set cookie for the endpoint that was served and not rely 
on haProvider.getActiveURL().
                  * let LBing logic take care of rotating urls.
                  **/
-                final List<String> urls = 
getHaProvider().getURLs(getServiceRole())
+                final List<String> urls = 
getHaConfigurations().getHaProvider().getURLs(getServiceRole())
                         .stream()
                         .filter(u -> 
u.contains(outboundRequest.getURI().getHost()))
                         .collect(Collectors.toList());
                 final String cookieValue = urlToHashLookup.get(urls.get(0));
-                Cookie stickySessionCookie = new 
Cookie(getStickySessionCookieName(), cookieValue);
+                Cookie stickySessionCookie = new 
Cookie(getHaConfigurations().getStickySessionCookieName(), cookieValue);
                 stickySessionCookie.setPath(inboundRequest.getContextPath());
                 stickySessionCookie.setMaxAge(-1);
                 stickySessionCookie.setHttpOnly(true);
@@ -129,14 +126,14 @@ public interface LBHaDispatch {
         }
     }
 
-    default Optional<URI> setBackendFromHaCookie(HttpUriRequest 
outboundRequest, HttpServletRequest inboundRequest) {
-        if (isLoadBalancingEnabled() && isStickySessionEnabled() && 
inboundRequest.getCookies() != null) {
+    default Optional<URI> getBackendFromHaCookie(HttpUriRequest 
outboundRequest, HttpServletRequest inboundRequest) {
+        if (getHaConfigurations().isLoadBalancingEnabled() && 
getHaConfigurations().isStickySessionEnabled() && inboundRequest.getCookies() 
!= null) {
             for (Cookie cookie : inboundRequest.getCookies()) {
-                if (getStickySessionCookieName().equals(cookie.getName())) {
+                if 
(getHaConfigurations().getStickySessionCookieName().equals(cookie.getName())) {
                     String backendURLHash = cookie.getValue();
                     String backendURL = hashToUrlLookup.get(backendURLHash);
                     // Make sure that the url provided is actually a valid 
backend url
-                    if 
(getHaProvider().getURLs(getServiceRole()).contains(backendURL)) {
+                    if 
(getHaConfigurations().getHaProvider().getURLs(getServiceRole()).contains(backendURL))
 {
                         try {
                             return 
Optional.of(updateHostURL(outboundRequest.getURI(), backendURL));
                         } catch (URISyntaxException ignore) {
@@ -149,6 +146,7 @@ public interface LBHaDispatch {
         return Optional.empty();
     }
 
+
     default String hash(String url) {
         return DigestUtils.sha256Hex(url);
     }
@@ -172,7 +170,7 @@ public interface LBHaDispatch {
     }
 
     default void setupUrlHashLookup() {
-        for (String url : getHaProvider().getURLs(getServiceRole())) {
+        for (String url : 
getHaConfigurations().getHaProvider().getURLs(getServiceRole())) {
             String urlHash = hash(url);
             urlToHashLookup.put(url, urlHash);
             hashToUrlLookup.put(urlHash, url);
@@ -185,9 +183,9 @@ public interface LBHaDispatch {
         boolean userAgentDisabled = false;
 
         /* disable loadbalancing in case a configured user agent is detected 
to disable LB */
-        if 
(getDisableLoadBalancingForUserAgents().stream().anyMatch(userAgentFromBrowser::contains))
 {
+        if 
(getHaConfigurations().getDisableLoadBalancingForUserAgents().stream().anyMatch(userAgentFromBrowser::contains))
 {
             userAgentDisabled = true;
-            LOG.disableHALoadbalancinguserAgent(userAgentFromBrowser, 
getDisableLoadBalancingForUserAgents().toString());
+            LOG.disableHALoadbalancinguserAgent(userAgentFromBrowser, 
getHaConfigurations().getDisableLoadBalancingForUserAgents().toString());
         }
 
         return userAgentDisabled;
@@ -197,7 +195,7 @@ public interface LBHaDispatch {
         /* if disable LB is set don't bother setting backend from cookie */
         Optional<URI> backendURI = Optional.empty();
         if (!userAgentDisabled) {
-            backendURI = setBackendFromHaCookie(outboundRequest, 
inboundRequest);
+            backendURI = getBackendFromHaCookie(outboundRequest, 
inboundRequest);
             backendURI.ifPresent(uri -> ((HttpRequestBase) 
outboundRequest).setURI(uri));
         }
 
@@ -206,7 +204,7 @@ public interface LBHaDispatch {
          * and we have a HTTP request configured not to use LB
          * use the activeURL
          */
-        if (isLoadBalancingEnabled() && userAgentDisabled) {
+        if (getHaConfigurations().isLoadBalancingEnabled() && 
userAgentDisabled) {
             try {
                 ((HttpRequestBase) 
outboundRequest).setURI(updateHostURL(outboundRequest.getURI(), 
getActiveURL().get()));
             } catch (final URISyntaxException e) {
@@ -225,19 +223,88 @@ public interface LBHaDispatch {
          *    needs to be loadbalanced. If a request has BACKEND coookie and 
Loadbalance=on then
          *    there should be no loadbalancing.
          */
-        if (isLoadBalancingEnabled() && !userAgentDisabled) {
+        if (getHaConfigurations().isLoadBalancingEnabled() && 
!userAgentDisabled) {
             /* check sticky session enabled */
-            if (isStickySessionEnabled()) {
+            if (getHaConfigurations().isStickySessionEnabled()) {
                 /* loadbalance only when sticky session enabled and no backend 
url cookie */
                 if (!backendURI.isPresent()) {
-                    
getHaProvider().makeNextActiveURLAvailable(getServiceRole());
+                    
getHaConfigurations().getHaProvider().makeNextActiveURLAvailable(getServiceRole());
                 } else {
                     /* sticky session enabled and backend url cookie is valid 
no need to loadbalance */
                     /* do nothing */
                 }
             } else {
-                getHaProvider().makeNextActiveURLAvailable(getServiceRole());
+                
getHaConfigurations().getHaProvider().makeNextActiveURLAvailable(getServiceRole());
+            }
+        }
+    }
+
+    /**
+     * A helper method that marks an endpoint failed.
+     * Changes HA Provider state.
+     * Changes ActiveUrl state.
+     * Changes for inbound urls should be handled by calling functions.
+     *
+     * @param outboundRequest
+     * @param inboundRequest
+     * @return current failover counter
+     */
+    default AtomicInteger markEndpointFailed(final HttpUriRequest 
outboundRequest, final HttpServletRequest inboundRequest) {
+        synchronized (this) {
+            
getHaConfigurations().getHaProvider().markFailedURL(getServiceRole(), 
outboundRequest.getURI().toString());
+            AtomicInteger counter = (AtomicInteger) 
inboundRequest.getAttribute(FAILOVER_COUNTER_ATTRIBUTE);
+            if (counter == null) {
+                counter = new AtomicInteger(0);
+            }
+
+            if (counter.incrementAndGet() <= 
getHaConfigurations().getMaxFailoverAttempts()) {
+                setupUrlHashLookup(); // refresh the url hash after failing a 
url
+                /* in case of failover update the activeURL variable */
+                getActiveURL().set(outboundRequest.getURI().toString());
+            }
+            return counter;
+        }
+    }
+
+    default HttpServletRequest prepareForFailover(HttpUriRequest 
outboundRequest, HttpServletRequest inboundRequest) {
+        //null out target url so that rewriters run again
+        
inboundRequest.setAttribute(AbstractGatewayFilter.TARGET_REQUEST_URL_ATTRIBUTE_NAME,
 null);
+        // Make sure to remove the ha cookie from the request
+        inboundRequest = new 
StickySessionCookieRemovedRequest(getHaConfigurations().getStickySessionCookieName(),
 inboundRequest);
+        URI uri = getDispatchUrl(inboundRequest);
+        ((HttpRequestBase) outboundRequest).setURI(uri);
+        if (getHaConfigurations().getFailoverSleep() > 0) {
+            try {
+                Thread.sleep(getHaConfigurations().getFailoverSleep());
+            } catch (InterruptedException e) {
+                LOG.failoverSleepFailed(getServiceRole(), e);
+                Thread.currentThread().interrupt();
             }
         }
+        LOG.failingOverRequest(outboundRequest.getURI().toString());
+        return inboundRequest;
+    }
+
+    default boolean disabledFailoverHandled(HttpServletRequest inboundRequest, 
HttpServletResponse outboundResponse) throws IOException {
+        // Check whether the session cookie is present
+        Optional<Cookie> sessionCookie = Optional.empty();
+        if (inboundRequest.getCookies() != null) {
+            sessionCookie =
+                    Arrays.stream(inboundRequest.getCookies())
+                            .filter(cookie -> 
getHaConfigurations().getStickySessionCookieName().equals(cookie.getName()))
+                            .findFirst();
+        }
+
+        // Check for a case where no fallback is configured
+        if (getHaConfigurations().isStickySessionEnabled() && 
getHaConfigurations().isNoFallbackEnabled() && sessionCookie.isPresent()) {
+            LOG.noFallbackError();
+            outboundResponse.sendError(HttpServletResponse.SC_BAD_GATEWAY, 
"Service connection error, HA failover disabled");
+            return true;
+        }
+        return false;
+    }
+
+    default boolean 
isNonIdempotentAndNonIdempotentFailoverDisabled(HttpUriRequest outboundRequest) 
{
+        return !getHaConfigurations().isFailoverNonIdempotentRequestEnabled() 
&& 
nonIdempotentRequests.stream().anyMatch(outboundRequest.getMethod()::equalsIgnoreCase);
     }
 }
diff --git 
a/gateway-provider-ha/src/main/java/org/apache/knox/gateway/ha/dispatch/ConfigurableHADispatch.java
 
b/gateway-provider-ha/src/main/java/org/apache/knox/gateway/ha/dispatch/ConfigurableHADispatch.java
index 0cb6b35b0..40382dd43 100644
--- 
a/gateway-provider-ha/src/main/java/org/apache/knox/gateway/ha/dispatch/ConfigurableHADispatch.java
+++ 
b/gateway-provider-ha/src/main/java/org/apache/knox/gateway/ha/dispatch/ConfigurableHADispatch.java
@@ -18,27 +18,20 @@
 package org.apache.knox.gateway.ha.dispatch;
 
 import org.apache.http.HttpResponse;
-import org.apache.http.client.methods.HttpRequestBase;
 import org.apache.http.client.methods.HttpUriRequest;
 import org.apache.knox.gateway.config.Configure;
 import org.apache.knox.gateway.config.GatewayConfig;
 import org.apache.knox.gateway.dispatch.ConfigurableDispatch;
-import org.apache.knox.gateway.filter.AbstractGatewayFilter;
 import org.apache.knox.gateway.ha.dispatch.i18n.HaDispatchMessages;
+import org.apache.knox.gateway.ha.config.CommonHaConfigurations;
+import org.apache.knox.gateway.ha.config.HaConfigurations;
 import org.apache.knox.gateway.ha.provider.HaProvider;
-import org.apache.knox.gateway.ha.provider.HaServiceConfig;
-import org.apache.knox.gateway.ha.provider.impl.HaServiceConfigConstants;
 import org.apache.knox.gateway.i18n.messages.MessagesFactory;
 
-import javax.servlet.http.Cookie;
 import javax.servlet.http.HttpServletRequest;
-import javax.servlet.http.HttpServletRequestWrapper;
 import javax.servlet.http.HttpServletResponse;
 import java.io.IOException;
 import java.net.URI;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
@@ -49,106 +42,46 @@ import static 
org.apache.knox.gateway.util.HttpUtils.isConnectionError;
  * A configurable HA dispatch class that has a very basic failover mechanism 
and
  * configurable options of ConfigurableDispatch class.
  */
-public class ConfigurableHADispatch extends ConfigurableDispatch implements 
LBHaDispatch {
-
-  protected static final String FAILOVER_COUNTER_ATTRIBUTE = 
"dispatch.ha.failover.counter";
+public class ConfigurableHADispatch extends ConfigurableDispatch implements 
CommonHaDispatch {
 
   protected static final HaDispatchMessages LOG = 
MessagesFactory.get(HaDispatchMessages.class);
-
-  protected int maxFailoverAttempts = 
HaServiceConfigConstants.DEFAULT_MAX_FAILOVER_ATTEMPTS;
-
-  protected int failoverSleep = 
HaServiceConfigConstants.DEFAULT_FAILOVER_SLEEP;
-
-  protected HaProvider haProvider;
-
-  protected static final List<String> nonIdempotentRequests = 
Arrays.asList("POST", "PATCH", "CONNECT");
-
-  private boolean loadBalancingEnabled = 
HaServiceConfigConstants.DEFAULT_LOAD_BALANCING_ENABLED;
-  private boolean stickySessionsEnabled = 
HaServiceConfigConstants.DEFAULT_STICKY_SESSIONS_ENABLED;
-  private boolean noFallbackEnabled = 
HaServiceConfigConstants.DEFAULT_NO_FALLBACK_ENABLED;
-  protected boolean failoverNonIdempotentRequestEnabled = 
HaServiceConfigConstants.DEFAULT_FAILOVER_NON_IDEMPOTENT;
-  private String stickySessionCookieName = 
HaServiceConfigConstants.DEFAULT_STICKY_SESSION_COOKIE_NAME;
-  private List<String> disableLoadBalancingForUserAgents = 
Arrays.asList(HaServiceConfigConstants.DEFAULT_DISABLE_LB_USER_AGENTS);
+  private final HaConfigurations haConfigurations = new 
CommonHaConfigurations();
 
   /**
-   *  This activeURL is used to track urls when LB is turned off for some 
clients.
-   *  The problem we have with selectively turning off LB is that other clients
-   *  that use LB can change the state from under the current session where LB 
is
-   *  turned off.
-   *  e.g.
-   *  ODBC Connection established where LB is off. JDBC connection is 
established
-   *  next where LB is enabled. This changes the active URL under the existing 
ODBC
-   *  connection which will be an issue.
-   *  This variable keeps track of non-LB'ed url and updated upon failover.
+   * This activeURL is used to track urls when LB is turned off for some 
clients.
+   * The problem we have with selectively turning off LB is that other clients
+   * that use LB can change the state from under the current session where LB 
is
+   * turned off.
+   * e.g.
+   * ODBC Connection established where LB is off. JDBC connection is 
established
+   * next where LB is enabled. This changes the active URL under the existing 
ODBC
+   * connection which will be an issue.
+   * This variable keeps track of non-LB'ed url and updated upon failover.
    */
-  private AtomicReference<String> activeURL =  new AtomicReference();
+  private final AtomicReference<String> activeURL = new AtomicReference<>();
+
   @Override
   public void init() {
     super.init();
     LOG.initializingForResourceRole(getServiceRole());
-    if ( haProvider != null ) {
-      HaServiceConfig serviceConfig = 
haProvider.getHaDescriptor().getServiceConfig(getServiceRole());
-      maxFailoverAttempts = serviceConfig.getMaxFailoverAttempts();
-      failoverSleep = serviceConfig.getFailoverSleep();
-      failoverNonIdempotentRequestEnabled = 
serviceConfig.isFailoverNonIdempotentRequestEnabled();
-      initializeLBHaDispatch(serviceConfig);
-      noFallbackEnabled = stickySessionsEnabled && 
serviceConfig.isNoFallbackEnabled();
+    if (haConfigurations.getHaProvider() != null) {
+      
initializeCommonHaDispatch(haConfigurations.getHaProvider().getHaDescriptor().getServiceConfig(getServiceRole()));
     }
   }
 
   @Override
-  public HaProvider getHaProvider() {
-    return haProvider;
+  public HaConfigurations getHaConfigurations() {
+    return haConfigurations;
   }
 
   @Configure
   public void setHaProvider(HaProvider haProvider) {
-    this.haProvider = haProvider;
-  }
-
-  @Override
-  public boolean isStickySessionEnabled() {
-      return stickySessionsEnabled;
-  }
-
-  @Override
-  public void setStickySessionsEnabled(boolean enabled) {
-    this.stickySessionsEnabled = enabled;
-  }
-
-  @Override
-  public String getStickySessionCookieName() {
-      return stickySessionCookieName;
-  }
-
-  @Override
-  public void setStickySessionCookieName(String stickySessionCookieName) {
-    this.stickySessionCookieName = stickySessionCookieName;
-  }
-
-  @Override
-  public boolean isLoadBalancingEnabled() {
-      return loadBalancingEnabled;
-  }
-
-  @Override
-  public void setLoadBalancingEnabled(boolean enabled) {
-    this.loadBalancingEnabled = enabled;
-  }
-
-  @Override
-  public List<String> getDisableLoadBalancingForUserAgents() {
-      return disableLoadBalancingForUserAgents;
-  }
-
-  @Override
-  public void setDisableLoadBalancingForUserAgents(List<String> 
disableLoadBalancingForUserAgents) {
-    this.disableLoadBalancingForUserAgents = disableLoadBalancingForUserAgents;
+    getHaConfigurations().setHaProvider(haProvider);
   }
 
   @Override
   public AtomicReference<String> getActiveURL() {
-      return activeURL;
+    return activeURL;
   }
 
   @Override
@@ -195,48 +128,19 @@ public class ConfigurableHADispatch extends 
ConfigurableDispatch implements LBHa
     }
   }
 
-  private boolean 
isNonIdempotentAndNonIdempotentFailoverDisabled(HttpUriRequest outboundRequest) 
{
-    return !failoverNonIdempotentRequestEnabled && 
nonIdempotentRequests.stream().anyMatch(outboundRequest.getMethod()::equalsIgnoreCase);
-  }
-
   protected void failoverRequest(HttpUriRequest outboundRequest, 
HttpServletRequest inboundRequest, HttpServletResponse outboundResponse, 
HttpResponse inboundResponse, Exception exception) throws IOException {
-    // Check whether the session cookie is present
-    Optional<Cookie> sessionCookie = Optional.empty();
-    if (inboundRequest.getCookies() != null) {
-        sessionCookie =
-                Arrays.stream(inboundRequest.getCookies())
-                      .filter(cookie -> 
stickySessionCookieName.equals(cookie.getName()))
-                      .findFirst();
-    }
-
-    // Check for a case where no fallback is configured
-    if(stickySessionsEnabled && noFallbackEnabled && 
sessionCookie.isPresent()) {
-      LOG.noFallbackError();
-      outboundResponse.sendError(HttpServletResponse.SC_BAD_GATEWAY, "Service 
connection error, HA failover disabled");
+    if (disabledFailoverHandled(inboundRequest, outboundResponse)) {
       return;
     }
+
     /* mark endpoint as failed */
     final AtomicInteger counter = markEndpointFailed(outboundRequest, 
inboundRequest);
     inboundRequest.setAttribute(FAILOVER_COUNTER_ATTRIBUTE, counter);
-    if ( counter.get() <= maxFailoverAttempts ) {
-      //null out target url so that rewriters run again
-      
inboundRequest.setAttribute(AbstractGatewayFilter.TARGET_REQUEST_URL_ATTRIBUTE_NAME,
 null);
-      // Make sure to remove the cookie ha cookie from the request
-      inboundRequest = new 
StickySessionCookieRemovedRequest(stickySessionCookieName, inboundRequest);
-      URI uri = getDispatchUrl(inboundRequest);
-      ((HttpRequestBase) outboundRequest).setURI(uri);
-      if ( failoverSleep > 0 ) {
-        try {
-          Thread.sleep(failoverSleep);
-        } catch ( InterruptedException e ) {
-          LOG.failoverSleepFailed(getServiceRole(), e);
-          Thread.currentThread().interrupt();
-        }
-      }
-      LOG.failingOverRequest(outboundRequest.getURI().toString());
+    if ( counter.get() <= haConfigurations.getMaxFailoverAttempts() ) {
+      inboundRequest = prepareForFailover(outboundRequest, inboundRequest);
       executeRequest(outboundRequest, inboundRequest, outboundResponse);
     } else {
-      LOG.maxFailoverAttemptsReached(maxFailoverAttempts, getServiceRole());
+      
LOG.maxFailoverAttemptsReached(haConfigurations.getMaxFailoverAttempts(), 
getServiceRole());
       if ( inboundResponse != null ) {
         writeOutboundResponse(outboundRequest, inboundRequest, 
outboundResponse, inboundResponse);
       } else {
@@ -244,58 +148,4 @@ public class ConfigurableHADispatch extends 
ConfigurableDispatch implements LBHa
       }
     }
   }
-
-  /**
-   * A helper method that marks an endpoint failed.
-   * Changes HA Provider state.
-   * Changes ActiveUrl state.
-   * Changes for inbound urls should be handled by calling functions.
-   * @param outboundRequest
-   * @param inboundRequest
-   * @return current failover counter
-   */
-  private synchronized AtomicInteger markEndpointFailed(final HttpUriRequest 
outboundRequest, final HttpServletRequest inboundRequest) {
-    haProvider.markFailedURL(getServiceRole(), 
outboundRequest.getURI().toString());
-    AtomicInteger counter = (AtomicInteger) 
inboundRequest.getAttribute(FAILOVER_COUNTER_ATTRIBUTE);
-    if ( counter == null ) {
-      counter = new AtomicInteger(0);
-    }
-
-    if ( counter.incrementAndGet() <= maxFailoverAttempts ) {
-      setupUrlHashLookup(); // refresh the url hash after failing a url
-      /* in case of failover update the activeURL variable */
-      activeURL.set(outboundRequest.getURI().toString());
-    }
-    return counter;
-  }
-
-  /**
-   * Strips out the cookies by the cookie name provided
-   */
-  private static class StickySessionCookieRemovedRequest extends 
HttpServletRequestWrapper {
-    private final Cookie[] cookies;
-
-    StickySessionCookieRemovedRequest(String cookieName, HttpServletRequest 
request) {
-      super(request);
-      this.cookies = filterCookies(cookieName, request.getCookies());
-    }
-
-    private Cookie[] filterCookies(String cookieName, Cookie[] cookies) {
-      if (super.getCookies() == null) {
-        return null;
-      }
-      List<Cookie> cookiesInternal = new ArrayList<>();
-      for (Cookie cookie : cookies) {
-        if (!cookieName.equals(cookie.getName())) {
-          cookiesInternal.add(cookie);
-        }
-      }
-      return cookiesInternal.toArray(new Cookie[0]);
-    }
-
-    @Override
-    public Cookie[] getCookies() {
-      return cookies;
-    }
-  }
 }
diff --git 
a/gateway-provider-ha/src/main/java/org/apache/knox/gateway/ha/dispatch/SSEHaCallback.java
 
b/gateway-provider-ha/src/main/java/org/apache/knox/gateway/ha/dispatch/SSEHaCallback.java
new file mode 100644
index 000000000..613e19941
--- /dev/null
+++ 
b/gateway-provider-ha/src/main/java/org/apache/knox/gateway/ha/dispatch/SSEHaCallback.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.knox.gateway.ha.dispatch;
+
+import org.apache.http.client.methods.HttpUriRequest;
+import org.apache.http.nio.protocol.HttpAsyncRequestProducer;
+import org.apache.knox.gateway.sse.SSECallback;
+
+import javax.servlet.AsyncContext;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import static org.apache.knox.gateway.util.HttpUtils.isConnectionError;
+
+public class SSEHaCallback extends SSECallback {
+
+    private final SSEHaDispatch dispatch;
+    private final HttpUriRequest outboundRequest;
+    private final HttpServletRequest inboundRequest;
+
+    public SSEHaCallback(HttpServletResponse outboundResponse, AsyncContext 
asyncContext, HttpAsyncRequestProducer producer,
+                         SSEHaDispatch dispatch, HttpUriRequest 
outboundRequest, HttpServletRequest inboundRequest) {
+        super(outboundResponse, asyncContext, producer);
+        this.dispatch = dispatch;
+        this.outboundRequest = outboundRequest;
+        this.inboundRequest = inboundRequest;
+    }
+
+    @Override
+    public void failed(final Exception ex) {
+        if(outboundResponse.isCommitted()) {
+            /*
+                If the response was already committed, it means the connection 
was closed abruptly no failover needed.
+                The endpoint shouldn't be marked as failed.
+            */
+            super.failed(ex);
+        } else {
+            if(!isConnectionError(ex.getCause() == null ? ex : ex.getCause()) 
&& dispatch.isNonIdempotentAndNonIdempotentFailoverDisabled(outboundRequest)) {
+                dispatch.markEndpointFailed(outboundRequest, inboundRequest);
+                super.failed(ex);
+            } else {
+                releaseResources(false);
+                LOG.sseConnectionError(ex.getMessage());
+                dispatch.failoverRequest(outboundRequest, outboundResponse, 
inboundRequest, asyncContext);
+            }
+        }
+    }
+}
diff --git 
a/gateway-provider-ha/src/main/java/org/apache/knox/gateway/ha/dispatch/SSEHaDispatch.java
 
b/gateway-provider-ha/src/main/java/org/apache/knox/gateway/ha/dispatch/SSEHaDispatch.java
index 50582c2b9..136a1721f 100644
--- 
a/gateway-provider-ha/src/main/java/org/apache/knox/gateway/ha/dispatch/SSEHaDispatch.java
+++ 
b/gateway-provider-ha/src/main/java/org/apache/knox/gateway/ha/dispatch/SSEHaDispatch.java
@@ -18,33 +18,36 @@
 package org.apache.knox.gateway.ha.dispatch;
 
 import org.apache.http.client.methods.HttpUriRequest;
+import org.apache.http.nio.client.methods.AsyncCharConsumer;
+import org.apache.http.nio.client.methods.HttpAsyncMethods;
+import org.apache.http.nio.protocol.HttpAsyncRequestProducer;
+import org.apache.knox.gateway.audit.api.Action;
+import org.apache.knox.gateway.audit.api.ActionOutcome;
+import org.apache.knox.gateway.audit.api.ResourceType;
 import org.apache.knox.gateway.config.Configure;
 import org.apache.knox.gateway.config.GatewayConfig;
 import org.apache.knox.gateway.ha.dispatch.i18n.HaDispatchMessages;
+import org.apache.knox.gateway.ha.config.CommonHaConfigurations;
+import org.apache.knox.gateway.ha.config.HaConfigurations;
 import org.apache.knox.gateway.ha.provider.HaProvider;
-import org.apache.knox.gateway.ha.provider.HaServiceConfig;
-import org.apache.knox.gateway.ha.provider.impl.HaServiceConfigConstants;
 import org.apache.knox.gateway.i18n.messages.MessagesFactory;
 import org.apache.knox.gateway.sse.SSEDispatch;
+import org.apache.knox.gateway.sse.SSEResponse;
 
+import javax.servlet.AsyncContext;
 import javax.servlet.FilterConfig;
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
-import java.net.URI;
-import java.util.Collections;
-import java.util.List;
+import java.io.IOException;
 import java.util.Optional;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 
-public class SSEHaDispatch extends SSEDispatch implements LBHaDispatch {
+public class SSEHaDispatch extends SSEDispatch implements CommonHaDispatch {
 
     protected static final HaDispatchMessages LOG = 
MessagesFactory.get(HaDispatchMessages.class);
-    protected HaProvider haProvider;
-    private boolean loadBalancingEnabled = 
HaServiceConfigConstants.DEFAULT_LOAD_BALANCING_ENABLED;
-    private boolean stickySessionsEnabled = 
HaServiceConfigConstants.DEFAULT_STICKY_SESSIONS_ENABLED;
-    private String stickySessionCookieName = 
HaServiceConfigConstants.DEFAULT_STICKY_SESSION_COOKIE_NAME;
-    private List<String> disableLoadBalancingForUserAgents = 
Collections.singletonList(HaServiceConfigConstants.DEFAULT_DISABLE_LB_USER_AGENTS);
     private final boolean sslEnabled;
+    private final HaConfigurations haConfigurations = new 
CommonHaConfigurations();
 
     /**
      * This activeURL is used to track urls when LB is turned off for some 
clients.
@@ -70,60 +73,19 @@ public class SSEHaDispatch extends SSEDispatch implements 
LBHaDispatch {
     public void init() {
         super.init();
         LOG.initializingForResourceRole(getServiceRole());
-        if (haProvider != null) {
-            HaServiceConfig serviceConfig = 
haProvider.getHaDescriptor().getServiceConfig(getServiceRole());
-            this.initializeLBHaDispatch(serviceConfig);
+        if (haConfigurations.getHaProvider() != null) {
+            
initializeCommonHaDispatch(haConfigurations.getHaProvider().getHaDescriptor().getServiceConfig(getServiceRole()));
         }
     }
 
-    @Override
-    public HaProvider getHaProvider() {
-        return haProvider;
-    }
-
-    @Override
-    public void setLoadBalancingEnabled(boolean enabled) {
-        this.loadBalancingEnabled = enabled;
-    }
-
     @Configure
     public void setHaProvider(HaProvider haProvider) {
-        this.haProvider = haProvider;
-    }
-
-    @Override
-    public boolean isStickySessionEnabled() {
-        return stickySessionsEnabled;
-    }
-
-    @Override
-    public void setStickySessionsEnabled(boolean enabled) {
-        this.stickySessionsEnabled = enabled;
+        getHaConfigurations().setHaProvider(haProvider);
     }
 
     @Override
-    public String getStickySessionCookieName() {
-        return stickySessionCookieName;
-    }
-
-    @Override
-    public void setStickySessionCookieName(String stickySessionCookieName) {
-        this.stickySessionCookieName = stickySessionCookieName;
-    }
-
-    @Override
-    public boolean isLoadBalancingEnabled() {
-        return loadBalancingEnabled;
-    }
-
-    @Override
-    public List<String> getDisableLoadBalancingForUserAgents() {
-        return disableLoadBalancingForUserAgents;
-    }
-
-    @Override
-    public void setDisableLoadBalancingForUserAgents(List<String> 
disableLoadBalancingForUserAgents) {
-        this.disableLoadBalancingForUserAgents = 
disableLoadBalancingForUserAgents;
+    public HaConfigurations getHaConfigurations() {
+        return haConfigurations;
     }
 
     @Override
@@ -136,17 +98,60 @@ public class SSEHaDispatch extends SSEDispatch implements 
LBHaDispatch {
         activeURL.set(url);
     }
 
+    @Override
+    protected void executeAsyncRequest(HttpUriRequest outboundRequest, 
HttpServletResponse outboundResponse,
+                                     AsyncContext asyncContext, 
HttpServletRequest inboundRequest) {
+        HttpAsyncRequestProducer producer = 
HttpAsyncMethods.create(outboundRequest);
+        AsyncCharConsumer<SSEResponse> consumer = new 
SSECharConsumer(outboundResponse, outboundRequest.getURI(), asyncContext, 
inboundRequest, outboundRequest);
+        LOG.dispatchRequest(outboundRequest.getMethod(), 
outboundRequest.getURI());
+        auditor.audit(Action.DISPATCH, outboundRequest.getURI().toString(), 
ResourceType.URI, ActionOutcome.UNAVAILABLE, 
RES.requestMethod(outboundRequest.getMethod()));
+        asyncClient.execute(producer, consumer, new 
SSEHaCallback(outboundResponse, asyncContext, producer, this, outboundRequest, 
inboundRequest));
+    }
+
+    protected void failoverRequest(HttpUriRequest outboundRequest, 
HttpServletResponse outboundResponse,
+                                   HttpServletRequest inboundRequest, 
AsyncContext asyncContext) {
+
+        try {
+            if (disabledFailoverHandled(inboundRequest, outboundResponse)) {
+                asyncContext.complete();
+                return;
+            }
+
+            /* mark endpoint as failed */
+            final AtomicInteger counter = markEndpointFailed(outboundRequest, 
inboundRequest);
+            inboundRequest.setAttribute(FAILOVER_COUNTER_ATTRIBUTE, counter);
+            if (counter.get() <= haConfigurations.getMaxFailoverAttempts()) {
+                inboundRequest = prepareForFailover(outboundRequest, 
inboundRequest);
+                executeAsyncRequest(outboundRequest, outboundResponse, 
asyncContext, inboundRequest);
+            } else {
+                
LOG.maxFailoverAttemptsReached(haConfigurations.getMaxFailoverAttempts(), 
getServiceRole());
+                outboundResponse.sendError(HttpServletResponse.SC_BAD_GATEWAY, 
"Service connection error, max failover attempts reached");
+                asyncContext.complete();
+            }
+        } catch (IOException e) {
+            asyncContext.complete();
+        }
+    }
 
     @Override
     protected void executeRequestWrapper(HttpUriRequest outboundRequest, 
HttpServletRequest inboundRequest, HttpServletResponse outboundResponse) {
         boolean userAgentDisabled = isUserAgentDisabled(inboundRequest);
-        Optional<URI> backendURI = setBackendUri(outboundRequest, 
inboundRequest, userAgentDisabled);
+        setBackendUri(outboundRequest, inboundRequest, userAgentDisabled);
         executeRequest(outboundRequest, inboundRequest, outboundResponse);
-        shiftActiveURL(userAgentDisabled, backendURI);
     }
 
     @Override
     protected void outboundResponseWrapper(final HttpUriRequest 
outboundRequest, final HttpServletRequest inboundRequest, final 
HttpServletResponse outboundResponse) {
         setKnoxHaCookie(outboundRequest, inboundRequest, outboundResponse, 
sslEnabled);
     }
+
+    @Override
+    protected void shiftCallback(HttpUriRequest outboundRequest, 
HttpServletRequest inboundRequest) {
+        /*
+            Due to the async behavior shifting has to take place after a 
successful response-received event
+            and not in the executeRequest method. This is the same as in a 
sync dispatch.
+        */
+        boolean userAgentDisabled = isUserAgentDisabled(inboundRequest);
+        shiftActiveURL(userAgentDisabled, userAgentDisabled ? Optional.empty() 
: getBackendFromHaCookie(outboundRequest, inboundRequest));
+    }
 }
diff --git 
a/gateway-provider-ha/src/main/java/org/apache/knox/gateway/ha/dispatch/StickySessionCookieRemovedRequest.java
 
b/gateway-provider-ha/src/main/java/org/apache/knox/gateway/ha/dispatch/StickySessionCookieRemovedRequest.java
new file mode 100644
index 000000000..1a0991519
--- /dev/null
+++ 
b/gateway-provider-ha/src/main/java/org/apache/knox/gateway/ha/dispatch/StickySessionCookieRemovedRequest.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.knox.gateway.ha.dispatch;
+
+import javax.servlet.http.Cookie;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletRequestWrapper;
+import java.util.ArrayList;
+import java.util.List;
+
+public class StickySessionCookieRemovedRequest extends 
HttpServletRequestWrapper {
+    private final Cookie[] cookies;
+
+    StickySessionCookieRemovedRequest(String cookieName, HttpServletRequest 
request) {
+        super(request);
+        this.cookies = filterCookies(cookieName, request.getCookies());
+    }
+
+    private Cookie[] filterCookies(String cookieName, Cookie[] cookies) {
+        if (super.getCookies() == null) {
+            return null;
+        }
+        List<Cookie> cookiesInternal = new ArrayList<>();
+        for (Cookie cookie : cookies) {
+            if (!cookieName.equals(cookie.getName())) {
+                cookiesInternal.add(cookie);
+            }
+        }
+        return cookiesInternal.toArray(new Cookie[0]);
+    }
+
+    @Override
+    public Cookie[] getCookies() {
+        return cookies;
+    }
+}
diff --git 
a/gateway-provider-ha/src/main/java/org/apache/knox/gateway/ha/dispatch/i18n/HaDispatchMessages.java
 
b/gateway-provider-ha/src/main/java/org/apache/knox/gateway/ha/dispatch/i18n/HaDispatchMessages.java
index c3a855c6d..201632321 100644
--- 
a/gateway-provider-ha/src/main/java/org/apache/knox/gateway/ha/dispatch/i18n/HaDispatchMessages.java
+++ 
b/gateway-provider-ha/src/main/java/org/apache/knox/gateway/ha/dispatch/i18n/HaDispatchMessages.java
@@ -22,6 +22,8 @@ import org.apache.knox.gateway.i18n.messages.MessageLevel;
 import org.apache.knox.gateway.i18n.messages.Messages;
 import org.apache.knox.gateway.i18n.messages.StackTrace;
 
+import java.net.URI;
+
 @Messages(logger = "org.apache.knox.gateway")
 public interface HaDispatchMessages {
   @Message(level = MessageLevel.INFO, text = "Initializing Ha Dispatch for: 
{0}")
@@ -56,4 +58,7 @@ public interface HaDispatchMessages {
 
   @Message(level = MessageLevel.ERROR, text = "Request is non-idempotent {0}, 
failover prevented, to allow non-idempotent requests to failover set 
'failoverNonIdempotentRequestEnabled=true' in HA config. Non connection related 
error: {1}")
   void cannotFailoverNonIdempotentRequest(String method, Throwable cause);
+
+  @Message( level = MessageLevel.DEBUG, text = "Dispatch request: {0} {1}" )
+  void dispatchRequest( String method, URI uri );
 }
diff --git 
a/gateway-provider-ha/src/test/java/org/apache/knox/gateway/ha/dispatch/SSEHaDispatchTest.java
 
b/gateway-provider-ha/src/test/java/org/apache/knox/gateway/ha/dispatch/SSEHaDispatchTest.java
index 77c34f559..d5cc9dadd 100644
--- 
a/gateway-provider-ha/src/test/java/org/apache/knox/gateway/ha/dispatch/SSEHaDispatchTest.java
+++ 
b/gateway-provider-ha/src/test/java/org/apache/knox/gateway/ha/dispatch/SSEHaDispatchTest.java
@@ -22,6 +22,7 @@ import org.apache.http.client.config.CookieSpecs;
 import org.apache.knox.gateway.config.GatewayConfig;
 import org.apache.knox.gateway.ha.provider.HaDescriptor;
 import org.apache.knox.gateway.ha.provider.HaProvider;
+import org.apache.knox.gateway.ha.provider.HaServiceConfig;
 import org.apache.knox.gateway.ha.provider.impl.DefaultHaProvider;
 import org.apache.knox.gateway.ha.provider.impl.HaDescriptorFactory;
 import org.apache.knox.gateway.services.GatewayServices;
@@ -32,8 +33,9 @@ import org.apache.knox.test.mock.MockServletContext;
 import org.apache.knox.test.mock.MockServletInputStream;
 import org.easymock.Capture;
 import org.easymock.EasyMock;
+import org.junit.After;
 import org.junit.Assert;
-import org.junit.BeforeClass;
+import org.junit.Before;
 import org.junit.Test;
 
 import javax.servlet.AsyncContext;
@@ -53,30 +55,40 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.easymock.EasyMock.capture;
 import static org.easymock.EasyMock.createMock;
 import static org.easymock.EasyMock.expect;
 import static org.easymock.EasyMock.replay;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
 public class SSEHaDispatchTest {
 
-    private static MockServer MOCK_SSE_SERVER;
-    private static URI URL;
+    private MockServer MOCK_SSE_SERVER;
+    private URI URL;
+    private final String serviceName = "SSE";
 
-    @BeforeClass
-    public static void setUpBeforeClass() throws Exception {
+    @Before
+    public void setUpBeforeClass() throws Exception {
         MOCK_SSE_SERVER = new MockServer("SSE", true);
         URL = new URI("http://localhost:"; + MOCK_SSE_SERVER.getPort() + 
"/sse");
     }
 
+    @After
+    public void cleanUp() throws Exception {
+        if (MOCK_SSE_SERVER != null) {
+            MOCK_SSE_SERVER.stop();
+        }
+    }
+
     @Test
     public void testHADispatchURL() throws Exception {
         String serviceName = "SSE";
         HaDescriptor descriptor = HaDescriptorFactory.createDescriptor();
-        
descriptor.addServiceConfig(HaDescriptorFactory.createServiceConfig(serviceName,
 "true", null, null, null, null, "true", "true", null, null, null, null));
+        
descriptor.addServiceConfig(HaDescriptorFactory.createServiceConfig(serviceName,
 "true", "1", "1000", null, null, "true", "true", null, null, null, null));
         HaProvider provider = new DefaultHaProvider(descriptor);
         URI uri1 = new URI("http://host1.valid";);
         URI uri2 = new URI("http://host2.valid";);
@@ -130,7 +142,8 @@ public class SSEHaDispatchTest {
     @Test
     public void testStickySessionCookie() throws Exception {
         CountDownLatch latch = new CountDownLatch(1);
-        SSEHaDispatch sseHaDispatch = this.createDispatch(null);
+        HaServiceConfig haServiceConfig = 
HaDescriptorFactory.createServiceConfig(serviceName, "true", "1", "1000", null, 
null, "true", "true", null, null, "agentX,user1,agentY", null);
+        SSEHaDispatch sseHaDispatch = this.createDispatch(false, 
haServiceConfig);
         PrintWriter printWriter = EasyMock.createNiceMock(PrintWriter.class);
         HttpServletResponse outboundResponse = 
this.getServletResponse(HttpStatus.SC_OK);
         AsyncContext asyncContext = this.getAsyncContext(latch, 
outboundResponse);
@@ -161,7 +174,8 @@ public class SSEHaDispatchTest {
     @Test
     public void testNamedStickySessionCookie() throws Exception {
         CountDownLatch latch = new CountDownLatch(1);
-        SSEHaDispatch sseHaDispatch = this.createDispatch("COOKIE_NAME");
+        HaServiceConfig haServiceConfig = 
HaDescriptorFactory.createServiceConfig(serviceName, "true", "1", "1000", null, 
null, "true", "true", "COOKIE_NAME", null, "agentX,user1,agentY", null);
+        SSEHaDispatch sseHaDispatch = this.createDispatch(false, 
haServiceConfig);
         PrintWriter printWriter = EasyMock.createNiceMock(PrintWriter.class);
         HttpServletResponse outboundResponse = 
this.getServletResponse(HttpStatus.SC_OK);
         AsyncContext asyncContext = this.getAsyncContext(latch, 
outboundResponse);
@@ -192,7 +206,8 @@ public class SSEHaDispatchTest {
     @Test
     public void testLoadBalancingWithoutStickySessionCookie() throws Exception 
{
         CountDownLatch latch = new CountDownLatch(1);
-        SSEHaDispatch sseHaDispatch = this.createDispatch(null);
+        HaServiceConfig haServiceConfig = 
HaDescriptorFactory.createServiceConfig(serviceName, "true", "1", "1000", null, 
null, "true", "true", null, null, "agentX,user1,agentY", null);
+        SSEHaDispatch sseHaDispatch = this.createDispatch(false, 
haServiceConfig);
         PrintWriter printWriter = EasyMock.createNiceMock(PrintWriter.class);
         HttpServletResponse outboundResponse = 
this.getServletResponse(HttpStatus.SC_OK);
         AsyncContext asyncContext = this.getAsyncContext(latch, 
outboundResponse);
@@ -217,13 +232,14 @@ public class SSEHaDispatchTest {
         latch.await(1L, TimeUnit.SECONDS);
         EasyMock.verify(asyncContext, outboundResponse, inboundRequest, 
printWriter);
         assertTrue(MOCK_SSE_SERVER.isEmpty());
-        assertEquals("http://host2:3333/sse";, 
sseHaDispatch.getHaProvider().getActiveURL("SSE"));
+        assertEquals("http://host2.valid:3333/sse";, 
sseHaDispatch.getHaConfigurations().getHaProvider().getActiveURL("SSE"));
     }
 
     @Test
     public void testLoadBalancingDisabledWithUserAgent() throws Exception {
         CountDownLatch latch = new CountDownLatch(1);
-        SSEHaDispatch sseHaDispatch = this.createDispatch(null);
+        HaServiceConfig haServiceConfig = 
HaDescriptorFactory.createServiceConfig(serviceName, "true", "1", "1000", null, 
null, "true", "true", null, null, "agentX,user1,agentY", null);
+        SSEHaDispatch sseHaDispatch = this.createDispatch(false, 
haServiceConfig);
         PrintWriter printWriter = EasyMock.createNiceMock(PrintWriter.class);
         HttpServletResponse outboundResponse = 
this.getServletResponse(HttpStatus.SC_OK);
         AsyncContext asyncContext = this.getAsyncContext(latch, 
outboundResponse);
@@ -249,13 +265,14 @@ public class SSEHaDispatchTest {
         latch.await(1L, TimeUnit.SECONDS);
         EasyMock.verify(asyncContext, outboundResponse, inboundRequest, 
printWriter);
         assertTrue(MOCK_SSE_SERVER.isEmpty());
-        assertEquals("http://localhost:"; + MOCK_SSE_SERVER.getPort() + "/sse", 
sseHaDispatch.getHaProvider().getActiveURL("SSE"));
+        assertEquals("http://localhost:"; + MOCK_SSE_SERVER.getPort() + "/sse", 
sseHaDispatch.getHaConfigurations().getHaProvider().getActiveURL("SSE"));
     }
 
     @Test
     public void testLoadBalancingDisabledWithStickySession() throws Exception {
         CountDownLatch latch = new CountDownLatch(1);
-        SSEHaDispatch sseHaDispatch = this.createDispatch(null);
+        HaServiceConfig haServiceConfig = 
HaDescriptorFactory.createServiceConfig(serviceName, "true", "1", "1000", null, 
null, "true", "true", null, null, "agentX,user1,agentY", null);
+        SSEHaDispatch sseHaDispatch = this.createDispatch(false, 
haServiceConfig);
         PrintWriter printWriter = EasyMock.createNiceMock(PrintWriter.class);
         HttpServletResponse outboundResponse = 
this.getServletResponse(HttpStatus.SC_OK);
         AsyncContext asyncContext = this.getAsyncContext(latch, 
outboundResponse);
@@ -277,7 +294,7 @@ public class SSEHaDispatchTest {
 
         sseHaDispatch.doGet(URL, inboundRequest, outboundResponse);
         latch.await(1L, TimeUnit.SECONDS);
-        assertEquals("http://host2:3333/sse";, 
sseHaDispatch.getHaProvider().getActiveURL("SSE"));
+        assertEquals("http://host2.valid:3333/sse";, 
sseHaDispatch.getHaConfigurations().getHaProvider().getActiveURL("SSE"));
 
 
         //Second request, sticky session cookie included
@@ -308,10 +325,357 @@ public class SSEHaDispatchTest {
 
         EasyMock.verify(asyncContext, outboundResponse, inboundRequest, 
printWriter, inboundRequest2);
         assertTrue(MOCK_SSE_SERVER.isEmpty());
-        assertEquals("http://host2:3333/sse";, 
sseHaDispatch.getHaProvider().getActiveURL("SSE"));
+        assertEquals("http://host2.valid:3333/sse";, 
sseHaDispatch.getHaConfigurations().getHaProvider().getActiveURL("SSE"));
+    }
+
+    @Test
+    public void testConnectivityFailover() throws Exception {
+        CountDownLatch latch = new CountDownLatch(1);
+        HaServiceConfig haServiceConfig = 
HaDescriptorFactory.createServiceConfig(serviceName, "true", "1", "1000", null, 
null, "true", "true", null, null, "agentX,user1,agentY", null);
+        SSEHaDispatch sseHaDispatch = this.createDispatch(true, 
haServiceConfig);
+        PrintWriter printWriter = EasyMock.createNiceMock(PrintWriter.class);
+        HttpServletResponse outboundResponse = 
this.getServletResponse(HttpStatus.SC_OK);
+        AsyncContext asyncContext = this.getAsyncContext(latch, 
outboundResponse);
+        HttpServletRequest inboundRequest = 
this.getHttpServletRequest(asyncContext);
+        Capture<Cookie> capturedArgument = Capture.newInstance();
+        
expect(inboundRequest.getHeader("User-Agent")).andReturn("unknown").anyTimes();
+        expect(inboundRequest.getRequestURL()).andReturn(new 
StringBuffer(URL.toString())).once();
+        
expect(inboundRequest.getAttribute("dispatch.ha.failover.counter")).andReturn(new
 AtomicInteger(0)).once();
+        this.expectResponseBodyAndHeader(printWriter, outboundResponse, 
capturedArgument);
+        replay(inboundRequest, asyncContext, outboundResponse, printWriter);
+
+        MOCK_SSE_SERVER.expect()
+                .method("GET")
+                .pathInfo("/sse")
+                .header("Accept", "text/event-stream")
+                .respond()
+                .status(HttpStatus.SC_OK)
+                
.content("id:1\ndata:data1\nevent:event1\n\ndata:data2\nevent:event2\nid:2\nretry:1\n:testing\n\n",
 StandardCharsets.UTF_8)
+                .header("response", "header")
+                .contentType("text/event-stream");
+        sseHaDispatch.doGet(new URI("http://unknown-host.invalid";), 
inboundRequest, outboundResponse);
+
+        latch.await(2L, TimeUnit.SECONDS);
+        EasyMock.verify(asyncContext, outboundResponse, inboundRequest, 
printWriter);
+        assertTrue(MOCK_SSE_SERVER.isEmpty());
+        assertEquals("http://host2.valid:3333/sse";, 
sseHaDispatch.getHaConfigurations().getHaProvider().getActiveURL("SSE"));
     }
 
-    private SSEHaDispatch createDispatch(String cookieName) throws Exception {
+    @Test
+    public void testNoLoadBalancingStickyFailoverNoFallback() throws Exception 
{
+        CountDownLatch latch = new CountDownLatch(1);
+        HaServiceConfig haServiceConfig = 
HaDescriptorFactory.createServiceConfig(serviceName, "true", "1", "1000", null, 
null, "false", "true", null, "true", null, null);
+        SSEHaDispatch sseHaDispatch = this.createDispatch(true, 
haServiceConfig);
+        PrintWriter printWriter = EasyMock.createNiceMock(PrintWriter.class);
+        HttpServletResponse outboundResponse = 
this.getServletResponse(HttpStatus.SC_OK);
+        AsyncContext asyncContext = this.getAsyncContext(latch, 
outboundResponse);
+        HttpServletRequest inboundRequest = 
this.getHttpServletRequest(asyncContext);
+        
expect(inboundRequest.getHeader("User-Agent")).andReturn("unknown").anyTimes();
+        expect(inboundRequest.getRequestURL()).andReturn(new 
StringBuffer(URL.toString())).once();
+        
expect(inboundRequest.getAttribute("dispatch.ha.failover.counter")).andReturn(new
 AtomicInteger(0)).once();
+        this.expectResponseBodyAndHeader(printWriter, outboundResponse, null);
+        replay(inboundRequest, asyncContext, outboundResponse, printWriter);
+
+        MOCK_SSE_SERVER.expect()
+                .method("GET")
+                .pathInfo("/sse")
+                .header("Accept", "text/event-stream")
+                .respond()
+                .status(HttpStatus.SC_OK)
+                
.content("id:1\ndata:data1\nevent:event1\n\ndata:data2\nevent:event2\nid:2\nretry:1\n:testing\n\n",
 StandardCharsets.UTF_8)
+                .header("response", "header")
+                .contentType("text/event-stream");
+        sseHaDispatch.doGet(new URI("http://unknown-host.invalid";), 
inboundRequest, outboundResponse);
+
+        latch.await(2L, TimeUnit.SECONDS);
+        EasyMock.verify(asyncContext, outboundResponse, inboundRequest, 
printWriter);
+        assertTrue(MOCK_SSE_SERVER.isEmpty());
+        assertEquals(URL.toString(), 
sseHaDispatch.getHaConfigurations().getHaProvider().getActiveURL("SSE"));
+    }
+
+    @Test
+    public void testNoFallbackWhenStickyDisabled() throws Exception {
+        CountDownLatch latch = new CountDownLatch(1);
+        HaServiceConfig haServiceConfig = 
HaDescriptorFactory.createServiceConfig(serviceName, "true", "1", "1000", null, 
null, "false", "false", null, "true", null, null);
+        SSEHaDispatch sseHaDispatch = this.createDispatch(true, 
haServiceConfig);
+        PrintWriter printWriter = EasyMock.createNiceMock(PrintWriter.class);
+        HttpServletResponse outboundResponse = 
this.getServletResponse(HttpStatus.SC_OK);
+        AsyncContext asyncContext = this.getAsyncContext(latch, 
outboundResponse);
+        HttpServletRequest inboundRequest = 
this.getHttpServletRequest(asyncContext);
+        
expect(inboundRequest.getHeader("User-Agent")).andReturn("unknown").anyTimes();
+        expect(inboundRequest.getRequestURL()).andReturn(new 
StringBuffer(URL.toString())).once();
+        
expect(inboundRequest.getAttribute("dispatch.ha.failover.counter")).andReturn(new
 AtomicInteger(0)).once();
+        this.expectResponseBodyAndHeader(printWriter, outboundResponse, null);
+        replay(inboundRequest, asyncContext, outboundResponse, printWriter);
+
+        MOCK_SSE_SERVER.expect()
+                .method("GET")
+                .pathInfo("/sse")
+                .header("Accept", "text/event-stream")
+                .respond()
+                .status(HttpStatus.SC_OK)
+                
.content("id:1\ndata:data1\nevent:event1\n\ndata:data2\nevent:event2\nid:2\nretry:1\n:testing\n\n",
 StandardCharsets.UTF_8)
+                .header("response", "header")
+                .contentType("text/event-stream");
+        sseHaDispatch.doGet(new URI("http://unknown-host.invalid";), 
inboundRequest, outboundResponse);
+
+        latch.await(2L, TimeUnit.SECONDS);
+        EasyMock.verify(asyncContext, outboundResponse, inboundRequest, 
printWriter);
+        assertTrue(MOCK_SSE_SERVER.isEmpty());
+        assertEquals(URL.toString(), 
sseHaDispatch.getHaConfigurations().getHaProvider().getActiveURL("SSE"));
+    }
+
+    @Test
+    public void testMaxFailoverLimitReached() throws Exception {
+        CountDownLatch latch = new CountDownLatch(1);
+        HaServiceConfig haServiceConfig = 
HaDescriptorFactory.createServiceConfig(serviceName, "true", "0", "1000", null, 
null, "true", "true", null, null, "agentX,user1,agentY", null);
+        SSEHaDispatch sseHaDispatch = this.createDispatch(true, 
haServiceConfig);
+        HttpServletResponse outboundResponse = this.getServletResponse(0);
+        AsyncContext asyncContext = this.getAsyncContext(latch, 
outboundResponse);
+        HttpServletRequest inboundRequest = 
this.getHttpServletRequest(asyncContext);
+        
expect(inboundRequest.getHeader("User-Agent")).andReturn("unknown").anyTimes();
+        
expect(inboundRequest.getAttribute("dispatch.ha.failover.counter")).andReturn(new
 AtomicInteger(0)).once();
+        outboundResponse.sendError(HttpServletResponse.SC_BAD_GATEWAY, 
"Service connection error, max failover attempts reached");
+        EasyMock.expectLastCall().once();
+        this.expectResponseBodyAndHeader(null, outboundResponse, null);
+        replay(inboundRequest, asyncContext, outboundResponse);
+
+        MOCK_SSE_SERVER.expect()
+                .method("GET")
+                .pathInfo("/sse")
+                .header("Accept", "text/event-stream")
+                .respond()
+                .status(HttpStatus.SC_OK)
+                
.content("id:1\ndata:data1\nevent:event1\n\ndata:data2\nevent:event2\nid:2\nretry:1\n:testing\n\n",
 StandardCharsets.UTF_8)
+                .header("response", "header")
+                .contentType("text/event-stream");
+        sseHaDispatch.doGet(new URI("http://unknown-host.invalid";), 
inboundRequest, outboundResponse);
+
+        latch.await(2L, TimeUnit.SECONDS);
+        EasyMock.verify(asyncContext, outboundResponse, inboundRequest);
+        assertFalse(MOCK_SSE_SERVER.isEmpty());
+        assertEquals(URL.toString(), 
sseHaDispatch.getHaConfigurations().getHaProvider().getActiveURL("SSE"));
+    }
+
+    @Test
+    public void testGETFailoverIdempotentDisabled() throws Exception {
+        CountDownLatch latch = new CountDownLatch(1);
+        HaServiceConfig haServiceConfig = 
HaDescriptorFactory.createServiceConfig(serviceName, "true", "1", "1000", null, 
null, "true", "true", null, null, "agentX,user1,agentY", "false");
+        SSEHaDispatch sseHaDispatch = this.createDispatch(true, 
haServiceConfig);
+        PrintWriter printWriter = EasyMock.createNiceMock(PrintWriter.class);
+        HttpServletResponse outboundResponse = 
this.getServletResponse(HttpStatus.SC_OK);
+        AsyncContext asyncContext = this.getAsyncContext(latch, 
outboundResponse);
+        HttpServletRequest inboundRequest = 
this.getHttpServletRequest(asyncContext);
+        Capture<Cookie> capturedArgument = Capture.newInstance();
+        
expect(inboundRequest.getHeader("User-Agent")).andReturn("unknown").anyTimes();
+        expect(inboundRequest.getRequestURL()).andReturn(new 
StringBuffer(URL.toString())).once();
+        
expect(inboundRequest.getAttribute("dispatch.ha.failover.counter")).andReturn(new
 AtomicInteger(0)).once();
+        this.expectResponseBodyAndHeader(printWriter, outboundResponse, 
capturedArgument);
+        replay(inboundRequest, asyncContext, outboundResponse, printWriter);
+
+        MOCK_SSE_SERVER.expect()
+                .method("GET")
+                .pathInfo("/sse")
+                .header("Accept", "text/event-stream")
+                .respond()
+                .status(HttpStatus.SC_OK)
+                
.content("id:1\ndata:data1\nevent:event1\n\ndata:data2\nevent:event2\nid:2\nretry:1\n:testing\n\n",
 StandardCharsets.UTF_8)
+                .header("response", "header")
+                .contentType("text/event-stream");
+        sseHaDispatch.doGet(new URI("http://unknown-host.invalid";), 
inboundRequest, outboundResponse);
+
+        latch.await(2L, TimeUnit.SECONDS);
+        EasyMock.verify(asyncContext, outboundResponse, inboundRequest, 
printWriter);
+        assertTrue(MOCK_SSE_SERVER.isEmpty());
+        assertEquals("http://host2.valid:3333/sse";, 
sseHaDispatch.getHaConfigurations().getHaProvider().getActiveURL("SSE"));
+    }
+
+    @Test
+    public void testPOSTFailoverIdempotentEnabled() throws Exception {
+        CountDownLatch latch = new CountDownLatch(1);
+        HaServiceConfig haServiceConfig = 
HaDescriptorFactory.createServiceConfig(serviceName, "true", "1", "1000", null, 
null, "true", "true", null, null, "agentX,user1,agentY", "true");
+        SSEHaDispatch sseHaDispatch = this.createDispatch(true, 
haServiceConfig);
+        PrintWriter printWriter = EasyMock.createNiceMock(PrintWriter.class);
+        HttpServletResponse outboundResponse = 
this.getServletResponse(HttpStatus.SC_OK);
+        AsyncContext asyncContext = this.getAsyncContext(latch, 
outboundResponse);
+        HttpServletRequest inboundRequest = 
this.getHttpServletRequest(asyncContext);
+        Capture<Cookie> capturedArgument = Capture.newInstance();
+        
expect(inboundRequest.getHeader("User-Agent")).andReturn("unknown").anyTimes();
+        expect(inboundRequest.getRequestURL()).andReturn(new 
StringBuffer(URL.toString())).once();
+        
expect(inboundRequest.getAttribute("dispatch.ha.failover.counter")).andReturn(new
 AtomicInteger(0)).once();
+        this.expectResponseBodyAndHeader(printWriter, outboundResponse, 
capturedArgument);
+        replay(inboundRequest, asyncContext, outboundResponse, printWriter);
+
+        MOCK_SSE_SERVER.expect()
+                .method("POST")
+                .pathInfo("/sse")
+                .header("Accept", "text/event-stream")
+                .respond()
+                .status(HttpStatus.SC_OK)
+                
.content("id:1\ndata:data1\nevent:event1\n\ndata:data2\nevent:event2\nid:2\nretry:1\n:testing\n\n",
 StandardCharsets.UTF_8)
+                .header("response", "header")
+                .contentType("text/event-stream");
+        sseHaDispatch.doPost(new URI("http://unknown-host.invalid";), 
inboundRequest, outboundResponse);
+
+        latch.await(2L, TimeUnit.SECONDS);
+        EasyMock.verify(asyncContext, outboundResponse, inboundRequest, 
printWriter);
+        assertTrue(MOCK_SSE_SERVER.isEmpty());
+        assertEquals("http://host2.valid:3333/sse";, 
sseHaDispatch.getHaConfigurations().getHaProvider().getActiveURL("SSE"));
+    }
+
+    @Test
+    public void testPOSTConnectErrorFailoverIdempotentDisabled() throws 
Exception {
+        CountDownLatch latch = new CountDownLatch(1);
+        HaServiceConfig haServiceConfig = 
HaDescriptorFactory.createServiceConfig(serviceName, "true", "1", "1000", null, 
null, "true", "true", null, null, "agentX,user1,agentY", "false");
+        SSEHaDispatch sseHaDispatch = this.createDispatch(true, 
haServiceConfig);
+        PrintWriter printWriter = EasyMock.createNiceMock(PrintWriter.class);
+        HttpServletResponse outboundResponse = 
this.getServletResponse(HttpStatus.SC_OK);
+        AsyncContext asyncContext = this.getAsyncContext(latch, 
outboundResponse);
+        HttpServletRequest inboundRequest = 
this.getHttpServletRequest(asyncContext);
+        Capture<Cookie> capturedArgument = Capture.newInstance();
+        
expect(inboundRequest.getHeader("User-Agent")).andReturn("unknown").anyTimes();
+        expect(inboundRequest.getRequestURL()).andReturn(new 
StringBuffer(URL.toString())).once();
+        
expect(inboundRequest.getAttribute("dispatch.ha.failover.counter")).andReturn(new
 AtomicInteger(0)).once();
+        this.expectResponseBodyAndHeader(printWriter, outboundResponse, 
capturedArgument);
+        replay(inboundRequest, asyncContext, outboundResponse, printWriter);
+
+        MOCK_SSE_SERVER.expect()
+                .method("POST")
+                .pathInfo("/sse")
+                .header("Accept", "text/event-stream")
+                .respond()
+                .status(HttpStatus.SC_OK)
+                
.content("id:1\ndata:data1\nevent:event1\n\ndata:data2\nevent:event2\nid:2\nretry:1\n:testing\n\n",
 StandardCharsets.UTF_8)
+                .header("response", "header")
+                .contentType("text/event-stream");
+        sseHaDispatch.doPost(new URI("http://unknown-host.invalid";), 
inboundRequest, outboundResponse);
+
+        latch.await(2L, TimeUnit.SECONDS);
+        EasyMock.verify(asyncContext, outboundResponse, inboundRequest, 
printWriter);
+        assertTrue(MOCK_SSE_SERVER.isEmpty());
+        assertEquals("http://host2.valid:3333/sse";, 
sseHaDispatch.getHaConfigurations().getHaProvider().getActiveURL("SSE"));
+    }
+
+    @Test
+    public void testFailoverDisabledWithStickySession() throws Exception {
+        CountDownLatch latch = new CountDownLatch(1);
+        HaServiceConfig haServiceConfig = 
HaDescriptorFactory.createServiceConfig(serviceName, "true", "1", "1000", null, 
null, "true", "true", null, null, "agentX,user1,agentY", null);
+        SSEHaDispatch sseHaDispatch = this.createDispatch(true, 
haServiceConfig);
+        PrintWriter printWriter = EasyMock.createNiceMock(PrintWriter.class);
+        HttpServletResponse outboundResponse = 
this.getServletResponse(HttpStatus.SC_OK);
+        AsyncContext asyncContext = this.getAsyncContext(latch, 
outboundResponse);
+        HttpServletRequest inboundRequest = 
this.getHttpServletRequest(asyncContext);
+        Capture<Cookie> capturedArgument = Capture.newInstance();
+        
expect(inboundRequest.getHeader("User-Agent")).andReturn("unknown").anyTimes();
+        expect(inboundRequest.getRequestURL()).andReturn(new 
StringBuffer(URL.toString())).once();
+        
expect(inboundRequest.getAttribute("dispatch.ha.failover.counter")).andReturn(new
 AtomicInteger(0)).once();
+        this.expectResponseBodyAndHeader(printWriter, outboundResponse, 
capturedArgument);
+        replay(inboundRequest, asyncContext, outboundResponse, printWriter);
+
+        MOCK_SSE_SERVER.expect()
+                .method("GET")
+                .pathInfo("/sse")
+                .header("Accept", "text/event-stream")
+                .respond()
+                .status(HttpStatus.SC_OK)
+                
.content("id:1\ndata:data1\nevent:event1\n\ndata:data2\nevent:event2\nid:2\nretry:1\n:testing\n\n",
 StandardCharsets.UTF_8)
+                .header("response", "header")
+                .contentType("text/event-stream");
+        sseHaDispatch.doGet(new URI("http://unknown-host.invalid";), 
inboundRequest, outboundResponse);
+
+        latch.await(2L, TimeUnit.SECONDS);
+        EasyMock.verify(asyncContext, outboundResponse, inboundRequest, 
printWriter);
+        assertTrue(MOCK_SSE_SERVER.isEmpty());
+        assertEquals("http://host2.valid:3333/sse";, 
sseHaDispatch.getHaConfigurations().getHaProvider().getActiveURL("SSE"));
+
+        //Second request, sticky session cookie included
+        MOCK_SSE_SERVER.stop();
+        CountDownLatch latch2 = new CountDownLatch(1);
+        Cookie[] cookies = new Cookie[1];
+        cookies[0] = new Cookie(capturedArgument.getValue().getName(), 
capturedArgument.getValue().getValue());
+        PrintWriter printWriter2 = EasyMock.createNiceMock(PrintWriter.class);
+        HttpServletResponse outboundResponse2 = 
this.getServletResponse(HttpStatus.SC_OK);
+        AsyncContext asyncContext2 = this.getAsyncContext(latch2, 
outboundResponse2);
+        Capture<Cookie> capturedArgument2 = Capture.newInstance();
+        this.expectResponseBodyAndHeader(printWriter2, outboundResponse2, 
capturedArgument2);
+        HttpServletRequest inboundRequest2 = 
this.getHttpServletRequest(asyncContext2);
+        expect(inboundRequest2.getCookies()).andReturn(cookies).anyTimes();
+        replay(inboundRequest2, asyncContext2, outboundResponse2, 
printWriter2);
+
+        sseHaDispatch.doGet(URL, inboundRequest2, outboundResponse2);
+        latch2.await(1L, TimeUnit.SECONDS);
+
+        EasyMock.verify(asyncContext, outboundResponse, inboundRequest, 
printWriter, inboundRequest2);
+        assertEquals("http://host2.valid:3333/sse";, 
sseHaDispatch.getHaConfigurations().getHaProvider().getActiveURL("SSE"));
+    }
+
+    @Test
+    public void testFailoverWithUserAgentLBDisabled() throws Exception {
+        CountDownLatch latch = new CountDownLatch(1);
+        HaServiceConfig haServiceConfig = 
HaDescriptorFactory.createServiceConfig(serviceName, "true", "1", "1000", null, 
null, "true", "true", null, null, "agentX,user1,agentY", null);
+        SSEHaDispatch sseHaDispatch = this.createDispatch(true, 
haServiceConfig);
+        PrintWriter printWriter = EasyMock.createNiceMock(PrintWriter.class);
+        HttpServletResponse outboundResponse = 
this.getServletResponse(HttpStatus.SC_OK);
+        AsyncContext asyncContext = this.getAsyncContext(latch, 
outboundResponse);
+        HttpServletRequest inboundRequest = 
this.getHttpServletRequest(asyncContext);
+        Capture<Cookie> capturedArgument = Capture.newInstance();
+        
expect(inboundRequest.getHeader("User-Agent")).andReturn("agentX").anyTimes();
+        expect(inboundRequest.getRequestURL()).andReturn(new 
StringBuffer(URL.toString())).once();
+        
expect(inboundRequest.getAttribute("dispatch.ha.failover.counter")).andReturn(new
 AtomicInteger(0)).once();
+        this.expectResponseBodyAndHeader(printWriter, outboundResponse, 
capturedArgument);
+        replay(inboundRequest, asyncContext, outboundResponse, printWriter);
+
+        MOCK_SSE_SERVER.expect()
+                .method("GET")
+                .pathInfo("/sse")
+                .header("Accept", "text/event-stream")
+                .respond()
+                .status(HttpStatus.SC_OK)
+                
.content("id:1\ndata:data1\nevent:event1\n\ndata:data2\nevent:event2\nid:2\nretry:1\n:testing\n\n",
 StandardCharsets.UTF_8)
+                .header("response", "header")
+                .contentType("text/event-stream");
+        sseHaDispatch.doGet(new URI("http://unknown-host.invalid";), 
inboundRequest, outboundResponse);
+
+        latch.await(2L, TimeUnit.SECONDS);
+        EasyMock.verify(asyncContext, outboundResponse, inboundRequest, 
printWriter);
+        assertTrue(MOCK_SSE_SERVER.isEmpty());
+        assertEquals(URL.toString(), 
sseHaDispatch.getHaConfigurations().getHaProvider().getActiveURL("SSE"));
+    }
+
+    @Test
+    public void testNoFailoverAfterResponseIsCommitted() throws Exception {
+        CountDownLatch latch = new CountDownLatch(1);
+        HaServiceConfig haServiceConfig = 
HaDescriptorFactory.createServiceConfig(serviceName, "true", "1", "1000", null, 
null, "true", "true", null, null, "agentX,user1,agentY", null);
+        SSEHaDispatch sseHaDispatch = this.createDispatch(false, 
haServiceConfig);
+        PrintWriter printWriter = EasyMock.createNiceMock(PrintWriter.class);
+        HttpServletResponse outboundResponse = 
this.getServletResponse(HttpStatus.SC_OK);
+        AsyncContext asyncContext = this.getAsyncContext(latch, 
outboundResponse);
+        HttpServletRequest inboundRequest = 
this.getHttpServletRequest(asyncContext);
+        Capture<Cookie> capturedArgument = Capture.newInstance();
+        
expect(inboundRequest.getHeader("User-Agent")).andReturn("unknown").anyTimes();
+        expect(outboundResponse.isCommitted()).andReturn(true).anyTimes();
+        this.expectResponseBodyAndHeaderWithError(printWriter, 
outboundResponse, capturedArgument);
+        replay(inboundRequest, asyncContext, outboundResponse, printWriter);
+
+        MOCK_SSE_SERVER.expect()
+                .method("GET")
+                .pathInfo("/sse")
+                .header("Accept", "text/event-stream")
+                .respond()
+                .status(HttpStatus.SC_OK)
+                
.content("id:1\ndata:data1\nevent:event1\n\ndata:data2\nevent:event2\nid:2\nretry:1\n:testing\n\n",
 StandardCharsets.UTF_8)
+                .header("response", "header")
+                .contentType("text/event-stream");
+
+        sseHaDispatch.doGet(URL, inboundRequest, outboundResponse);
+
+        latch.await(2L, TimeUnit.SECONDS);
+        EasyMock.verify(asyncContext, outboundResponse, inboundRequest, 
printWriter);
+        assertTrue(MOCK_SSE_SERVER.isEmpty());
+        assertEquals("http://host2.valid:3333/sse";, 
sseHaDispatch.getHaConfigurations().getHaProvider().getActiveURL("SSE"));
+    }
+
+    private SSEHaDispatch createDispatch(boolean failoverNeeded, 
HaServiceConfig serviceConfig) throws Exception {
         KeystoreService keystoreService = createMock(KeystoreService.class);
         
expect(keystoreService.getTruststoreForHttpClient()).andReturn(null).once();
 
@@ -339,7 +703,7 @@ public class SSEHaDispatchTest {
         
expect(filterConfig.getInitParameter("serviceRole")).andReturn(null).once();
         
expect(filterConfig.getInitParameter("httpclient.cookieSpec")).andReturn(null).anyTimes();
 
-        HaProvider provider = this.createProvider(cookieName);
+        HaProvider provider = this.createProvider(failoverNeeded, 
serviceConfig);
 
         replay(keystoreService, gatewayConfig, gatewayServices, 
servletContext, filterConfig);
 
@@ -354,8 +718,10 @@ public class SSEHaDispatchTest {
     private HttpServletResponse getServletResponse(int statusCode) {
         HttpServletResponse outboundResponse = 
EasyMock.createNiceMock(HttpServletResponse.class);
 
-        outboundResponse.setStatus(statusCode);
-        EasyMock.expectLastCall();
+        if (statusCode != 0) {
+            outboundResponse.setStatus(statusCode);
+            EasyMock.expectLastCall();
+        }
 
         return outboundResponse;
     }
@@ -391,31 +757,54 @@ public class SSEHaDispatchTest {
         return inboundRequest;
     }
 
-    private void expectResponseBodyAndHeader(PrintWriter printWriter, 
HttpServletResponse outboundResponse, Capture<Cookie> capturedArgument) throws 
Exception {
-        outboundResponse.addCookie(capture(capturedArgument));
-        EasyMock.expectLastCall();
+    private void expectResponseBodyAndHeader(PrintWriter printWriter, 
HttpServletResponse outboundResponse,
+                                             Capture<Cookie> capturedArgument) 
throws Exception {
+        if (capturedArgument != null) {
+            outboundResponse.addCookie(capture(capturedArgument));
+            EasyMock.expectLastCall();
+        }
+
+        if (printWriter != null) {
+            
EasyMock.expect(outboundResponse.getWriter()).andReturn(printWriter).anyTimes();
+            printWriter.write("id:1\nevent:event1\ndata:data1");
+            EasyMock.expectLastCall();
+            
printWriter.write("id:2\nevent:event2\ndata:data2\nretry:1\n:testing");
+            EasyMock.expectLastCall();
+            printWriter.println('\n');
+            EasyMock.expectLastCall().times(2);
+        }
+    }
+
+    private void expectResponseBodyAndHeaderWithError(PrintWriter printWriter, 
HttpServletResponse outboundResponse,
+                                             Capture<Cookie> capturedArgument) 
throws Exception {
+        if (capturedArgument != null) {
+            outboundResponse.addCookie(capture(capturedArgument));
+            EasyMock.expectLastCall();
+        }
+
         
EasyMock.expect(outboundResponse.getWriter()).andReturn(printWriter).anyTimes();
         printWriter.write("id:1\nevent:event1\ndata:data1");
         EasyMock.expectLastCall();
         printWriter.write("id:2\nevent:event2\ndata:data2\nretry:1\n:testing");
-        EasyMock.expectLastCall();
-        printWriter.println('\n');
-        EasyMock.expectLastCall().times(2);
+        EasyMock.expectLastCall().andThrow(new 
IndexOutOfBoundsException("Custom exception"));
     }
 
-    private HaProvider createProvider(String cookieName) throws Exception {
-        String serviceName = "SSE";
+    private HaProvider createProvider(boolean failoverNeeded, HaServiceConfig 
serviceConfig) throws Exception {
         HaDescriptor descriptor = HaDescriptorFactory.createDescriptor();
-        
descriptor.addServiceConfig(HaDescriptorFactory.createServiceConfig(serviceName,
 "true", null, null, null, null, "true", "true", cookieName, null, 
"agentX,user1,agentY", null));
+        descriptor.addServiceConfig(serviceConfig);
         HaProvider provider = new DefaultHaProvider(descriptor);
         URI uri1 = new URI("http://localhost:"; + MOCK_SSE_SERVER.getPort() + 
"/sse");
-        URI uri2 = new URI("http://host2:3333/sse";);
-        URI uri3 = new URI("http://host3:3333/sse";);
+        URI uri2 = new URI("http://host2.valid:3333/sse";);
+        URI uri3 = new URI("http://host3.valid:3333/sse";);
         ArrayList<String> urlList = new ArrayList<>();
+        if (failoverNeeded) {
+            urlList.add(new URI("http://unknown-host.invalid";).toString());
+        }
         urlList.add(uri1.toString());
         urlList.add(uri2.toString());
         urlList.add(uri3.toString());
         provider.addHaService(serviceName, urlList);
         return provider;
     }
-}
\ No newline at end of file
+
+}
diff --git 
a/gateway-service-webhdfs/src/main/java/org/apache/knox/gateway/hdfs/dispatch/AbstractHdfsHaDispatch.java
 
b/gateway-service-webhdfs/src/main/java/org/apache/knox/gateway/hdfs/dispatch/AbstractHdfsHaDispatch.java
index b284ed4cd..506217887 100644
--- 
a/gateway-service-webhdfs/src/main/java/org/apache/knox/gateway/hdfs/dispatch/AbstractHdfsHaDispatch.java
+++ 
b/gateway-service-webhdfs/src/main/java/org/apache/knox/gateway/hdfs/dispatch/AbstractHdfsHaDispatch.java
@@ -61,7 +61,7 @@ public abstract class AbstractHdfsHaDispatch extends 
ConfigurableHADispatch {
          writeOutboundResponse(outboundRequest, inboundRequest, 
outboundResponse, inboundResponse);
       } catch (StandbyException | SafeModeException | IOException e) {
         /* if non-idempotent requests are not allowed to failover */
-        if(!failoverNonIdempotentRequestEnabled && 
nonIdempotentRequests.stream().anyMatch(outboundRequest.getMethod()::equalsIgnoreCase))
 {
+        if(!getHaConfigurations().isFailoverNonIdempotentRequestEnabled() && 
nonIdempotentRequests.stream().anyMatch(outboundRequest.getMethod()::equalsIgnoreCase))
 {
           LOG.cannotFailoverNonIdempotentRequest(outboundRequest.getMethod(), 
e.getCause());
           throw e;
         } else {
@@ -103,15 +103,15 @@ public abstract class AbstractHdfsHaDispatch extends 
ConfigurableHADispatch {
          counter = new AtomicInteger(0);
       }
       inboundRequest.setAttribute(FAILOVER_COUNTER_ATTRIBUTE, counter);
-      if (counter.incrementAndGet() <= maxFailoverAttempts) {
-         haProvider.markFailedURL(getResourceRole(), 
outboundRequest.getURI().toString());
+      if (counter.incrementAndGet() <= 
getHaConfigurations().getMaxFailoverAttempts()) {
+          
getHaConfigurations().getHaProvider().markFailedURL(getResourceRole(), 
outboundRequest.getURI().toString());
          //null out target url so that rewriters run again
          
inboundRequest.setAttribute(AbstractGatewayFilter.TARGET_REQUEST_URL_ATTRIBUTE_NAME,
 null);
          URI uri = getDispatchUrl(inboundRequest);
          ((HttpRequestBase) outboundRequest).setURI(uri);
-         if (failoverSleep > 0) {
+         if (getHaConfigurations().getFailoverSleep() > 0) {
             try {
-               Thread.sleep(failoverSleep);
+               Thread.sleep(getHaConfigurations().getFailoverSleep());
             } catch (InterruptedException e) {
                LOG.failoverSleepFailed(getResourceRole(), e);
                Thread.currentThread().interrupt();
@@ -120,7 +120,7 @@ public abstract class AbstractHdfsHaDispatch extends 
ConfigurableHADispatch {
          LOG.failingOverRequest(outboundRequest.getURI().toString());
          executeRequest(outboundRequest, inboundRequest, outboundResponse);
       } else {
-         LOG.maxFailoverAttemptsReached(maxFailoverAttempts, 
getResourceRole());
+         
LOG.maxFailoverAttemptsReached(getHaConfigurations().getMaxFailoverAttempts(), 
getResourceRole());
          if (inboundResponse != null) {
             writeOutboundResponse(outboundRequest, inboundRequest, 
outboundResponse, inboundResponse);
          } else {
diff --git a/gateway-spi/pom.xml b/gateway-spi/pom.xml
index d8f232e75..dc4c88aeb 100644
--- a/gateway-spi/pom.xml
+++ b/gateway-spi/pom.xml
@@ -58,16 +58,6 @@
         <dependency>
             <groupId>org.apache.httpcomponents</groupId>
             <artifactId>httpasyncclient</artifactId>
-            <exclusions>
-                <exclusion>
-                    <groupId>org.apache.httpcomponents</groupId>
-                    <artifactId>httpcore</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.apache.httpcomponents</groupId>
-                    <artifactId>httpcore-nio</artifactId>
-                </exclusion>
-            </exclusions>
         </dependency>
         <dependency>
             <groupId>org.apache.httpcomponents</groupId>
diff --git 
a/gateway-spi/src/main/java/org/apache/knox/gateway/SpiGatewayMessages.java 
b/gateway-spi/src/main/java/org/apache/knox/gateway/SpiGatewayMessages.java
index 1037bffc4..9992602b6 100644
--- a/gateway-spi/src/main/java/org/apache/knox/gateway/SpiGatewayMessages.java
+++ b/gateway-spi/src/main/java/org/apache/knox/gateway/SpiGatewayMessages.java
@@ -144,4 +144,7 @@ public interface SpiGatewayMessages {
 
   @Message(level = MessageLevel.INFO, text = "FIPS environment, configuring 
intercepting socket")
   void configureInterceptingSocket();
+
+  @Message( level = MessageLevel.ERROR, text = "Failed to send error to 
client" )
+  void failedToSendErrorToClient(@StackTrace(level=MessageLevel.ERROR) 
Exception e);
 }
diff --git 
a/gateway-spi/src/main/java/org/apache/knox/gateway/dispatch/AsyncDispatch.java 
b/gateway-spi/src/main/java/org/apache/knox/gateway/dispatch/AsyncDispatch.java
new file mode 100644
index 000000000..c855501bd
--- /dev/null
+++ 
b/gateway-spi/src/main/java/org/apache/knox/gateway/dispatch/AsyncDispatch.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.knox.gateway.dispatch;
+
+public interface AsyncDispatch {
+}
diff --git 
a/gateway-spi/src/main/java/org/apache/knox/gateway/dispatch/ConfigurableDispatch.java
 
b/gateway-spi/src/main/java/org/apache/knox/gateway/dispatch/ConfigurableDispatch.java
index 6f1d240e5..d9a469783 100644
--- 
a/gateway-spi/src/main/java/org/apache/knox/gateway/dispatch/ConfigurableDispatch.java
+++ 
b/gateway-spi/src/main/java/org/apache/knox/gateway/dispatch/ConfigurableDispatch.java
@@ -52,7 +52,7 @@ import java.util.stream.Collectors;
  *   make request/response exclude headers configurable
  *   make url encoding configurable
  */
-public class ConfigurableDispatch extends DefaultDispatch {
+public class ConfigurableDispatch extends DefaultDispatch implements 
SyncDispatch{
   private Set<String> requestExcludeHeaders = 
super.getOutboundRequestExcludeHeaders();
   private Set<String> responseExcludeHeaders = 
super.getOutboundResponseExcludeHeaders();
   private Map<String, String> requestAppendHeaders = Collections.emptyMap();
diff --git 
a/gateway-spi/src/main/java/org/apache/knox/gateway/dispatch/SyncDispatch.java 
b/gateway-spi/src/main/java/org/apache/knox/gateway/dispatch/SyncDispatch.java
new file mode 100644
index 000000000..271ba0e85
--- /dev/null
+++ 
b/gateway-spi/src/main/java/org/apache/knox/gateway/dispatch/SyncDispatch.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.knox.gateway.dispatch;
+
+public interface SyncDispatch {
+}
diff --git 
a/gateway-spi/src/main/java/org/apache/knox/gateway/sse/SSECallback.java 
b/gateway-spi/src/main/java/org/apache/knox/gateway/sse/SSECallback.java
new file mode 100644
index 000000000..6a9197091
--- /dev/null
+++ b/gateway-spi/src/main/java/org/apache/knox/gateway/sse/SSECallback.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.knox.gateway.sse;
+
+import org.apache.http.concurrent.FutureCallback;
+import org.apache.http.nio.protocol.HttpAsyncRequestProducer;
+import org.apache.knox.gateway.SpiGatewayMessages;
+import org.apache.knox.gateway.i18n.messages.MessagesFactory;
+
+import javax.servlet.AsyncContext;
+import javax.servlet.http.HttpServletResponse;
+import java.io.IOException;
+
+public class SSECallback implements FutureCallback<SSEResponse> {
+
+    protected static final SpiGatewayMessages LOG = 
MessagesFactory.get(SpiGatewayMessages.class);
+    protected final AsyncContext asyncContext;
+    private final HttpAsyncRequestProducer producer;
+    protected final HttpServletResponse outboundResponse;
+
+
+    public SSECallback(HttpServletResponse outboundResponse, AsyncContext 
asyncContext, HttpAsyncRequestProducer producer) {
+        this.outboundResponse = outboundResponse;
+        this.asyncContext = asyncContext;
+        this.producer = producer;
+    }
+
+    @Override
+    public void completed(final SSEResponse response) {
+        releaseResources(true);
+        LOG.sseConnectionDone();
+    }
+
+    @Override
+    public void failed(final Exception ex) {
+        try {
+            if(!outboundResponse.isCommitted()) {
+                outboundResponse.sendError(HttpServletResponse.SC_BAD_GATEWAY, 
"Service connection error");
+            }
+        } catch (Exception e) {
+            LOG.failedToSendErrorToClient(e);
+        } finally {
+            releaseResources(true);
+            LOG.sseConnectionError(ex.getMessage());
+        }
+    }
+
+    @Override
+    public void cancelled() {
+        releaseResources(true);
+        LOG.sseConnectionCancelled();
+    }
+
+    protected void releaseResources(boolean releaseContext) {
+        try {
+            producer.close();
+        } catch (IOException e) {
+            LOG.sseProducerCloseError(e);
+        } finally {
+            if (releaseContext) {
+                asyncContext.complete();
+            }
+        }
+    }
+}
diff --git 
a/gateway-spi/src/main/java/org/apache/knox/gateway/sse/SSEDispatch.java 
b/gateway-spi/src/main/java/org/apache/knox/gateway/sse/SSEDispatch.java
index 767cbccef..532503df5 100644
--- a/gateway-spi/src/main/java/org/apache/knox/gateway/sse/SSEDispatch.java
+++ b/gateway-spi/src/main/java/org/apache/knox/gateway/sse/SSEDispatch.java
@@ -25,7 +25,6 @@ import org.apache.http.client.methods.HttpPatch;
 import org.apache.http.client.methods.HttpPost;
 import org.apache.http.client.methods.HttpPut;
 import org.apache.http.client.methods.HttpUriRequest;
-import org.apache.http.concurrent.FutureCallback;
 import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
 import org.apache.http.nio.IOControl;
 import org.apache.http.nio.client.HttpAsyncClient;
@@ -36,6 +35,7 @@ import org.apache.http.protocol.HttpContext;
 import org.apache.knox.gateway.audit.api.Action;
 import org.apache.knox.gateway.audit.api.ActionOutcome;
 import org.apache.knox.gateway.audit.api.ResourceType;
+import org.apache.knox.gateway.dispatch.AsyncDispatch;
 import org.apache.knox.gateway.dispatch.ConfigurableDispatch;
 import org.apache.knox.gateway.dispatch.DefaultHttpAsyncClientFactory;
 import org.apache.knox.gateway.dispatch.HttpAsyncClientFactory;
@@ -50,9 +50,9 @@ import java.net.URISyntaxException;
 import java.nio.CharBuffer;
 import java.nio.charset.StandardCharsets;
 
-public class SSEDispatch extends ConfigurableDispatch {
+public class SSEDispatch extends ConfigurableDispatch implements AsyncDispatch 
{
 
-    private final HttpAsyncClient asyncClient;
+    protected final HttpAsyncClient asyncClient;
     private static final String TEXT_EVENT_STREAM_VALUE = "text/event-stream";
 
     public SSEDispatch(FilterConfig filterConfig) {
@@ -106,35 +106,16 @@ public class SSEDispatch extends ConfigurableDispatch {
         AsyncContext asyncContext = inboundRequest.startAsync();
         //No timeout
         asyncContext.setTimeout(0L);
+        this.executeAsyncRequest(outboundRequest, outboundResponse, 
asyncContext, inboundRequest);
+    }
 
+    protected void executeAsyncRequest(HttpUriRequest outboundRequest, 
HttpServletResponse outboundResponse,
+                                     AsyncContext asyncContext, 
HttpServletRequest inboundRequest) {
         HttpAsyncRequestProducer producer = 
HttpAsyncMethods.create(outboundRequest);
         AsyncCharConsumer<SSEResponse> consumer = new 
SSECharConsumer(outboundResponse, outboundRequest.getURI(), asyncContext, 
inboundRequest, outboundRequest);
-        this.executeAsyncRequest(producer, consumer, outboundRequest);
-    }
-
-    private void executeAsyncRequest(HttpAsyncRequestProducer producer, 
AsyncCharConsumer<SSEResponse> consumer, HttpUriRequest outboundRequest) {
         LOG.dispatchRequest(outboundRequest.getMethod(), 
outboundRequest.getURI());
         auditor.audit(Action.DISPATCH, outboundRequest.getURI().toString(), 
ResourceType.URI, ActionOutcome.UNAVAILABLE, 
RES.requestMethod(outboundRequest.getMethod()));
-        asyncClient.execute(producer, consumer, new 
FutureCallback<SSEResponse>() {
-
-            @Override
-            public void completed(final SSEResponse response) {
-                closeProducer(producer);
-                LOG.sseConnectionDone();
-            }
-
-            @Override
-            public void failed(final Exception ex) {
-                closeProducer(producer);
-                LOG.sseConnectionError(ex.getMessage());
-            }
-
-            @Override
-            public void cancelled() {
-                closeProducer(producer);
-                LOG.sseConnectionCancelled();
-            }
-        });
+        asyncClient.execute(producer, consumer, new 
SSECallback(outboundResponse, asyncContext, producer));
     }
 
     private void addAcceptHeader(HttpUriRequest outboundRequest) {
@@ -164,15 +145,11 @@ public class SSEDispatch extends ConfigurableDispatch {
         return (statusCode >= HttpStatus.SC_OK && statusCode < 300);
     }
 
-    private void closeProducer(HttpAsyncRequestProducer producer) {
-        try {
-            producer.close();
-        } catch (IOException e) {
-            LOG.sseProducerCloseError(e);
-        }
+    protected void shiftCallback(HttpUriRequest outboundRequest, 
HttpServletRequest inboundRequest) {
+        // No need to shift the URL for non-HA SSE requests
     }
 
-    private class SSECharConsumer extends AsyncCharConsumer<SSEResponse> {
+    protected class SSECharConsumer extends AsyncCharConsumer<SSEResponse> {
         private SSEResponse sseResponse;
         private final HttpServletResponse outboundResponse;
         private final HttpUriRequest outboundRequest;
@@ -180,7 +157,7 @@ public class SSEDispatch extends ConfigurableDispatch {
         private final URI url;
         private final AsyncContext asyncContext;
 
-        SSECharConsumer(HttpServletResponse outboundResponse, URI url, 
AsyncContext asyncContext, HttpServletRequest inboundRequest, HttpUriRequest 
outboundRequest) {
+        public SSECharConsumer(HttpServletResponse outboundResponse, URI url, 
AsyncContext asyncContext, HttpServletRequest inboundRequest, HttpUriRequest 
outboundRequest) {
             this.outboundResponse = outboundResponse;
             this.outboundRequest = outboundRequest;
             this.inboundRequest = inboundRequest;
@@ -197,6 +174,7 @@ public class SSEDispatch extends ConfigurableDispatch {
             } else {
                 handleErrorResponse(outboundResponse, url, inboundResponse);
             }
+            shiftCallback(outboundRequest, inboundRequest);
         }
 
         @Override
@@ -211,11 +189,6 @@ public class SSEDispatch extends ConfigurableDispatch {
             }
         }
 
-        @Override
-        protected void releaseResources() {
-            this.asyncContext.complete();
-        }
-
         @Override
         protected SSEResponse buildResult(final HttpContext context) {
             return this.sseResponse;
diff --git 
a/gateway-spi/src/test/java/org/apache/knox/gateway/sse/SSEDispatchTest.java 
b/gateway-spi/src/test/java/org/apache/knox/gateway/sse/SSEDispatchTest.java
index e8ed44ebf..e6fac3cb4 100644
--- a/gateway-spi/src/test/java/org/apache/knox/gateway/sse/SSEDispatchTest.java
+++ b/gateway-spi/src/test/java/org/apache/knox/gateway/sse/SSEDispatchTest.java
@@ -386,6 +386,8 @@ public class SSEDispatchTest {
         CountDownLatch latch = new CountDownLatch(1);
         SSEDispatch sseDispatch = this.createDispatch();
         HttpServletResponse outboundResponse = 
EasyMock.createNiceMock(HttpServletResponse.class);
+        outboundResponse.sendError(HttpServletResponse.SC_BAD_GATEWAY, 
"Service connection error");
+        EasyMock.expectLastCall().once();
         AsyncContext asyncContext = this.getAsyncContext(latch, 
outboundResponse);
         HttpServletRequest inboundRequest = 
this.getHttpServletRequest(asyncContext);
 
diff --git a/pom.xml b/pom.xml
index 6a1422a2c..fe1d9c136 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1597,6 +1597,16 @@
                 <groupId>org.apache.httpcomponents</groupId>
                 <artifactId>httpasyncclient</artifactId>
                 <version>${asynchttpclient.version}</version>
+                <exclusions>
+                    <exclusion>
+                        <groupId>org.apache.httpcomponents</groupId>
+                        <artifactId>httpcore</artifactId>
+                    </exclusion>
+                    <exclusion>
+                        <groupId>org.apache.httpcomponents</groupId>
+                        <artifactId>httpcore-nio</artifactId>
+                    </exclusion>
+                </exclusions>
             </dependency>
             <dependency>
                 <groupId>org.apache.httpcomponents</groupId>

Reply via email to