This is an automated email from the ASF dual-hosted git repository.

smolnar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/knox.git


The following commit(s) were added to refs/heads/master by this push:
     new df9e7912c KNOX-3115: HA Dispatch for SSE (#1010)
df9e7912c is described below

commit df9e7912c00f6a840e51b7526c035f47fc14e777
Author: hanicz <[email protected]>
AuthorDate: Tue Apr 8 16:18:03 2025 +0200

    KNOX-3115: HA Dispatch for SSE (#1010)
---
 .../ha/dispatch/ConfigurableHADispatch.java        | 240 +++--------
 .../knox/gateway/ha/dispatch/LBHaDispatch.java     | 243 +++++++++++
 .../knox/gateway/ha/dispatch/SSEHaDispatch.java    | 152 +++++++
 .../gateway/ha/dispatch/SSEHaDispatchTest.java     | 460 +++++++++------------
 .../org/apache/knox/gateway/sse/SSEDispatch.java   |  14 +-
 .../org/apache/knox/gateway/sse/SSEEntity.java     |  39 +-
 .../java/org/apache/knox/gateway/sse/SSEvent.java  |  86 ++--
 .../apache/knox/gateway/sse/SSEDispatchTest.java   |   2 +-
 .../org/apache/knox/gateway/sse/SSEEntityTest.java |  25 +-
 .../org/apache/knox/gateway/sse/SSEventTest.java   |  18 +-
 10 files changed, 751 insertions(+), 528 deletions(-)

diff --git 
a/gateway-provider-ha/src/main/java/org/apache/knox/gateway/ha/dispatch/ConfigurableHADispatch.java
 
b/gateway-provider-ha/src/main/java/org/apache/knox/gateway/ha/dispatch/ConfigurableHADispatch.java
index 0f2bb23cb..0cb6b35b0 100644
--- 
a/gateway-provider-ha/src/main/java/org/apache/knox/gateway/ha/dispatch/ConfigurableHADispatch.java
+++ 
b/gateway-provider-ha/src/main/java/org/apache/knox/gateway/ha/dispatch/ConfigurableHADispatch.java
@@ -17,14 +17,9 @@
  */
 package org.apache.knox.gateway.ha.dispatch;
 
-import static org.apache.knox.gateway.util.HttpUtils.isConnectionError;
-
-import org.apache.commons.codec.digest.DigestUtils;
-import org.apache.commons.lang3.StringUtils;
 import org.apache.http.HttpResponse;
 import org.apache.http.client.methods.HttpRequestBase;
 import org.apache.http.client.methods.HttpUriRequest;
-import org.apache.http.client.utils.URIBuilder;
 import org.apache.knox.gateway.config.Configure;
 import org.apache.knox.gateway.config.GatewayConfig;
 import org.apache.knox.gateway.dispatch.ConfigurableDispatch;
@@ -41,23 +36,20 @@ import javax.servlet.http.HttpServletRequestWrapper;
 import javax.servlet.http.HttpServletResponse;
 import java.io.IOException;
 import java.net.URI;
-import java.net.URISyntaxException;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
-import java.util.stream.Collectors;
+
+import static org.apache.knox.gateway.util.HttpUtils.isConnectionError;
 
 /**
  * A configurable HA dispatch class that has a very basic failover mechanism 
and
  * configurable options of ConfigurableDispatch class.
  */
-public class ConfigurableHADispatch extends ConfigurableDispatch {
+public class ConfigurableHADispatch extends ConfigurableDispatch implements 
LBHaDispatch {
 
   protected static final String FAILOVER_COUNTER_ATTRIBUTE = 
"dispatch.ha.failover.counter";
 
@@ -69,8 +61,6 @@ public class ConfigurableHADispatch extends 
ConfigurableDispatch {
 
   protected HaProvider haProvider;
 
-  private static final Map<String, String> urlToHashLookup = new HashMap<>();
-  private static final Map<String, String> hashToUrlLookup = new HashMap<>();
   protected static final List<String> nonIdempotentRequests = 
Arrays.asList("POST", "PATCH", "CONNECT");
 
   private boolean loadBalancingEnabled = 
HaServiceConfigConstants.DEFAULT_LOAD_BALANCING_ENABLED;
@@ -100,40 +90,13 @@ public class ConfigurableHADispatch extends 
ConfigurableDispatch {
       HaServiceConfig serviceConfig = 
haProvider.getHaDescriptor().getServiceConfig(getServiceRole());
       maxFailoverAttempts = serviceConfig.getMaxFailoverAttempts();
       failoverSleep = serviceConfig.getFailoverSleep();
-      loadBalancingEnabled = serviceConfig.isLoadBalancingEnabled();
       failoverNonIdempotentRequestEnabled = 
serviceConfig.isFailoverNonIdempotentRequestEnabled();
-
-      /* enforce dependency */
-      stickySessionsEnabled = loadBalancingEnabled && 
serviceConfig.isStickySessionEnabled();
+      initializeLBHaDispatch(serviceConfig);
       noFallbackEnabled = stickySessionsEnabled && 
serviceConfig.isNoFallbackEnabled();
-      if(stickySessionsEnabled) {
-        stickySessionCookieName = serviceConfig.getStickySessionCookieName();
-      }
-
-      
if(StringUtils.isNotBlank(serviceConfig.getStickySessionDisabledUserAgents())) {
-        disableLoadBalancingForUserAgents = 
Arrays.asList(serviceConfig.getStickySessionDisabledUserAgents()
-            .trim()
-            .split("\\s*,\\s*"));
-      }
-      setupUrlHashLookup();
-    }
-
-    /* setup the active URL for non-LB case */
-    activeURL.set(haProvider.getActiveURL(getServiceRole()));
-
-    // Suffix the cookie name by the service to make it unique
-    // The cookie path is NOT unique since Knox is stripping the service name.
-    stickySessionCookieName = stickySessionCookieName + '-' + getServiceRole();
-  }
-
-  private void setupUrlHashLookup() {
-    for (String url : haProvider.getURLs(getServiceRole())) {
-        String urlHash = hash(url);
-        urlToHashLookup.put(url, urlHash);
-        hashToUrlLookup.put(urlHash, url);
     }
   }
 
+  @Override
   public HaProvider getHaProvider() {
     return haProvider;
   }
@@ -144,70 +107,71 @@ public class ConfigurableHADispatch extends 
ConfigurableDispatch {
   }
 
   @Override
-  protected void executeRequestWrapper(HttpUriRequest outboundRequest,
-          HttpServletRequest inboundRequest, HttpServletResponse 
outboundResponse)
-          throws IOException {
+  public boolean isStickySessionEnabled() {
+      return stickySessionsEnabled;
+  }
 
-      final String userAgentFromBrowser = 
StringUtils.isBlank(inboundRequest.getHeader("User-Agent")) ? "" : 
inboundRequest.getHeader("User-Agent");
+  @Override
+  public void setStickySessionsEnabled(boolean enabled) {
+    this.stickySessionsEnabled = enabled;
+  }
 
-      /* disable loadblancing override */
-      boolean userAgentDisabled = false;
+  @Override
+  public String getStickySessionCookieName() {
+      return stickySessionCookieName;
+  }
 
-      /* disable loadbalancing in case a configured user agent is detected to 
disable LB */
-      if(disableLoadBalancingForUserAgents.stream().anyMatch(c -> 
userAgentFromBrowser.contains(c))  ) {
-        userAgentDisabled = true;
-        LOG.disableHALoadbalancinguserAgent(userAgentFromBrowser, 
disableLoadBalancingForUserAgents.toString());
-      }
+  @Override
+  public void setStickySessionCookieName(String stickySessionCookieName) {
+    this.stickySessionCookieName = stickySessionCookieName;
+  }
 
-      /* if disable LB is set don't bother setting backend from cookie */
-      Optional<URI> backendURI = Optional.empty();
-      if(!userAgentDisabled) {
-        backendURI = setBackendfromHaCookie(outboundRequest, inboundRequest);
-        if(backendURI.isPresent()) {
-          ((HttpRequestBase) outboundRequest).setURI(backendURI.get());
-        }
-      }
+  @Override
+  public boolean isLoadBalancingEnabled() {
+      return loadBalancingEnabled;
+  }
 
-      /**
-       * case where loadbalancing is enabled
-       * and we have a HTTP request configured not to use LB
-       * use the activeURL
-      */
-      if(loadBalancingEnabled && userAgentDisabled) {
-        try {
-          ((HttpRequestBase) 
outboundRequest).setURI(updateHostURL(outboundRequest.getURI(), 
activeURL.get()));
-        } catch (final URISyntaxException e) {
-          LOG.errorSettingActiveUrl();
-        }
-      }
+  @Override
+  public void setLoadBalancingEnabled(boolean enabled) {
+    this.loadBalancingEnabled = enabled;
+  }
+
+  @Override
+  public List<String> getDisableLoadBalancingForUserAgents() {
+      return disableLoadBalancingForUserAgents;
+  }
 
+  @Override
+  public void setDisableLoadBalancingForUserAgents(List<String> 
disableLoadBalancingForUserAgents) {
+    this.disableLoadBalancingForUserAgents = disableLoadBalancingForUserAgents;
+  }
+
+  @Override
+  public AtomicReference<String> getActiveURL() {
+      return activeURL;
+  }
+
+  @Override
+  public void setActiveURL(String url) {
+    activeURL.set(url);
+  }
+
+  @Override
+  protected void executeRequestWrapper(HttpUriRequest outboundRequest, 
HttpServletRequest inboundRequest, HttpServletResponse outboundResponse) throws 
IOException {
+      boolean userAgentDisabled = isUserAgentDisabled(inboundRequest);
+      Optional<URI> backendURI = setBackendUri(outboundRequest, 
inboundRequest, userAgentDisabled);
       executeRequest(outboundRequest, inboundRequest, outboundResponse);
-      /**
-       * 1. Load balance when loadbalancing is enabled and there are no 
overrides (disableLB)
-       * 2. Loadbalance only when sticky session is enabled but cookie not 
detected
-       *    i.e. when loadbalancing is enabled every request that does not 
have BACKEND cookie
-       *    needs to be loadbalanced. If a request has BACKEND coookie and 
Loadbalance=on then
-       *    there should be no loadbalancing.
-       */
-      if (loadBalancingEnabled && !userAgentDisabled) {
-        /* check sticky session enabled */
-        if(stickySessionsEnabled) {
-          /* loadbalance only when sticky session enabled and no backend url 
cookie */
-          if(!backendURI.isPresent()) {
-            haProvider.makeNextActiveURLAvailable(getServiceRole());
-          } else{
-            /* sticky session enabled and backend url cookie is valid no need 
to loadbalance */
-            /* do nothing */
-          }
-        } else {
-          haProvider.makeNextActiveURLAvailable(getServiceRole());
-        }
-      }
+      shiftActiveURL(userAgentDisabled, backendURI);
   }
 
   @Override
   protected void outboundResponseWrapper(final HttpUriRequest outboundRequest, 
final HttpServletRequest inboundRequest, final HttpServletResponse 
outboundResponse) {
-      setKnoxHaCookie(outboundRequest, inboundRequest, outboundResponse);
+      GatewayConfig config = (GatewayConfig) inboundRequest
+              .getServletContext()
+              .getAttribute(GatewayConfig.GATEWAY_CONFIG_ATTRIBUTE);
+      boolean sslEnabled = config != null && config.isSSLEnabled();
+
+      setKnoxHaCookie(outboundRequest, inboundRequest, outboundResponse, 
sslEnabled);
   }
 
   @Override
@@ -235,71 +199,6 @@ public class ConfigurableHADispatch extends 
ConfigurableDispatch {
     return !failoverNonIdempotentRequestEnabled && 
nonIdempotentRequests.stream().anyMatch(outboundRequest.getMethod()::equalsIgnoreCase);
   }
 
-  private Optional<URI> setBackendfromHaCookie(HttpUriRequest outboundRequest, 
HttpServletRequest inboundRequest) {
-      if (loadBalancingEnabled && stickySessionsEnabled && 
inboundRequest.getCookies() != null) {
-          for (Cookie cookie : inboundRequest.getCookies()) {
-              if (stickySessionCookieName.equals(cookie.getName())) {
-                  String backendURLHash = cookie.getValue();
-                  String backendURL = hashToUrlLookup.get(backendURLHash);
-                  // Make sure that the url provided is actually a valid 
backend url
-                  if 
(haProvider.getURLs(getServiceRole()).contains(backendURL)) {
-                      try {
-                        return 
Optional.of(updateHostURL(outboundRequest.getURI(), backendURL));
-                      } catch (URISyntaxException ignore) {
-                          // The cookie was invalid so we just don't set it. 
Knox will pick a backend automatically
-                      }
-                  }
-              }
-          }
-      }
-      return Optional.empty();
-  }
-
-  private void setKnoxHaCookie(final HttpUriRequest outboundRequest, final 
HttpServletRequest inboundRequest,
-          final HttpServletResponse outboundResponse) {
-      if (stickySessionsEnabled) {
-          List<Cookie> serviceHaCookies = Collections.emptyList();
-          if(inboundRequest.getCookies() != null) {
-              serviceHaCookies = Arrays
-                      .stream(inboundRequest.getCookies())
-                      .filter(cookie -> 
stickySessionCookieName.equals(cookie.getName()))
-                      .collect(Collectors.toList());
-          }
-          /* if the inbound request has a valid hash then no need to set a 
different hash */
-          if (serviceHaCookies != null && !serviceHaCookies.isEmpty()
-                  && 
hashToUrlLookup.containsKey(serviceHaCookies.get(0).getValue())) {
-              return;
-          } else {
-
-              /**
-              * Due to concurrency issues haProvider.getActiveURL() will not 
return the accurate list
-              * This will cause issues where original request goes to host-1 
and cookie is set for host-2 - because
-              * haProvider.getActiveURL() returned host-2. To prevent this 
from happening we need to make sure
-              * we set cookie for the endpoint that was served and not rely on 
haProvider.getActiveURL().
-              * let LBing logic take care of rotating urls.
-              **/
-              final List<String> urls = haProvider.getURLs(getServiceRole())
-                      .stream()
-                      .filter(u -> 
u.contains(outboundRequest.getURI().getHost()))
-                      .collect(Collectors.toList());
-
-              final String cookieValue = urlToHashLookup.get(urls.get(0));
-
-              Cookie stickySessionCookie = new Cookie(stickySessionCookieName, 
cookieValue);
-              stickySessionCookie.setPath(inboundRequest.getContextPath());
-              stickySessionCookie.setMaxAge(-1);
-              stickySessionCookie.setHttpOnly(true);
-              GatewayConfig config = (GatewayConfig) inboundRequest
-                      .getServletContext()
-                      .getAttribute(GatewayConfig.GATEWAY_CONFIG_ATTRIBUTE);
-              if (config != null) {
-                  stickySessionCookie.setSecure(config.isSSLEnabled());
-              }
-              outboundResponse.addCookie(stickySessionCookie);
-          }
-      }
-  }
-
   protected void failoverRequest(HttpUriRequest outboundRequest, 
HttpServletRequest inboundRequest, HttpServletResponse outboundResponse, 
HttpResponse inboundResponse, Exception exception) throws IOException {
     // Check whether the session cookie is present
     Optional<Cookie> sessionCookie = Optional.empty();
@@ -370,10 +269,6 @@ public class ConfigurableHADispatch extends 
ConfigurableDispatch {
     return counter;
   }
 
-  private String hash(String url) {
-    return DigestUtils.sha256Hex(url);
-  }
-
   /**
    * Strips out the cookies by the cookie name provided
    */
@@ -403,21 +298,4 @@ public class ConfigurableHADispatch extends 
ConfigurableDispatch {
       return cookies;
     }
   }
-
-  /**
-   * A helper function that updates the schema, host and port
-   * of the URI with the provided string URL and returnes a new
-   * URI object
-   * @param source
-   * @param host
-   * @return
-   */
-  private URI updateHostURL(final URI source, final String host) throws 
URISyntaxException {
-    final URI newUri = new URI(host);
-    final URIBuilder uriBuilder = new URIBuilder(source);
-    uriBuilder.setScheme(newUri.getScheme());
-    uriBuilder.setHost(newUri.getHost());
-    uriBuilder.setPort(newUri.getPort());
-    return uriBuilder.build();
-  }
 }
diff --git 
a/gateway-provider-ha/src/main/java/org/apache/knox/gateway/ha/dispatch/LBHaDispatch.java
 
b/gateway-provider-ha/src/main/java/org/apache/knox/gateway/ha/dispatch/LBHaDispatch.java
new file mode 100644
index 000000000..433e9bfe5
--- /dev/null
+++ 
b/gateway-provider-ha/src/main/java/org/apache/knox/gateway/ha/dispatch/LBHaDispatch.java
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.knox.gateway.ha.dispatch;
+
+import org.apache.commons.codec.digest.DigestUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.http.client.methods.HttpRequestBase;
+import org.apache.http.client.methods.HttpUriRequest;
+import org.apache.http.client.utils.URIBuilder;
+import org.apache.knox.gateway.ha.dispatch.i18n.HaDispatchMessages;
+import org.apache.knox.gateway.ha.provider.HaProvider;
+import org.apache.knox.gateway.ha.provider.HaServiceConfig;
+import org.apache.knox.gateway.i18n.messages.MessagesFactory;
+
+import javax.servlet.http.Cookie;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+
+public interface LBHaDispatch {
+
+    HaDispatchMessages LOG = MessagesFactory.get(HaDispatchMessages.class);
+    Map<String, String> urlToHashLookup = new HashMap<>();
+    Map<String, String> hashToUrlLookup = new HashMap<>();
+
+    boolean isStickySessionEnabled();
+
+    void setStickySessionsEnabled(boolean enabled);
+
+    String getStickySessionCookieName();
+
+    void setStickySessionCookieName(String stickySessionCookieName);
+
+    HaProvider getHaProvider();
+
+    String getServiceRole();
+
+    void setLoadBalancingEnabled(boolean enabled);
+
+    boolean isLoadBalancingEnabled();
+
+    List<String> getDisableLoadBalancingForUserAgents();
+
+    void setDisableLoadBalancingForUserAgents(List<String> 
disableLoadBalancingForUserAgents);
+
+    AtomicReference<String> getActiveURL();
+
+    void setActiveURL(String url);
+
+    default void initializeLBHaDispatch(HaServiceConfig serviceConfig) {
+        setLoadBalancingEnabled(serviceConfig.isLoadBalancingEnabled());
+        setStickySessionsEnabled(isLoadBalancingEnabled() && 
serviceConfig.isStickySessionEnabled());
+
+        if (isStickySessionEnabled()) {
+            
setStickySessionCookieName(serviceConfig.getStickySessionCookieName());
+        }
+
+        if 
(StringUtils.isNotBlank(serviceConfig.getStickySessionDisabledUserAgents())) {
+            
setDisableLoadBalancingForUserAgents(Arrays.asList(serviceConfig.getStickySessionDisabledUserAgents()
+                    .trim()
+                    .split("\\s*,\\s*")));
+        }
+        setupUrlHashLookup();
+
+        /* setup the active URL for non-LB case */
+        setActiveURL(getHaProvider().getActiveURL(getServiceRole()));
+
+        // Suffix the cookie name by the service to make it unique
+        // The cookie path is NOT unique since Knox is stripping the service 
name.
+        setStickySessionCookieName(getStickySessionCookieName() + '-' + 
getServiceRole());
+    }
+
+    default void setKnoxHaCookie(final HttpUriRequest outboundRequest, final 
HttpServletRequest inboundRequest,
+                                 final HttpServletResponse outboundResponse, 
boolean sslEnabled) {
+        if (isStickySessionEnabled()) {
+            List<Cookie> serviceHaCookies = Collections.emptyList();
+            if (inboundRequest.getCookies() != null) {
+                serviceHaCookies = Arrays
+                        .stream(inboundRequest.getCookies())
+                        .filter(cookie -> 
getStickySessionCookieName().equals(cookie.getName()))
+                        .collect(Collectors.toList());
+            }
+
+            /* if the inbound request has a valid hash then no need to set a 
different hash */
+            if (serviceHaCookies.isEmpty() || 
!hashToUrlLookup.containsKey(serviceHaCookies.get(0).getValue())) {
+                /**
+                 * Due to concurrency issues haProvider.getActiveURL() will 
not return the accurate list
+                 * This will cause issues where original request goes to 
host-1 and cookie is set for host-2 - because
+                 * haProvider.getActiveURL() returned host-2. To prevent this 
from happening we need to make sure
+                 * we set cookie for the endpoint that was served and not rely 
on haProvider.getActiveURL().
+                 * let LBing logic take care of rotating urls.
+                 **/
+                final List<String> urls = 
getHaProvider().getURLs(getServiceRole())
+                        .stream()
+                        .filter(u -> 
u.contains(outboundRequest.getURI().getHost()))
+                        .collect(Collectors.toList());
+                final String cookieValue = urlToHashLookup.get(urls.get(0));
+                Cookie stickySessionCookie = new 
Cookie(getStickySessionCookieName(), cookieValue);
+                stickySessionCookie.setPath(inboundRequest.getContextPath());
+                stickySessionCookie.setMaxAge(-1);
+                stickySessionCookie.setHttpOnly(true);
+                stickySessionCookie.setSecure(sslEnabled);
+                outboundResponse.addCookie(stickySessionCookie);
+            }
+        }
+    }
+
+    default Optional<URI> setBackendFromHaCookie(HttpUriRequest 
outboundRequest, HttpServletRequest inboundRequest) {
+        if (isLoadBalancingEnabled() && isStickySessionEnabled() && 
inboundRequest.getCookies() != null) {
+            for (Cookie cookie : inboundRequest.getCookies()) {
+                if (getStickySessionCookieName().equals(cookie.getName())) {
+                    String backendURLHash = cookie.getValue();
+                    String backendURL = hashToUrlLookup.get(backendURLHash);
+                    // Make sure that the url provided is actually a valid 
backend url
+                    if 
(getHaProvider().getURLs(getServiceRole()).contains(backendURL)) {
+                        try {
+                            return 
Optional.of(updateHostURL(outboundRequest.getURI(), backendURL));
+                        } catch (URISyntaxException ignore) {
+                            // The cookie was invalid so we just don't set it. 
Knox will pick a backend automatically
+                        }
+                    }
+                }
+            }
+        }
+        return Optional.empty();
+    }
+
+    default String hash(String url) {
+        return DigestUtils.sha256Hex(url);
+    }
+
+    /**
+     * A helper function that updates the schema, host and port
+     * of the URI with the provided string URL and returnes a new
+     * URI object
+     *
+     * @param source
+     * @param host
+     * @return
+     */
+    default URI updateHostURL(final URI source, final String host) throws 
URISyntaxException {
+        final URI newUri = new URI(host);
+        final URIBuilder uriBuilder = new URIBuilder(source);
+        uriBuilder.setScheme(newUri.getScheme());
+        uriBuilder.setHost(newUri.getHost());
+        uriBuilder.setPort(newUri.getPort());
+        return uriBuilder.build();
+    }
+
+    default void setupUrlHashLookup() {
+        for (String url : getHaProvider().getURLs(getServiceRole())) {
+            String urlHash = hash(url);
+            urlToHashLookup.put(url, urlHash);
+            hashToUrlLookup.put(urlHash, url);
+        }
+    }
+
+    default boolean isUserAgentDisabled(HttpServletRequest inboundRequest) {
+        final String userAgentFromBrowser = 
StringUtils.isBlank(inboundRequest.getHeader("User-Agent")) ? "" : 
inboundRequest.getHeader("User-Agent");
+        /* disable loadblancing override */
+        boolean userAgentDisabled = false;
+
+        /* disable loadbalancing in case a configured user agent is detected 
to disable LB */
+        if 
(getDisableLoadBalancingForUserAgents().stream().anyMatch(userAgentFromBrowser::contains))
 {
+            userAgentDisabled = true;
+            LOG.disableHALoadbalancinguserAgent(userAgentFromBrowser, 
getDisableLoadBalancingForUserAgents().toString());
+        }
+
+        return userAgentDisabled;
+    }
+
+    default Optional<URI> setBackendUri(HttpUriRequest outboundRequest, 
HttpServletRequest inboundRequest, boolean userAgentDisabled) {
+        /* if disable LB is set don't bother setting backend from cookie */
+        Optional<URI> backendURI = Optional.empty();
+        if (!userAgentDisabled) {
+            backendURI = setBackendFromHaCookie(outboundRequest, 
inboundRequest);
+            backendURI.ifPresent(uri -> ((HttpRequestBase) 
outboundRequest).setURI(uri));
+        }
+
+        /**
+         * case where loadbalancing is enabled
+         * and we have a HTTP request configured not to use LB
+         * use the activeURL
+         */
+        if (isLoadBalancingEnabled() && userAgentDisabled) {
+            try {
+                ((HttpRequestBase) 
outboundRequest).setURI(updateHostURL(outboundRequest.getURI(), 
getActiveURL().get()));
+            } catch (final URISyntaxException e) {
+                LOG.errorSettingActiveUrl();
+            }
+        }
+
+        return backendURI;
+    }
+
+    default void shiftActiveURL(boolean userAgentDisabled, Optional<URI> 
backendURI) {
+        /**
+         * 1. Load balance when loadbalancing is enabled and there are no 
overrides (disableLB)
+         * 2. Loadbalance only when sticky session is enabled but cookie not 
detected
+         *    i.e. when loadbalancing is enabled every request that does not 
have BACKEND cookie
+         *    needs to be loadbalanced. If a request has BACKEND coookie and 
Loadbalance=on then
+         *    there should be no loadbalancing.
+         */
+        if (isLoadBalancingEnabled() && !userAgentDisabled) {
+            /* check sticky session enabled */
+            if (isStickySessionEnabled()) {
+                /* loadbalance only when sticky session enabled and no backend 
url cookie */
+                if (!backendURI.isPresent()) {
+                    
getHaProvider().makeNextActiveURLAvailable(getServiceRole());
+                } else {
+                    /* sticky session enabled and backend url cookie is valid 
no need to loadbalance */
+                    /* do nothing */
+                }
+            } else {
+                getHaProvider().makeNextActiveURLAvailable(getServiceRole());
+            }
+        }
+    }
+}
diff --git 
a/gateway-provider-ha/src/main/java/org/apache/knox/gateway/ha/dispatch/SSEHaDispatch.java
 
b/gateway-provider-ha/src/main/java/org/apache/knox/gateway/ha/dispatch/SSEHaDispatch.java
new file mode 100644
index 000000000..50582c2b9
--- /dev/null
+++ 
b/gateway-provider-ha/src/main/java/org/apache/knox/gateway/ha/dispatch/SSEHaDispatch.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.knox.gateway.ha.dispatch;
+
+import org.apache.http.client.methods.HttpUriRequest;
+import org.apache.knox.gateway.config.Configure;
+import org.apache.knox.gateway.config.GatewayConfig;
+import org.apache.knox.gateway.ha.dispatch.i18n.HaDispatchMessages;
+import org.apache.knox.gateway.ha.provider.HaProvider;
+import org.apache.knox.gateway.ha.provider.HaServiceConfig;
+import org.apache.knox.gateway.ha.provider.impl.HaServiceConfigConstants;
+import org.apache.knox.gateway.i18n.messages.MessagesFactory;
+import org.apache.knox.gateway.sse.SSEDispatch;
+
+import javax.servlet.FilterConfig;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import java.net.URI;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicReference;
+
+public class SSEHaDispatch extends SSEDispatch implements LBHaDispatch {
+
+    protected static final HaDispatchMessages LOG = 
MessagesFactory.get(HaDispatchMessages.class);
+    protected HaProvider haProvider;
+    private boolean loadBalancingEnabled = 
HaServiceConfigConstants.DEFAULT_LOAD_BALANCING_ENABLED;
+    private boolean stickySessionsEnabled = 
HaServiceConfigConstants.DEFAULT_STICKY_SESSIONS_ENABLED;
+    private String stickySessionCookieName = 
HaServiceConfigConstants.DEFAULT_STICKY_SESSION_COOKIE_NAME;
+    private List<String> disableLoadBalancingForUserAgents = 
Collections.singletonList(HaServiceConfigConstants.DEFAULT_DISABLE_LB_USER_AGENTS);
+    private final boolean sslEnabled;
+
+    /**
+     * This activeURL is used to track urls when LB is turned off for some 
clients.
+     * The problem we have with selectively turning off LB is that other 
clients
+     * that use LB can change the state from under the current session where 
LB is
+     * turned off.
+     * e.g.
+     * ODBC Connection established where LB is off. JDBC connection is 
established
+     * next where LB is enabled. This changes the active URL under the 
existing ODBC
+     * connection which will be an issue.
+     * This variable keeps track of non-LB'ed url and updated upon failover.
+     */
+    private final AtomicReference<String> activeURL = new AtomicReference<>();
+
+    public SSEHaDispatch(FilterConfig filterConfig) {
+        super(filterConfig);
+
+        GatewayConfig gatewayConfig = (GatewayConfig) 
filterConfig.getServletContext().getAttribute(GatewayConfig.GATEWAY_CONFIG_ATTRIBUTE);
+        sslEnabled = gatewayConfig.isSSLEnabled();
+    }
+
+    @Override
+    public void init() {
+        super.init();
+        LOG.initializingForResourceRole(getServiceRole());
+        if (haProvider != null) {
+            HaServiceConfig serviceConfig = 
haProvider.getHaDescriptor().getServiceConfig(getServiceRole());
+            this.initializeLBHaDispatch(serviceConfig);
+        }
+    }
+
+    @Override
+    public HaProvider getHaProvider() {
+        return haProvider;
+    }
+
+    @Override
+    public void setLoadBalancingEnabled(boolean enabled) {
+        this.loadBalancingEnabled = enabled;
+    }
+
+    @Configure
+    public void setHaProvider(HaProvider haProvider) {
+        this.haProvider = haProvider;
+    }
+
+    @Override
+    public boolean isStickySessionEnabled() {
+        return stickySessionsEnabled;
+    }
+
+    @Override
+    public void setStickySessionsEnabled(boolean enabled) {
+        this.stickySessionsEnabled = enabled;
+    }
+
+    @Override
+    public String getStickySessionCookieName() {
+        return stickySessionCookieName;
+    }
+
+    @Override
+    public void setStickySessionCookieName(String stickySessionCookieName) {
+        this.stickySessionCookieName = stickySessionCookieName;
+    }
+
+    @Override
+    public boolean isLoadBalancingEnabled() {
+        return loadBalancingEnabled;
+    }
+
+    @Override
+    public List<String> getDisableLoadBalancingForUserAgents() {
+        return disableLoadBalancingForUserAgents;
+    }
+
+    @Override
+    public void setDisableLoadBalancingForUserAgents(List<String> 
disableLoadBalancingForUserAgents) {
+        this.disableLoadBalancingForUserAgents = 
disableLoadBalancingForUserAgents;
+    }
+
+    @Override
+    public AtomicReference<String> getActiveURL() {
+        return activeURL;
+    }
+
+    @Override
+    public void setActiveURL(String url) {
+        activeURL.set(url);
+    }
+
+
+    @Override
+    protected void executeRequestWrapper(HttpUriRequest outboundRequest, 
HttpServletRequest inboundRequest, HttpServletResponse outboundResponse) {
+        boolean userAgentDisabled = isUserAgentDisabled(inboundRequest);
+        Optional<URI> backendURI = setBackendUri(outboundRequest, 
inboundRequest, userAgentDisabled);
+        executeRequest(outboundRequest, inboundRequest, outboundResponse);
+        shiftActiveURL(userAgentDisabled, backendURI);
+    }
+
+    @Override
+    protected void outboundResponseWrapper(final HttpUriRequest 
outboundRequest, final HttpServletRequest inboundRequest, final 
HttpServletResponse outboundResponse) {
+        setKnoxHaCookie(outboundRequest, inboundRequest, outboundResponse, 
sslEnabled);
+    }
+}
diff --git 
a/gateway-spi/src/test/java/org/apache/knox/gateway/sse/SSEDispatchTest.java 
b/gateway-provider-ha/src/test/java/org/apache/knox/gateway/ha/dispatch/SSEHaDispatchTest.java
similarity index 55%
copy from 
gateway-spi/src/test/java/org/apache/knox/gateway/sse/SSEDispatchTest.java
copy to 
gateway-provider-ha/src/test/java/org/apache/knox/gateway/ha/dispatch/SSEHaDispatchTest.java
index 66abcb791..77c34f559 100644
--- a/gateway-spi/src/test/java/org/apache/knox/gateway/sse/SSEDispatchTest.java
+++ 
b/gateway-provider-ha/src/test/java/org/apache/knox/gateway/ha/dispatch/SSEHaDispatchTest.java
@@ -6,34 +6,40 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.knox.gateway.sse;
+package org.apache.knox.gateway.ha.dispatch;
 
 import org.apache.http.HttpStatus;
 import org.apache.http.client.config.CookieSpecs;
-import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
 import org.apache.knox.gateway.config.GatewayConfig;
+import org.apache.knox.gateway.ha.provider.HaDescriptor;
+import org.apache.knox.gateway.ha.provider.HaProvider;
+import org.apache.knox.gateway.ha.provider.impl.DefaultHaProvider;
+import org.apache.knox.gateway.ha.provider.impl.HaDescriptorFactory;
 import org.apache.knox.gateway.services.GatewayServices;
 import org.apache.knox.gateway.services.ServiceType;
 import org.apache.knox.gateway.services.security.KeystoreService;
 import org.apache.knox.test.mock.MockServer;
 import org.apache.knox.test.mock.MockServletContext;
 import org.apache.knox.test.mock.MockServletInputStream;
+import org.easymock.Capture;
 import org.easymock.EasyMock;
+import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
 import javax.servlet.AsyncContext;
 import javax.servlet.FilterConfig;
 import javax.servlet.ServletContext;
+import javax.servlet.http.Cookie;
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
 import java.io.ByteArrayInputStream;
@@ -41,20 +47,21 @@ import java.io.InputStream;
 import java.io.PrintWriter;
 import java.net.URI;
 import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
+import static org.easymock.EasyMock.capture;
 import static org.easymock.EasyMock.createMock;
 import static org.easymock.EasyMock.expect;
 import static org.easymock.EasyMock.replay;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
-public class SSEDispatchTest {
+public class SSEHaDispatchTest {
 
     private static MockServer MOCK_SSE_SERVER;
     private static URI URL;
@@ -66,30 +73,76 @@ public class SSEDispatchTest {
     }
 
     @Test
-    public void testCreateAndDestroyClient() throws Exception {
-        SSEDispatch sseDispatch = this.createDispatch();
-        assertNotNull(sseDispatch.getAsyncClient());
+    public void testHADispatchURL() throws Exception {
+        String serviceName = "SSE";
+        HaDescriptor descriptor = HaDescriptorFactory.createDescriptor();
+        
descriptor.addServiceConfig(HaDescriptorFactory.createServiceConfig(serviceName,
 "true", null, null, null, null, "true", "true", null, null, null, null));
+        HaProvider provider = new DefaultHaProvider(descriptor);
+        URI uri1 = new URI("http://host1.valid";);
+        URI uri2 = new URI("http://host2.valid";);
+        URI uri3 = new URI("http://host3.valid";);
+        ArrayList<String> urlList = new ArrayList<>();
+        urlList.add(uri1.toString());
+        urlList.add(uri2.toString());
+        urlList.add(uri3.toString());
+        provider.addHaService(serviceName, urlList);
 
-        sseDispatch.destroy();
-        assertFalse(((CloseableHttpAsyncClient) 
sseDispatch.getAsyncClient()).isRunning());
+        KeystoreService keystoreService = createMock(KeystoreService.class);
+        
expect(keystoreService.getTruststoreForHttpClient()).andReturn(null).once();
+
+        GatewayConfig gatewayConfig = createMock(GatewayConfig.class);
+        expect(gatewayConfig.isMetricsEnabled()).andReturn(false).once();
+        expect(gatewayConfig.isSSLEnabled()).andReturn(false).once();
+        
expect(gatewayConfig.getHttpClientMaxConnections()).andReturn(32).once();
+        
expect(gatewayConfig.getHttpClientConnectionTimeout()).andReturn(20000).once();
+        
expect(gatewayConfig.getHttpClientSocketTimeout()).andReturn(20000).once();
+        
expect(gatewayConfig.getHttpClientCookieSpec()).andReturn(CookieSpecs.STANDARD).anyTimes();
+
+        GatewayServices gatewayServices = createMock(GatewayServices.class);
+        
expect(gatewayServices.getService(ServiceType.KEYSTORE_SERVICE)).andReturn(keystoreService).once();
+
+        ServletContext servletContext = createMock(ServletContext.class);
+        
expect(servletContext.getAttribute(GatewayConfig.GATEWAY_CONFIG_ATTRIBUTE)).andReturn(gatewayConfig).atLeastOnce();
+        
expect(servletContext.getAttribute(GatewayServices.GATEWAY_SERVICES_ATTRIBUTE)).andReturn(gatewayServices).atLeastOnce();
+
+        FilterConfig filterConfig = createMock(FilterConfig.class);
+        
expect(filterConfig.getServletContext()).andReturn(servletContext).atLeastOnce();
+        
expect(filterConfig.getInitParameter("useTwoWaySsl")).andReturn("false").once();
+        
expect(filterConfig.getInitParameter("httpclient.maxConnections")).andReturn(null).once();
+        
expect(filterConfig.getInitParameter("httpclient.connectionTimeout")).andReturn(null).once();
+        
expect(filterConfig.getInitParameter("httpclient.socketTimeout")).andReturn(null).once();
+        
expect(filterConfig.getInitParameter("serviceRole")).andReturn(null).once();
+        
expect(filterConfig.getInitParameter("httpclient.cookieSpec")).andReturn(null).anyTimes();
+
+        HttpServletRequest inboundRequest = 
EasyMock.createNiceMock(HttpServletRequest.class);
+        EasyMock.expect(inboundRequest.getRequestURL()).andReturn(new 
StringBuffer(provider.getActiveURL(serviceName))).anyTimes();
+        EasyMock.replay(inboundRequest, gatewayConfig, filterConfig, 
servletContext, keystoreService, gatewayServices);
+
+        SSEHaDispatch dispatch = new SSEHaDispatch(filterConfig);
+        dispatch.setHaProvider(provider);
+        dispatch.setServiceRole(serviceName);
+        dispatch.init();
+
+        /* make sure the dispatch URL is always active URL */
+        Assert.assertEquals(provider.getActiveURL(serviceName), 
dispatch.getDispatchUrl(inboundRequest).toString());
     }
 
     @Test
-    public void testGet2xx() throws Exception {
+    public void testStickySessionCookie() throws Exception {
         CountDownLatch latch = new CountDownLatch(1);
-        SSEDispatch sseDispatch = this.createDispatch();
+        SSEHaDispatch sseHaDispatch = this.createDispatch(null);
         PrintWriter printWriter = EasyMock.createNiceMock(PrintWriter.class);
         HttpServletResponse outboundResponse = 
this.getServletResponse(HttpStatus.SC_OK);
         AsyncContext asyncContext = this.getAsyncContext(latch, 
outboundResponse);
         HttpServletRequest inboundRequest = 
this.getHttpServletRequest(asyncContext);
+        Capture<Cookie> capturedArgument = Capture.newInstance();
 
-        this.expectResponseBodyAndHeader(printWriter, outboundResponse);
+        this.expectResponseBodyAndHeader(printWriter, outboundResponse, 
capturedArgument);
         replay(inboundRequest, asyncContext, outboundResponse, printWriter);
 
         MOCK_SSE_SERVER.expect()
                 .method("GET")
                 .pathInfo("/sse")
-                .header("request", "header")
                 .header("Accept", "text/event-stream")
                 .respond()
                 .status(HttpStatus.SC_OK)
@@ -97,322 +150,214 @@ public class SSEDispatchTest {
                 .header("response", "header")
                 .contentType("text/event-stream");
 
-        sseDispatch.doGet(URL, inboundRequest, outboundResponse);
+        sseHaDispatch.doGet(URL, inboundRequest, outboundResponse);
 
         latch.await(1L, TimeUnit.SECONDS);
         EasyMock.verify(asyncContext, outboundResponse, inboundRequest, 
printWriter);
         assertTrue(MOCK_SSE_SERVER.isEmpty());
+        assertEquals("KNOX_BACKEND-SSE", 
capturedArgument.getValue().getName());
     }
 
     @Test
-    public void testGet4xx() throws Exception {
-        CountDownLatch latch = new CountDownLatch(1);
-        SSEDispatch sseDispatch = this.createDispatch();
-        HttpServletResponse outboundResponse = 
this.getServletResponse(HttpStatus.SC_BAD_REQUEST);
-        AsyncContext asyncContext = this.getAsyncContext(latch, 
outboundResponse);
-        HttpServletRequest inboundRequest = 
this.getHttpServletRequest(asyncContext);
-
-        replay(inboundRequest, asyncContext, outboundResponse);
-
-        MOCK_SSE_SERVER.expect()
-                .method("GET")
-                .pathInfo("/sse")
-                .respond()
-                .status(HttpStatus.SC_BAD_REQUEST);
-
-        sseDispatch.doGet(URL, inboundRequest, outboundResponse);
-
-        latch.await(1L, TimeUnit.SECONDS);
-        EasyMock.verify(asyncContext, outboundResponse, inboundRequest);
-        assertTrue(MOCK_SSE_SERVER.isEmpty());
-    }
-
-    @Test
-    public void testGet5xx() throws Exception {
+    public void testNamedStickySessionCookie() throws Exception {
         CountDownLatch latch = new CountDownLatch(1);
-        SSEDispatch sseDispatch = this.createDispatch();
-        HttpServletResponse outboundResponse = 
this.getServletResponse(HttpStatus.SC_INTERNAL_SERVER_ERROR);
+        SSEHaDispatch sseHaDispatch = this.createDispatch("COOKIE_NAME");
+        PrintWriter printWriter = EasyMock.createNiceMock(PrintWriter.class);
+        HttpServletResponse outboundResponse = 
this.getServletResponse(HttpStatus.SC_OK);
         AsyncContext asyncContext = this.getAsyncContext(latch, 
outboundResponse);
         HttpServletRequest inboundRequest = 
this.getHttpServletRequest(asyncContext);
+        Capture<Cookie> capturedArgument = Capture.newInstance();
 
-        replay(inboundRequest, asyncContext, outboundResponse);
+        this.expectResponseBodyAndHeader(printWriter, outboundResponse, 
capturedArgument);
+        replay(inboundRequest, asyncContext, outboundResponse, printWriter);
 
         MOCK_SSE_SERVER.expect()
                 .method("GET")
                 .pathInfo("/sse")
+                .header("Accept", "text/event-stream")
                 .respond()
-                .status(HttpStatus.SC_INTERNAL_SERVER_ERROR);
+                .status(HttpStatus.SC_OK)
+                
.content("id:1\ndata:data1\nevent:event1\n\ndata:data2\nevent:event2\nid:2\nretry:1\n:testing\n\n",
 StandardCharsets.UTF_8)
+                .header("response", "header")
+                .contentType("text/event-stream");
 
-        sseDispatch.doGet(URL, inboundRequest, outboundResponse);
+        sseHaDispatch.doGet(URL, inboundRequest, outboundResponse);
 
         latch.await(1L, TimeUnit.SECONDS);
-        EasyMock.verify(asyncContext, outboundResponse, inboundRequest);
+        EasyMock.verify(asyncContext, outboundResponse, inboundRequest, 
printWriter);
         assertTrue(MOCK_SSE_SERVER.isEmpty());
+        assertEquals("COOKIE_NAME-SSE", capturedArgument.getValue().getName());
     }
 
     @Test
-    public void testPost2xx() throws Exception {
+    public void testLoadBalancingWithoutStickySessionCookie() throws Exception 
{
         CountDownLatch latch = new CountDownLatch(1);
-        SSEDispatch sseDispatch = this.createDispatch();
+        SSEHaDispatch sseHaDispatch = this.createDispatch(null);
         PrintWriter printWriter = EasyMock.createNiceMock(PrintWriter.class);
         HttpServletResponse outboundResponse = 
this.getServletResponse(HttpStatus.SC_OK);
         AsyncContext asyncContext = this.getAsyncContext(latch, 
outboundResponse);
         HttpServletRequest inboundRequest = 
this.getHttpServletRequest(asyncContext);
-
-        this.expectResponseBodyAndHeader(printWriter, outboundResponse);
+        Capture<Cookie> capturedArgument = Capture.newInstance();
+        
expect(inboundRequest.getHeader("User-Agent")).andReturn("unknown").anyTimes();
+        this.expectResponseBodyAndHeader(printWriter, outboundResponse, 
capturedArgument);
         replay(inboundRequest, asyncContext, outboundResponse, printWriter);
 
         MOCK_SSE_SERVER.expect()
-                .method("POST")
+                .method("GET")
                 .pathInfo("/sse")
-                .header("request", "header")
                 .header("Accept", "text/event-stream")
-                .content("{\"request\":\"body\"}", StandardCharsets.UTF_8)
                 .respond()
                 .status(HttpStatus.SC_OK)
                 
.content("id:1\ndata:data1\nevent:event1\n\ndata:data2\nevent:event2\nid:2\nretry:1\n:testing\n\n",
 StandardCharsets.UTF_8)
                 .header("response", "header")
                 .contentType("text/event-stream");
 
-        sseDispatch.doPost(URL, inboundRequest, outboundResponse);
+        sseHaDispatch.doGet(URL, inboundRequest, outboundResponse);
 
         latch.await(1L, TimeUnit.SECONDS);
         EasyMock.verify(asyncContext, outboundResponse, inboundRequest, 
printWriter);
         assertTrue(MOCK_SSE_SERVER.isEmpty());
+        assertEquals("http://host2:3333/sse";, 
sseHaDispatch.getHaProvider().getActiveURL("SSE"));
     }
 
     @Test
-    public void testPost4xx() throws Exception {
+    public void testLoadBalancingDisabledWithUserAgent() throws Exception {
         CountDownLatch latch = new CountDownLatch(1);
-        SSEDispatch sseDispatch = this.createDispatch();
-        HttpServletResponse outboundResponse = 
this.getServletResponse(HttpStatus.SC_NOT_FOUND);
-        AsyncContext asyncContext = this.getAsyncContext(latch, 
outboundResponse);
-        HttpServletRequest inboundRequest = 
this.getHttpServletRequest(asyncContext);
-
-        replay(inboundRequest, asyncContext, outboundResponse);
-
-        MOCK_SSE_SERVER.expect()
-                .method("POST")
-                .pathInfo("/sse")
-                .respond()
-                .status(HttpStatus.SC_NOT_FOUND);
-
-        sseDispatch.doPost(URL, inboundRequest, outboundResponse);
-
-        latch.await(1L, TimeUnit.SECONDS);
-        EasyMock.verify(asyncContext, outboundResponse, inboundRequest);
-        assertTrue(MOCK_SSE_SERVER.isEmpty());
-    }
-
-    @Test
-    public void testPost5xx() throws Exception {
-        CountDownLatch latch = new CountDownLatch(1);
-        SSEDispatch sseDispatch = this.createDispatch();
-        HttpServletResponse outboundResponse = 
this.getServletResponse(HttpStatus.SC_INTERNAL_SERVER_ERROR);
-        AsyncContext asyncContext = this.getAsyncContext(latch, 
outboundResponse);
-        HttpServletRequest inboundRequest = 
this.getHttpServletRequest(asyncContext);
-
-        replay(inboundRequest, asyncContext, outboundResponse);
-
-        MOCK_SSE_SERVER.expect()
-                .method("POST")
-                .pathInfo("/sse")
-                .respond()
-                .status(HttpStatus.SC_INTERNAL_SERVER_ERROR);
-
-        sseDispatch.doPost(URL, inboundRequest, outboundResponse);
-
-        latch.await(1L, TimeUnit.SECONDS);
-        EasyMock.verify(asyncContext, outboundResponse, inboundRequest);
-        assertTrue(MOCK_SSE_SERVER.isEmpty());
-    }
-
-    @Test
-    public void testPut2xx() throws Exception {
-        CountDownLatch latch = new CountDownLatch(1);
-        SSEDispatch sseDispatch = this.createDispatch();
+        SSEHaDispatch sseHaDispatch = this.createDispatch(null);
         PrintWriter printWriter = EasyMock.createNiceMock(PrintWriter.class);
         HttpServletResponse outboundResponse = 
this.getServletResponse(HttpStatus.SC_OK);
         AsyncContext asyncContext = this.getAsyncContext(latch, 
outboundResponse);
         HttpServletRequest inboundRequest = 
this.getHttpServletRequest(asyncContext);
+        
expect(inboundRequest.getHeader("User-Agent")).andReturn("user1").anyTimes();
+        Capture<Cookie> capturedArgument = Capture.newInstance();
 
-        this.expectResponseBodyAndHeader(printWriter, outboundResponse);
+        this.expectResponseBodyAndHeader(printWriter, outboundResponse, 
capturedArgument);
         replay(inboundRequest, asyncContext, outboundResponse, printWriter);
 
         MOCK_SSE_SERVER.expect()
-                .method("PUT")
+                .method("GET")
                 .pathInfo("/sse")
-                .header("request", "header")
                 .header("Accept", "text/event-stream")
-                .content("{\"request\":\"body\"}", StandardCharsets.UTF_8)
                 .respond()
                 .status(HttpStatus.SC_OK)
                 
.content("id:1\ndata:data1\nevent:event1\n\ndata:data2\nevent:event2\nid:2\nretry:1\n:testing\n\n",
 StandardCharsets.UTF_8)
                 .header("response", "header")
                 .contentType("text/event-stream");
 
-        sseDispatch.doPut(URL, inboundRequest, outboundResponse);
+        sseHaDispatch.doGet(URL, inboundRequest, outboundResponse);
 
         latch.await(1L, TimeUnit.SECONDS);
         EasyMock.verify(asyncContext, outboundResponse, inboundRequest, 
printWriter);
         assertTrue(MOCK_SSE_SERVER.isEmpty());
+        assertEquals("http://localhost:"; + MOCK_SSE_SERVER.getPort() + "/sse", 
sseHaDispatch.getHaProvider().getActiveURL("SSE"));
     }
 
     @Test
-    public void testPut4xx() throws Exception {
+    public void testLoadBalancingDisabledWithStickySession() throws Exception {
         CountDownLatch latch = new CountDownLatch(1);
-        SSEDispatch sseDispatch = this.createDispatch();
-        HttpServletResponse outboundResponse = 
this.getServletResponse(HttpStatus.SC_NOT_FOUND);
-        AsyncContext asyncContext = this.getAsyncContext(latch, 
outboundResponse);
-        HttpServletRequest inboundRequest = 
this.getHttpServletRequest(asyncContext);
-
-        replay(inboundRequest, asyncContext, outboundResponse);
-
-        MOCK_SSE_SERVER.expect()
-                .method("PUT")
-                .pathInfo("/sse")
-                .respond()
-                .status(HttpStatus.SC_NOT_FOUND);
-
-        sseDispatch.doPut(URL, inboundRequest, outboundResponse);
-
-        latch.await(1L, TimeUnit.SECONDS);
-        EasyMock.verify(asyncContext, outboundResponse, inboundRequest);
-        assertTrue(MOCK_SSE_SERVER.isEmpty());
-    }
-
-    @Test
-    public void testPut5xx() throws Exception {
-        CountDownLatch latch = new CountDownLatch(1);
-        SSEDispatch sseDispatch = this.createDispatch();
-        HttpServletResponse outboundResponse = 
this.getServletResponse(HttpStatus.SC_INTERNAL_SERVER_ERROR);
-        AsyncContext asyncContext = this.getAsyncContext(latch, 
outboundResponse);
-        HttpServletRequest inboundRequest = 
this.getHttpServletRequest(asyncContext);
-
-        replay(inboundRequest, asyncContext, outboundResponse);
-
-        MOCK_SSE_SERVER.expect()
-                .method("PUT")
-                .pathInfo("/sse")
-                .respond()
-                .status(HttpStatus.SC_INTERNAL_SERVER_ERROR);
-
-        sseDispatch.doPut(URL, inboundRequest, outboundResponse);
-
-        latch.await(1L, TimeUnit.SECONDS);
-        EasyMock.verify(asyncContext, outboundResponse, inboundRequest);
-        assertTrue(MOCK_SSE_SERVER.isEmpty());
-    }
-
-    @Test
-    public void testPatch2xx() throws Exception {
-        CountDownLatch latch = new CountDownLatch(1);
-        SSEDispatch sseDispatch = this.createDispatch();
+        SSEHaDispatch sseHaDispatch = this.createDispatch(null);
         PrintWriter printWriter = EasyMock.createNiceMock(PrintWriter.class);
         HttpServletResponse outboundResponse = 
this.getServletResponse(HttpStatus.SC_OK);
         AsyncContext asyncContext = this.getAsyncContext(latch, 
outboundResponse);
         HttpServletRequest inboundRequest = 
this.getHttpServletRequest(asyncContext);
+        Capture<Cookie> capturedArgument = Capture.newInstance();
 
-        this.expectResponseBodyAndHeader(printWriter, outboundResponse);
+        this.expectResponseBodyAndHeader(printWriter, outboundResponse, 
capturedArgument);
         replay(inboundRequest, asyncContext, outboundResponse, printWriter);
 
         MOCK_SSE_SERVER.expect()
-                .method("PATCH")
+                .method("GET")
                 .pathInfo("/sse")
-                .header("request", "header")
                 .header("Accept", "text/event-stream")
-                .content("{\"request\":\"body\"}", StandardCharsets.UTF_8)
                 .respond()
                 .status(HttpStatus.SC_OK)
                 
.content("id:1\ndata:data1\nevent:event1\n\ndata:data2\nevent:event2\nid:2\nretry:1\n:testing\n\n",
 StandardCharsets.UTF_8)
                 .header("response", "header")
                 .contentType("text/event-stream");
 
-        sseDispatch.doPatch(URL, inboundRequest, outboundResponse);
-
+        sseHaDispatch.doGet(URL, inboundRequest, outboundResponse);
         latch.await(1L, TimeUnit.SECONDS);
-        EasyMock.verify(asyncContext, outboundResponse, inboundRequest, 
printWriter);
-        assertTrue(MOCK_SSE_SERVER.isEmpty());
-    }
-
-    @Test
-    public void testPatch4xx() throws Exception {
-        CountDownLatch latch = new CountDownLatch(1);
-        SSEDispatch sseDispatch = this.createDispatch();
-        HttpServletResponse outboundResponse = 
this.getServletResponse(HttpStatus.SC_NOT_FOUND);
-        AsyncContext asyncContext = this.getAsyncContext(latch, 
outboundResponse);
-        HttpServletRequest inboundRequest = 
this.getHttpServletRequest(asyncContext);
-
-        replay(inboundRequest, asyncContext, outboundResponse);
+        assertEquals("http://host2:3333/sse";, 
sseHaDispatch.getHaProvider().getActiveURL("SSE"));
+
+
+        //Second request, sticky session cookie included
+        CountDownLatch latch2 = new CountDownLatch(1);
+        Cookie[] cookies = new Cookie[1];
+        cookies[0] = new Cookie(capturedArgument.getValue().getName(), 
capturedArgument.getValue().getValue());
+        PrintWriter printWriter2 = EasyMock.createNiceMock(PrintWriter.class);
+        HttpServletResponse outboundResponse2 = 
this.getServletResponse(HttpStatus.SC_OK);
+        AsyncContext asyncContext2 = this.getAsyncContext(latch2, 
outboundResponse2);
+        Capture<Cookie> capturedArgument2 = Capture.newInstance();
+        this.expectResponseBodyAndHeader(printWriter2, outboundResponse2, 
capturedArgument2);
+        HttpServletRequest inboundRequest2 = 
this.getHttpServletRequest(asyncContext2);
+        expect(inboundRequest2.getCookies()).andReturn(cookies).anyTimes();
+        replay(inboundRequest2, asyncContext2, outboundResponse2, 
printWriter2);
 
         MOCK_SSE_SERVER.expect()
-                .method("PATCH")
+                .method("GET")
                 .pathInfo("/sse")
+                .header("Accept", "text/event-stream")
                 .respond()
-                .status(HttpStatus.SC_NOT_FOUND);
+                .status(HttpStatus.SC_OK)
+                
.content("id:1\ndata:data1\nevent:event1\n\ndata:data2\nevent:event2\nid:2\nretry:1\n:testing\n\n",
 StandardCharsets.UTF_8)
+                .header("response", "header")
+                .contentType("text/event-stream");
 
-        sseDispatch.doPatch(URL, inboundRequest, outboundResponse);
+        sseHaDispatch.doGet(URL, inboundRequest2, outboundResponse2);
+        latch2.await(1L, TimeUnit.SECONDS);
 
-        latch.await(1L, TimeUnit.SECONDS);
-        EasyMock.verify(asyncContext, outboundResponse, inboundRequest);
+        EasyMock.verify(asyncContext, outboundResponse, inboundRequest, 
printWriter, inboundRequest2);
         assertTrue(MOCK_SSE_SERVER.isEmpty());
+        assertEquals("http://host2:3333/sse";, 
sseHaDispatch.getHaProvider().getActiveURL("SSE"));
     }
 
-    @Test
-    public void testPatch5xx() throws Exception {
-        CountDownLatch latch = new CountDownLatch(1);
-        SSEDispatch sseDispatch = this.createDispatch();
-        HttpServletResponse outboundResponse = 
this.getServletResponse(HttpStatus.SC_INTERNAL_SERVER_ERROR);
-        AsyncContext asyncContext = this.getAsyncContext(latch, 
outboundResponse);
-        HttpServletRequest inboundRequest = 
this.getHttpServletRequest(asyncContext);
+    private SSEHaDispatch createDispatch(String cookieName) throws Exception {
+        KeystoreService keystoreService = createMock(KeystoreService.class);
+        
expect(keystoreService.getTruststoreForHttpClient()).andReturn(null).once();
 
-        replay(inboundRequest, asyncContext, outboundResponse);
+        GatewayConfig gatewayConfig = createMock(GatewayConfig.class);
+        expect(gatewayConfig.isMetricsEnabled()).andReturn(false).once();
+        expect(gatewayConfig.isSSLEnabled()).andReturn(true).once();
+        
expect(gatewayConfig.getHttpClientMaxConnections()).andReturn(32).once();
+        
expect(gatewayConfig.getHttpClientConnectionTimeout()).andReturn(20000).once();
+        
expect(gatewayConfig.getHttpClientSocketTimeout()).andReturn(20000).once();
+        
expect(gatewayConfig.getHttpClientCookieSpec()).andReturn(CookieSpecs.STANDARD).anyTimes();
 
-        MOCK_SSE_SERVER.expect()
-                .method("PATCH")
-                .pathInfo("/sse")
-                .respond()
-                .status(HttpStatus.SC_INTERNAL_SERVER_ERROR);
+        GatewayServices gatewayServices = createMock(GatewayServices.class);
+        
expect(gatewayServices.getService(ServiceType.KEYSTORE_SERVICE)).andReturn(keystoreService).once();
 
-        sseDispatch.doPatch(URL, inboundRequest, outboundResponse);
+        ServletContext servletContext = createMock(ServletContext.class);
+        
expect(servletContext.getAttribute(GatewayConfig.GATEWAY_CONFIG_ATTRIBUTE)).andReturn(gatewayConfig).atLeastOnce();
+        
expect(servletContext.getAttribute(GatewayServices.GATEWAY_SERVICES_ATTRIBUTE)).andReturn(gatewayServices).atLeastOnce();
 
-        latch.await(1L, TimeUnit.SECONDS);
-        EasyMock.verify(asyncContext, outboundResponse, inboundRequest);
-        assertTrue(MOCK_SSE_SERVER.isEmpty());
-    }
+        FilterConfig filterConfig = createMock(FilterConfig.class);
+        
expect(filterConfig.getServletContext()).andReturn(servletContext).atLeastOnce();
+        
expect(filterConfig.getInitParameter("useTwoWaySsl")).andReturn("false").once();
+        
expect(filterConfig.getInitParameter("httpclient.maxConnections")).andReturn(null).once();
+        
expect(filterConfig.getInitParameter("httpclient.connectionTimeout")).andReturn(null).once();
+        
expect(filterConfig.getInitParameter("httpclient.socketTimeout")).andReturn(null).once();
+        
expect(filterConfig.getInitParameter("serviceRole")).andReturn(null).once();
+        
expect(filterConfig.getInitParameter("httpclient.cookieSpec")).andReturn(null).anyTimes();
 
-    @Test
-    public void testServerNotAvailable() throws Exception {
-        CountDownLatch latch = new CountDownLatch(1);
-        SSEDispatch sseDispatch = this.createDispatch();
-        HttpServletResponse outboundResponse = 
EasyMock.createNiceMock(HttpServletResponse.class);
-        AsyncContext asyncContext = this.getAsyncContext(latch, 
outboundResponse);
-        HttpServletRequest inboundRequest = 
this.getHttpServletRequest(asyncContext);
+        HaProvider provider = this.createProvider(cookieName);
 
-        replay(inboundRequest, asyncContext, outboundResponse);
+        replay(keystoreService, gatewayConfig, gatewayServices, 
servletContext, filterConfig);
 
-        sseDispatch.doGet(new URI("http://localhost:11223/sse";), 
inboundRequest, outboundResponse);
+        SSEHaDispatch dispatch = new SSEHaDispatch(filterConfig);
+        dispatch.setHaProvider(provider);
+        dispatch.setServiceRole("SSE");
+        dispatch.init();
 
-        latch.await(1L, TimeUnit.SECONDS);
-        EasyMock.verify(asyncContext, outboundResponse, inboundRequest);
+        return dispatch;
     }
 
-    private HttpServletRequest getHttpServletRequest(AsyncContext 
asyncContext) throws Exception {
-        Map<String, String> headers = new HashMap<>();
-        headers.put("request", "header");
-        InputStream stream = new 
ByteArrayInputStream("{\"request\":\"body\"}".getBytes(StandardCharsets.UTF_8));
-        MockServletInputStream mockServletInputStream = new 
MockServletInputStream(stream);
-        HttpServletRequest inboundRequest = 
EasyMock.createNiceMock(HttpServletRequest.class);
+    private HttpServletResponse getServletResponse(int statusCode) {
+        HttpServletResponse outboundResponse = 
EasyMock.createNiceMock(HttpServletResponse.class);
 
-        
EasyMock.expect(inboundRequest.getHeaderNames()).andReturn(Collections.enumeration(headers.keySet())).anyTimes();
-        
EasyMock.expect(inboundRequest.startAsync()).andReturn(asyncContext).once();
-        
EasyMock.expect(inboundRequest.getHeader("request")).andReturn("header").once();
-        
EasyMock.expect(inboundRequest.getContentType()).andReturn("application/json").anyTimes();
-        
EasyMock.expect(inboundRequest.getInputStream()).andReturn(mockServletInputStream).anyTimes();
-        
EasyMock.expect(inboundRequest.getContentLength()).andReturn(mockServletInputStream.available()).anyTimes();
-        EasyMock.expect(inboundRequest.getServletContext()).andReturn(new 
MockServletContext()).anyTimes();
+        outboundResponse.setStatus(statusCode);
+        EasyMock.expectLastCall();
 
-        return inboundRequest;
+        return outboundResponse;
     }
 
     private AsyncContext getAsyncContext(CountDownLatch latch, 
HttpServletResponse outboundResponse) {
@@ -428,17 +373,26 @@ public class SSEDispatchTest {
         return asyncContext;
     }
 
-    private HttpServletResponse getServletResponse(int statusCode) {
-        HttpServletResponse outboundResponse = 
EasyMock.createNiceMock(HttpServletResponse.class);
+    private HttpServletRequest getHttpServletRequest(AsyncContext 
asyncContext) throws Exception {
+        Map<String, String> headers = new HashMap<>();
+        headers.put("request", "header");
+        InputStream stream = new 
ByteArrayInputStream("{\"request\":\"body\"}".getBytes(StandardCharsets.UTF_8));
+        MockServletInputStream mockServletInputStream = new 
MockServletInputStream(stream);
+        HttpServletRequest inboundRequest = 
EasyMock.createNiceMock(HttpServletRequest.class);
 
-        outboundResponse.setStatus(statusCode);
-        EasyMock.expectLastCall();
+        
EasyMock.expect(inboundRequest.getHeaderNames()).andReturn(Collections.enumeration(headers.keySet())).anyTimes();
+        
EasyMock.expect(inboundRequest.startAsync()).andReturn(asyncContext).anyTimes();
+        
EasyMock.expect(inboundRequest.getHeader("request")).andReturn("header").once();
+        
EasyMock.expect(inboundRequest.getContentType()).andReturn("application/json").anyTimes();
+        
EasyMock.expect(inboundRequest.getInputStream()).andReturn(mockServletInputStream).anyTimes();
+        
EasyMock.expect(inboundRequest.getContentLength()).andReturn(mockServletInputStream.available()).anyTimes();
+        EasyMock.expect(inboundRequest.getServletContext()).andReturn(new 
MockServletContext()).anyTimes();
 
-        return outboundResponse;
+        return inboundRequest;
     }
 
-    private void expectResponseBodyAndHeader(PrintWriter printWriter, 
HttpServletResponse outboundResponse) throws Exception {
-        outboundResponse.addHeader("response", "header");
+    private void expectResponseBodyAndHeader(PrintWriter printWriter, 
HttpServletResponse outboundResponse, Capture<Cookie> capturedArgument) throws 
Exception {
+        outboundResponse.addCookie(capture(capturedArgument));
         EasyMock.expectLastCall();
         
EasyMock.expect(outboundResponse.getWriter()).andReturn(printWriter).anyTimes();
         printWriter.write("id:1\nevent:event1\ndata:data1");
@@ -449,35 +403,19 @@ public class SSEDispatchTest {
         EasyMock.expectLastCall().times(2);
     }
 
-    private SSEDispatch createDispatch() throws Exception {
-        KeystoreService keystoreService = createMock(KeystoreService.class);
-        
expect(keystoreService.getTruststoreForHttpClient()).andReturn(null).once();
-
-        GatewayConfig gatewayConfig = createMock(GatewayConfig.class);
-        expect(gatewayConfig.isMetricsEnabled()).andReturn(false).once();
-        
expect(gatewayConfig.getHttpClientMaxConnections()).andReturn(32).once();
-        
expect(gatewayConfig.getHttpClientConnectionTimeout()).andReturn(20000).once();
-        
expect(gatewayConfig.getHttpClientSocketTimeout()).andReturn(20000).once();
-        
expect(gatewayConfig.getHttpClientCookieSpec()).andReturn(CookieSpecs.STANDARD).anyTimes();
-
-        GatewayServices gatewayServices = createMock(GatewayServices.class);
-        
expect(gatewayServices.getService(ServiceType.KEYSTORE_SERVICE)).andReturn(keystoreService).once();
-
-        ServletContext servletContext = createMock(ServletContext.class);
-        
expect(servletContext.getAttribute(GatewayConfig.GATEWAY_CONFIG_ATTRIBUTE)).andReturn(gatewayConfig).atLeastOnce();
-        
expect(servletContext.getAttribute(GatewayServices.GATEWAY_SERVICES_ATTRIBUTE)).andReturn(gatewayServices).atLeastOnce();
-
-        FilterConfig filterConfig = createMock(FilterConfig.class);
-        
expect(filterConfig.getServletContext()).andReturn(servletContext).atLeastOnce();
-        
expect(filterConfig.getInitParameter("useTwoWaySsl")).andReturn("false").once();
-        
expect(filterConfig.getInitParameter("httpclient.maxConnections")).andReturn(null).once();
-        
expect(filterConfig.getInitParameter("httpclient.connectionTimeout")).andReturn(null).once();
-        
expect(filterConfig.getInitParameter("httpclient.socketTimeout")).andReturn(null).once();
-        
expect(filterConfig.getInitParameter("serviceRole")).andReturn(null).once();
-        
expect(filterConfig.getInitParameter("httpclient.cookieSpec")).andReturn(null).anyTimes();
-
-        replay(keystoreService, gatewayConfig, gatewayServices, 
servletContext, filterConfig);
-
-        return new SSEDispatch(filterConfig);
+    private HaProvider createProvider(String cookieName) throws Exception {
+        String serviceName = "SSE";
+        HaDescriptor descriptor = HaDescriptorFactory.createDescriptor();
+        
descriptor.addServiceConfig(HaDescriptorFactory.createServiceConfig(serviceName,
 "true", null, null, null, null, "true", "true", cookieName, null, 
"agentX,user1,agentY", null));
+        HaProvider provider = new DefaultHaProvider(descriptor);
+        URI uri1 = new URI("http://localhost:"; + MOCK_SSE_SERVER.getPort() + 
"/sse");
+        URI uri2 = new URI("http://host2:3333/sse";);
+        URI uri3 = new URI("http://host3:3333/sse";);
+        ArrayList<String> urlList = new ArrayList<>();
+        urlList.add(uri1.toString());
+        urlList.add(uri2.toString());
+        urlList.add(uri3.toString());
+        provider.addHaService(serviceName, urlList);
+        return provider;
     }
 }
\ No newline at end of file
diff --git 
a/gateway-spi/src/main/java/org/apache/knox/gateway/sse/SSEDispatch.java 
b/gateway-spi/src/main/java/org/apache/knox/gateway/sse/SSEDispatch.java
index 318c4bb82..767cbccef 100644
--- a/gateway-spi/src/main/java/org/apache/knox/gateway/sse/SSEDispatch.java
+++ b/gateway-spi/src/main/java/org/apache/knox/gateway/sse/SSEDispatch.java
@@ -98,16 +98,17 @@ public class SSEDispatch extends ConfigurableDispatch {
     private void doHttpMethod(HttpUriRequest httpMethod, HttpServletRequest 
inboundRequest, HttpServletResponse outboundResponse) throws IOException {
         this.addAcceptHeader(httpMethod);
         this.copyRequestHeaderFields(httpMethod, inboundRequest);
-        this.executeRequest(httpMethod, outboundResponse, inboundRequest);
+        this.executeRequestWrapper(httpMethod, inboundRequest, 
outboundResponse);
     }
 
-    private void executeRequest(HttpUriRequest outboundRequest, 
HttpServletResponse outboundResponse, HttpServletRequest inboundRequest) {
+    @Override
+    protected void executeRequest(HttpUriRequest outboundRequest, 
HttpServletRequest inboundRequest, HttpServletResponse outboundResponse) {
         AsyncContext asyncContext = inboundRequest.startAsync();
         //No timeout
         asyncContext.setTimeout(0L);
 
         HttpAsyncRequestProducer producer = 
HttpAsyncMethods.create(outboundRequest);
-        AsyncCharConsumer<SSEResponse> consumer = new 
SSECharConsumer(outboundResponse, outboundRequest.getURI(), asyncContext);
+        AsyncCharConsumer<SSEResponse> consumer = new 
SSECharConsumer(outboundResponse, outboundRequest.getURI(), asyncContext, 
inboundRequest, outboundRequest);
         this.executeAsyncRequest(producer, consumer, outboundRequest);
     }
 
@@ -174,11 +175,15 @@ public class SSEDispatch extends ConfigurableDispatch {
     private class SSECharConsumer extends AsyncCharConsumer<SSEResponse> {
         private SSEResponse sseResponse;
         private final HttpServletResponse outboundResponse;
+        private final HttpUriRequest outboundRequest;
+        private final HttpServletRequest inboundRequest;
         private final URI url;
         private final AsyncContext asyncContext;
 
-        SSECharConsumer(HttpServletResponse outboundResponse, URI url, 
AsyncContext asyncContext) {
+        SSECharConsumer(HttpServletResponse outboundResponse, URI url, 
AsyncContext asyncContext, HttpServletRequest inboundRequest, HttpUriRequest 
outboundRequest) {
             this.outboundResponse = outboundResponse;
+            this.outboundRequest = outboundRequest;
+            this.inboundRequest = inboundRequest;
             this.url = url;
             this.asyncContext = asyncContext;
         }
@@ -187,6 +192,7 @@ public class SSEDispatch extends ConfigurableDispatch {
         protected void onResponseReceived(final HttpResponse inboundResponse) {
             this.sseResponse = new SSEResponse(inboundResponse);
             if (isSuccessful(inboundResponse.getStatusLine().getStatusCode())) 
{
+                outboundResponseWrapper(outboundRequest, inboundRequest, 
outboundResponse);
                 handleSuccessResponse(outboundResponse, url, inboundResponse);
             } else {
                 handleErrorResponse(outboundResponse, url, inboundResponse);
diff --git 
a/gateway-spi/src/main/java/org/apache/knox/gateway/sse/SSEEntity.java 
b/gateway-spi/src/main/java/org/apache/knox/gateway/sse/SSEEntity.java
index 8b598c631..8cfbda6a6 100644
--- a/gateway-spi/src/main/java/org/apache/knox/gateway/sse/SSEEntity.java
+++ b/gateway-spi/src/main/java/org/apache/knox/gateway/sse/SSEEntity.java
@@ -30,8 +30,6 @@ import java.util.concurrent.LinkedBlockingQueue;
 
 public class SSEEntity extends AbstractHttpEntity {
 
-    private static final String SSE_DELIMITER = ":";
-
     private final BlockingQueue<SSEvent> eventQueue;
     private final StringBuilder eventBuilder = new StringBuilder();
     private final HttpEntity httpEntity;
@@ -68,45 +66,10 @@ public class SSEEntity extends AbstractHttpEntity {
 
     private void processEvent() {
         String unprocessedEvent = eventBuilder.toString();
-        SSEvent ssEvent = new SSEvent();
-
-        for (String line : unprocessedEvent.split("\\R")) {
-            String[] lineTokens =  this.parseLine(line);
-            switch (lineTokens[0]) {
-                case "id":
-                    ssEvent.setId(lineTokens[1].trim());
-                    break;
-                case "event":
-                    ssEvent.setEvent(lineTokens[1].trim());
-                    break;
-                case "data":
-                    ssEvent.setData(lineTokens[1].trim());
-                    break;
-                case "comment":
-                    ssEvent.setComment(lineTokens[1].trim());
-                    break;
-                case "retry":
-                    ssEvent.setRetry(Long.parseLong(lineTokens[1].trim()));
-                    break;
-                default:
-                    break;
-            }
-        }
+        SSEvent ssEvent = SSEvent.fromString(unprocessedEvent);
         eventQueue.add(ssEvent);
     }
 
-    private String[] parseLine(String line) {
-        String[] lineTokens = new String[2];
-        if(line.startsWith(SSE_DELIMITER)) {
-            lineTokens[0] = "comment";
-            lineTokens[1] = line;
-        } else {
-            lineTokens = line.split(SSE_DELIMITER, 2);
-        }
-
-        return lineTokens;
-    }
-
     public void sendEvent(AsyncContext asyncContext) throws IOException, 
InterruptedException {
         while (!eventQueue.isEmpty()) {
             SSEvent event = eventQueue.take();
diff --git a/gateway-spi/src/main/java/org/apache/knox/gateway/sse/SSEvent.java 
b/gateway-spi/src/main/java/org/apache/knox/gateway/sse/SSEvent.java
index dc643ed0e..98991a588 100644
--- a/gateway-spi/src/main/java/org/apache/knox/gateway/sse/SSEvent.java
+++ b/gateway-spi/src/main/java/org/apache/knox/gateway/sse/SSEvent.java
@@ -17,16 +17,17 @@
  */
 package org.apache.knox.gateway.sse;
 
+import java.util.Arrays;
+
 public class SSEvent {
 
-    private String data;
-    private String event;
-    private String id;
-    private String comment;
-    private Long retry;
+    private static final String SSE_DELIMITER = ":";
 
-    public SSEvent() {
-    }
+    private final String data;
+    private final String event;
+    private final String id;
+    private final String comment;
+    private final Long retry;
 
     public SSEvent(String data, String event, String id, String comment, Long 
retry) {
         this.data = data;
@@ -36,26 +37,6 @@ public class SSEvent {
         this.retry = retry;
     }
 
-    public void setData(String data) {
-        this.data = data;
-    }
-
-    public void setEvent(String event) {
-        this.event = event;
-    }
-
-    public void setId(String id) {
-        this.id = id;
-    }
-
-    public void setComment(String comment) {
-        this.comment = comment;
-    }
-
-    public void setRetry(Long retry) {
-        this.retry = retry;
-    }
-
     public String getData() {
         return this.data;
     }
@@ -76,14 +57,56 @@ public class SSEvent {
         return this.retry;
     }
 
+    public static SSEvent fromString(String unprocessedEvent) {
+        String id = null;
+        String event = null;
+        String data = null;
+        String comment = null;
+        Long retry = null;
+
+        for (String line : unprocessedEvent.split("\\R")) {
+            String[] lineTokens = parseLine(line);
+            switch (lineTokens[0]) {
+                case "id":
+                    id = lineTokens[1];
+                    break;
+                case "event":
+                    event = lineTokens[1];
+                    break;
+                case "data":
+                    data = lineTokens[1];
+                    break;
+                case "comment":
+                    comment = lineTokens[1];
+                    break;
+                case "retry":
+                    retry = Long.parseLong(lineTokens[1]);
+                    break;
+                default:
+                    break;
+            }
+        }
+        return new SSEvent(data, event, id, comment, retry);
+    }
+
+    private static String[] parseLine(String line) {
+        String[] lineTokens = Arrays.stream(line.split(SSE_DELIMITER, 
2)).map(String::trim).toArray(String[]::new);
+
+        if (lineTokens[0].isEmpty()) {
+            lineTokens[0] = "comment";
+        }
+
+        return lineTokens;
+    }
+
     @Override
     public String toString() {
         StringBuilder eventString = new StringBuilder();
 
-        this.appendField(eventString, this.id, "id:");
-        this.appendField(eventString, this.event, "event:");
-        this.appendField(eventString, this.data, "data:");
-        this.appendField(eventString, this.retry, "retry:");
+        this.appendField(eventString, this.id, "id");
+        this.appendField(eventString, this.event, "event");
+        this.appendField(eventString, this.data, "data");
+        this.appendField(eventString, this.retry, "retry");
         this.appendField(eventString, this.comment, "");
 
         return eventString.toString();
@@ -95,6 +118,7 @@ public class SSEvent {
                 eventString.append('\n');
             }
             eventString.append(prefix);
+            eventString.append(SSE_DELIMITER);
             eventString.append(field);
         }
     }
diff --git 
a/gateway-spi/src/test/java/org/apache/knox/gateway/sse/SSEDispatchTest.java 
b/gateway-spi/src/test/java/org/apache/knox/gateway/sse/SSEDispatchTest.java
index 66abcb791..e8ed44ebf 100644
--- a/gateway-spi/src/test/java/org/apache/knox/gateway/sse/SSEDispatchTest.java
+++ b/gateway-spi/src/test/java/org/apache/knox/gateway/sse/SSEDispatchTest.java
@@ -480,4 +480,4 @@ public class SSEDispatchTest {
 
         return new SSEDispatch(filterConfig);
     }
-}
\ No newline at end of file
+}
diff --git 
a/gateway-spi/src/test/java/org/apache/knox/gateway/sse/SSEEntityTest.java 
b/gateway-spi/src/test/java/org/apache/knox/gateway/sse/SSEEntityTest.java
index 1c0375d9a..2e36b8bdb 100644
--- a/gateway-spi/src/test/java/org/apache/knox/gateway/sse/SSEEntityTest.java
+++ b/gateway-spi/src/test/java/org/apache/knox/gateway/sse/SSEEntityTest.java
@@ -50,7 +50,7 @@ public class SSEEntityTest {
         assertEquals("event", actualSSEvent.getEvent());
         assertEquals("data", actualSSEvent.getData());
         assertEquals(1L, actualSSEvent.getRetry().longValue());
-        assertEquals(":testing", actualSSEvent.getComment());
+        assertEquals("testing", actualSSEvent.getComment());
     }
 
     @Test
@@ -83,7 +83,7 @@ public class SSEEntityTest {
         assertEquals("event3", actualSSEvent.getEvent());
         assertEquals("data3", actualSSEvent.getData());
         assertEquals(1045L, actualSSEvent.getRetry().longValue());
-        assertEquals(":TEST", actualSSEvent.getComment());
+        assertEquals("TEST", actualSSEvent.getComment());
     }
 
     @Test
@@ -114,7 +114,7 @@ public class SSEEntityTest {
         assertNull(actualSSEvent.getEvent());
         
assertEquals("data:{\"records\":[{\"col_str\":\"0e01eeef73f6833a98e1df6a5a00ea46f5b52dbee27ee89ebce894aaa555c90130b08fae8aaf600ef845b774ab0082fcaf8c\",\"col_int\":-580163093,\"col_ts\":\"2024-08-14T07:41:15.125\"}],\"job_status\":\"RUNNING\",\"end_of_samples\":false}",
 actualSSEvent.getData());
         assertEquals(33L, actualSSEvent.getRetry().longValue());
-        assertEquals("::::test", actualSSEvent.getComment());
+        assertEquals(":::test", actualSSEvent.getComment());
     }
 
     @Test
@@ -160,4 +160,23 @@ public class SSEEntityTest {
         assertEquals("event4", actualSSEvent.getEvent());
         assertEquals("data4", actualSSEvent.getData());
     }
+
+    @Test
+    public void testParseSingleEventTrim() {
+        SSEEntity sseEntity = new SSEEntity(entityMock);
+        BlockingQueue<SSEvent> eventQueue = sseEntity.getEventQueue();
+        String unprocessedEvent = " id : 1 \n event : event \n data : data \n 
retry : 1 \n : testing \n\n";
+        CharBuffer cb = CharBuffer.wrap(unprocessedEvent);
+
+        sseEntity.readCharBuffer(cb);
+
+        assertFalse(eventQueue.isEmpty());
+
+        SSEvent actualSSEvent = eventQueue.peek();
+        assertEquals("1", actualSSEvent.getId());
+        assertEquals("event", actualSSEvent.getEvent());
+        assertEquals("data", actualSSEvent.getData());
+        assertEquals(1L, actualSSEvent.getRetry().longValue());
+        assertEquals("testing", actualSSEvent.getComment());
+    }
 }
diff --git 
a/gateway-spi/src/test/java/org/apache/knox/gateway/sse/SSEventTest.java 
b/gateway-spi/src/test/java/org/apache/knox/gateway/sse/SSEventTest.java
index 378726ab3..6921cb559 100644
--- a/gateway-spi/src/test/java/org/apache/knox/gateway/sse/SSEventTest.java
+++ b/gateway-spi/src/test/java/org/apache/knox/gateway/sse/SSEventTest.java
@@ -26,7 +26,7 @@ public class SSEventTest {
 
     @Test
     public void testToStringWithAll() {
-        SSEvent ssEvent = new SSEvent("data", "event", "id", ":comment", 5L);
+        SSEvent ssEvent = new SSEvent("data", "event", "id", "comment", 5L);
         String expected = "id:id\nevent:event\ndata:data\nretry:5\n:comment";
 
         assertEquals(expected, ssEvent.toString());
@@ -34,8 +34,8 @@ public class SSEventTest {
 
     @Test
     public void testToStringNoId() {
-        SSEvent ssEventNull = new SSEvent("data", "event", null, ":test", 2L);
-        SSEvent ssEventEmpty = new SSEvent("data", "event", "", ":new 
comment", 1L);
+        SSEvent ssEventNull = new SSEvent("data", "event", null, "test", 2L);
+        SSEvent ssEventEmpty = new SSEvent("data", "event", "", "new comment", 
1L);
         String expectedForNull = "event:event\ndata:data\nretry:2\n:test";
         String expectedForEmpty = "id:\nevent:event\ndata:data\nretry:1\n:new 
comment";
 
@@ -45,8 +45,8 @@ public class SSEventTest {
 
     @Test
     public void testToStringNoEvent() {
-        SSEvent ssEventNull = new SSEvent("data", null, "id", ":comment", 11L);
-        SSEvent ssEventEmpty = new SSEvent("data", "", "id", ":test comment", 
30L);
+        SSEvent ssEventNull = new SSEvent("data", null, "id", "comment", 11L);
+        SSEvent ssEventEmpty = new SSEvent("data", "", "id", "test comment", 
30L);
         String expectedForNull = "id:id\ndata:data\nretry:11\n:comment";
         String expectedForEmpty = "id:id\nevent:\ndata:data\nretry:30\n:test 
comment";
 
@@ -56,8 +56,8 @@ public class SSEventTest {
 
     @Test
     public void testToStringNoData() {
-        SSEvent ssEventNull = new SSEvent(null, "event", "id", ":comment", 2L);
-        SSEvent ssEventEmpty = new SSEvent("", "event", "id", ":testing", 1L);
+        SSEvent ssEventNull = new SSEvent(null, "event", "id", "comment", 2L);
+        SSEvent ssEventEmpty = new SSEvent("", "event", "id", "testing", 1L);
         String expectedForNull = "id:id\nevent:event\nretry:2\n:comment";
         String expectedForEmpty = 
"id:id\nevent:event\ndata:\nretry:1\n:testing";
 
@@ -75,9 +75,9 @@ public class SSEventTest {
 
     @Test
     public void testToStringNoRetry() {
-        SSEvent ssEventNull = new SSEvent("data", "event", "id", ":TEST", 
null);
+        SSEvent ssEventNull = new SSEvent("data", "event", "id", "TEST", null);
         String expected = "id:id\nevent:event\ndata:data\n:TEST";
 
         assertEquals(expected, ssEventNull.toString());
     }
-}
\ No newline at end of file
+}

Reply via email to