This is an automated email from the ASF dual-hosted git repository.
smolnar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/knox.git
The following commit(s) were added to refs/heads/master by this push:
new 416943fa6 KNOX-3146: Failover feature for SSE HA (#1049)
416943fa6 is described below
commit 416943fa62240921bee256e06070181b866d11e7
Author: hanicz <[email protected]>
AuthorDate: Tue Aug 19 16:12:30 2025 +0200
KNOX-3146: Failover feature for SSE HA (#1049)
---
gateway-provider-ha/pom.xml | 9 +
.../gateway/ha/config/CommonHaConfigurations.java | 127 ++++++
.../knox/gateway/ha/config/HaConfigurations.java | 61 +++
.../{LBHaDispatch.java => CommonHaDispatch.java} | 153 +++++--
.../ha/dispatch/ConfigurableHADispatch.java | 202 ++-------
.../knox/gateway/ha/dispatch/SSEHaCallback.java | 63 +++
.../knox/gateway/ha/dispatch/SSEHaDispatch.java | 123 +++---
.../StickySessionCookieRemovedRequest.java | 51 +++
.../ha/dispatch/i18n/HaDispatchMessages.java | 5 +
.../gateway/ha/dispatch/SSEHaDispatchTest.java | 451 +++++++++++++++++++--
.../hdfs/dispatch/AbstractHdfsHaDispatch.java | 12 +-
gateway-spi/pom.xml | 10 -
.../apache/knox/gateway/SpiGatewayMessages.java | 3 +
.../knox/gateway/dispatch/AsyncDispatch.java | 21 +
.../gateway/dispatch/ConfigurableDispatch.java | 2 +-
.../apache/knox/gateway/dispatch/SyncDispatch.java | 21 +
.../org/apache/knox/gateway/sse/SSECallback.java | 80 ++++
.../org/apache/knox/gateway/sse/SSEDispatch.java | 53 +--
.../apache/knox/gateway/sse/SSEDispatchTest.java | 2 +
pom.xml | 10 +
20 files changed, 1093 insertions(+), 366 deletions(-)
diff --git a/gateway-provider-ha/pom.xml b/gateway-provider-ha/pom.xml
index ebcb12739..e49ac5307 100644
--- a/gateway-provider-ha/pom.xml
+++ b/gateway-provider-ha/pom.xml
@@ -110,6 +110,15 @@
<artifactId>commons-lang3</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.httpcomponents</groupId>
+ <artifactId>httpasyncclient</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.httpcomponents</groupId>
+ <artifactId>httpcore-nio</artifactId>
+ </dependency>
+
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-test</artifactId>
diff --git
a/gateway-provider-ha/src/main/java/org/apache/knox/gateway/ha/config/CommonHaConfigurations.java
b/gateway-provider-ha/src/main/java/org/apache/knox/gateway/ha/config/CommonHaConfigurations.java
new file mode 100644
index 000000000..0a367c9d8
--- /dev/null
+++
b/gateway-provider-ha/src/main/java/org/apache/knox/gateway/ha/config/CommonHaConfigurations.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.knox.gateway.ha.config;
+
+import org.apache.knox.gateway.ha.provider.HaProvider;
+import org.apache.knox.gateway.ha.provider.impl.HaServiceConfigConstants;
+
+import java.util.Collections;
+import java.util.List;
+
+public class CommonHaConfigurations implements HaConfigurations {
+
+ private HaProvider haProvider;
+ private boolean loadBalancingEnabled =
HaServiceConfigConstants.DEFAULT_LOAD_BALANCING_ENABLED;
+ private boolean stickySessionsEnabled =
HaServiceConfigConstants.DEFAULT_STICKY_SESSIONS_ENABLED;
+ private String stickySessionCookieName =
HaServiceConfigConstants.DEFAULT_STICKY_SESSION_COOKIE_NAME;
+ private List<String> disableLoadBalancingForUserAgents =
Collections.singletonList(HaServiceConfigConstants.DEFAULT_DISABLE_LB_USER_AGENTS);
+ private int maxFailoverAttempts =
HaServiceConfigConstants.DEFAULT_MAX_FAILOVER_ATTEMPTS;
+ private int failoverSleep =
HaServiceConfigConstants.DEFAULT_FAILOVER_SLEEP;
+ private boolean failoverNonIdempotentRequestEnabled =
HaServiceConfigConstants.DEFAULT_FAILOVER_NON_IDEMPOTENT;
+ private boolean noFallbackEnabled =
HaServiceConfigConstants.DEFAULT_NO_FALLBACK_ENABLED;
+
+ @Override
+ public HaProvider getHaProvider() {
+ return haProvider;
+ }
+
+ @Override
+ public void setLoadBalancingEnabled(boolean enabled) {
+ this.loadBalancingEnabled = enabled;
+ }
+
+ @Override
+ public void setHaProvider(HaProvider haProvider) {
+ this.haProvider = haProvider;
+ }
+
+ @Override
+ public boolean isStickySessionEnabled() {
+ return stickySessionsEnabled;
+ }
+
+ @Override
+ public void setStickySessionsEnabled(boolean enabled) {
+ this.stickySessionsEnabled = enabled;
+ }
+
+ @Override
+ public String getStickySessionCookieName() {
+ return stickySessionCookieName;
+ }
+
+ @Override
+ public void setStickySessionCookieName(String stickySessionCookieName) {
+ this.stickySessionCookieName = stickySessionCookieName;
+ }
+
+ @Override
+ public boolean isLoadBalancingEnabled() {
+ return loadBalancingEnabled;
+ }
+
+ @Override
+ public List<String> getDisableLoadBalancingForUserAgents() {
+ return disableLoadBalancingForUserAgents;
+ }
+
+ @Override
+ public void setDisableLoadBalancingForUserAgents(List<String>
disableLoadBalancingForUserAgents) {
+ this.disableLoadBalancingForUserAgents =
disableLoadBalancingForUserAgents;
+ }
+
+ @Override
+ public int getMaxFailoverAttempts() {
+ return maxFailoverAttempts;
+ }
+
+ @Override
+ public void setMaxFailoverAttempts(int maxFailoverAttempts) {
+ this.maxFailoverAttempts = maxFailoverAttempts;
+ }
+
+ @Override
+ public int getFailoverSleep() {
+ return failoverSleep;
+ }
+
+ @Override
+ public void setFailoverSleep(int failoverSleep) {
+ this.failoverSleep = failoverSleep;
+ }
+
+ @Override
+ public void setFailoverNonIdempotentRequestEnabled(boolean enabled) {
+ this.failoverNonIdempotentRequestEnabled = enabled;
+ }
+
+ @Override
+ public boolean isFailoverNonIdempotentRequestEnabled() {
+ return failoverNonIdempotentRequestEnabled;
+ }
+
+ @Override
+ public void setNoFallbackEnabled(boolean enabled) {
+ this.noFallbackEnabled = enabled;
+ }
+
+ @Override
+ public boolean isNoFallbackEnabled() {
+ return noFallbackEnabled;
+ }
+}
diff --git
a/gateway-provider-ha/src/main/java/org/apache/knox/gateway/ha/config/HaConfigurations.java
b/gateway-provider-ha/src/main/java/org/apache/knox/gateway/ha/config/HaConfigurations.java
new file mode 100644
index 000000000..2e5cadcd9
--- /dev/null
+++
b/gateway-provider-ha/src/main/java/org/apache/knox/gateway/ha/config/HaConfigurations.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.knox.gateway.ha.config;
+
+import org.apache.knox.gateway.ha.provider.HaProvider;
+
+import java.util.List;
+
+public interface HaConfigurations {
+
+ HaProvider getHaProvider();
+
+ void setLoadBalancingEnabled(boolean enabled);
+
+ void setHaProvider(HaProvider haProvider);
+
+ boolean isStickySessionEnabled();
+
+ void setStickySessionsEnabled(boolean enabled);
+
+ String getStickySessionCookieName();
+
+ void setStickySessionCookieName(String stickySessionCookieName);
+
+ boolean isLoadBalancingEnabled();
+
+ List<String> getDisableLoadBalancingForUserAgents();
+
+ void setDisableLoadBalancingForUserAgents(List<String>
disableLoadBalancingForUserAgents);
+
+ int getMaxFailoverAttempts();
+
+ void setMaxFailoverAttempts(int maxFailoverAttempts);
+
+ int getFailoverSleep();
+
+ void setFailoverSleep(int failoverSleep);
+
+ void setFailoverNonIdempotentRequestEnabled(boolean enabled);
+
+ boolean isFailoverNonIdempotentRequestEnabled();
+
+ void setNoFallbackEnabled(boolean enabled);
+
+ boolean isNoFallbackEnabled();
+}
diff --git
a/gateway-provider-ha/src/main/java/org/apache/knox/gateway/ha/dispatch/LBHaDispatch.java
b/gateway-provider-ha/src/main/java/org/apache/knox/gateway/ha/dispatch/CommonHaDispatch.java
similarity index 54%
rename from
gateway-provider-ha/src/main/java/org/apache/knox/gateway/ha/dispatch/LBHaDispatch.java
rename to
gateway-provider-ha/src/main/java/org/apache/knox/gateway/ha/dispatch/CommonHaDispatch.java
index 433e9bfe5..2918c099e 100644
---
a/gateway-provider-ha/src/main/java/org/apache/knox/gateway/ha/dispatch/LBHaDispatch.java
+++
b/gateway-provider-ha/src/main/java/org/apache/knox/gateway/ha/dispatch/CommonHaDispatch.java
@@ -22,14 +22,16 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.http.client.methods.HttpRequestBase;
import org.apache.http.client.methods.HttpUriRequest;
import org.apache.http.client.utils.URIBuilder;
+import org.apache.knox.gateway.filter.AbstractGatewayFilter;
import org.apache.knox.gateway.ha.dispatch.i18n.HaDispatchMessages;
-import org.apache.knox.gateway.ha.provider.HaProvider;
+import org.apache.knox.gateway.ha.config.HaConfigurations;
import org.apache.knox.gateway.ha.provider.HaServiceConfig;
import org.apache.knox.gateway.i18n.messages.MessagesFactory;
import javax.servlet.http.Cookie;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
+import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Arrays;
@@ -38,70 +40,65 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
-public interface LBHaDispatch {
+public interface CommonHaDispatch {
HaDispatchMessages LOG = MessagesFactory.get(HaDispatchMessages.class);
Map<String, String> urlToHashLookup = new HashMap<>();
Map<String, String> hashToUrlLookup = new HashMap<>();
-
- boolean isStickySessionEnabled();
-
- void setStickySessionsEnabled(boolean enabled);
-
- String getStickySessionCookieName();
-
- void setStickySessionCookieName(String stickySessionCookieName);
-
- HaProvider getHaProvider();
+ String FAILOVER_COUNTER_ATTRIBUTE = "dispatch.ha.failover.counter";
+ List<String> nonIdempotentRequests = Arrays.asList("POST", "PATCH",
"CONNECT");
String getServiceRole();
- void setLoadBalancingEnabled(boolean enabled);
-
- boolean isLoadBalancingEnabled();
+ URI getDispatchUrl(HttpServletRequest request);
- List<String> getDisableLoadBalancingForUserAgents();
-
- void setDisableLoadBalancingForUserAgents(List<String>
disableLoadBalancingForUserAgents);
+ HaConfigurations getHaConfigurations();
AtomicReference<String> getActiveURL();
void setActiveURL(String url);
- default void initializeLBHaDispatch(HaServiceConfig serviceConfig) {
- setLoadBalancingEnabled(serviceConfig.isLoadBalancingEnabled());
- setStickySessionsEnabled(isLoadBalancingEnabled() &&
serviceConfig.isStickySessionEnabled());
+ default void initializeCommonHaDispatch(HaServiceConfig serviceConfig) {
+
getHaConfigurations().setLoadBalancingEnabled(serviceConfig.isLoadBalancingEnabled());
+
getHaConfigurations().setStickySessionsEnabled(getHaConfigurations().isLoadBalancingEnabled()
&& serviceConfig.isStickySessionEnabled());
- if (isStickySessionEnabled()) {
-
setStickySessionCookieName(serviceConfig.getStickySessionCookieName());
+ if (getHaConfigurations().isStickySessionEnabled()) {
+
getHaConfigurations().setStickySessionCookieName(serviceConfig.getStickySessionCookieName());
}
if
(StringUtils.isNotBlank(serviceConfig.getStickySessionDisabledUserAgents())) {
-
setDisableLoadBalancingForUserAgents(Arrays.asList(serviceConfig.getStickySessionDisabledUserAgents()
+
getHaConfigurations().setDisableLoadBalancingForUserAgents(Arrays.asList(serviceConfig.getStickySessionDisabledUserAgents()
.trim()
.split("\\s*,\\s*")));
}
setupUrlHashLookup();
/* setup the active URL for non-LB case */
- setActiveURL(getHaProvider().getActiveURL(getServiceRole()));
+
setActiveURL(getHaConfigurations().getHaProvider().getActiveURL(getServiceRole()));
// Suffix the cookie name by the service to make it unique
// The cookie path is NOT unique since Knox is stripping the service
name.
- setStickySessionCookieName(getStickySessionCookieName() + '-' +
getServiceRole());
+
getHaConfigurations().setStickySessionCookieName(getHaConfigurations().getStickySessionCookieName()
+ '-' + getServiceRole());
+
+ // Set the failover parameters
+
getHaConfigurations().setMaxFailoverAttempts(serviceConfig.getMaxFailoverAttempts());
+
getHaConfigurations().setFailoverSleep(serviceConfig.getFailoverSleep());
+
getHaConfigurations().setFailoverNonIdempotentRequestEnabled(serviceConfig.isFailoverNonIdempotentRequestEnabled());
+
getHaConfigurations().setNoFallbackEnabled(getHaConfigurations().isStickySessionEnabled()
&& serviceConfig.isNoFallbackEnabled());
}
default void setKnoxHaCookie(final HttpUriRequest outboundRequest, final
HttpServletRequest inboundRequest,
final HttpServletResponse outboundResponse,
boolean sslEnabled) {
- if (isStickySessionEnabled()) {
+ if (getHaConfigurations().isStickySessionEnabled()) {
List<Cookie> serviceHaCookies = Collections.emptyList();
if (inboundRequest.getCookies() != null) {
serviceHaCookies = Arrays
.stream(inboundRequest.getCookies())
- .filter(cookie ->
getStickySessionCookieName().equals(cookie.getName()))
+ .filter(cookie ->
getHaConfigurations().getStickySessionCookieName().equals(cookie.getName()))
.collect(Collectors.toList());
}
@@ -114,12 +111,12 @@ public interface LBHaDispatch {
* we set cookie for the endpoint that was served and not rely
on haProvider.getActiveURL().
* let LBing logic take care of rotating urls.
**/
- final List<String> urls =
getHaProvider().getURLs(getServiceRole())
+ final List<String> urls =
getHaConfigurations().getHaProvider().getURLs(getServiceRole())
.stream()
.filter(u ->
u.contains(outboundRequest.getURI().getHost()))
.collect(Collectors.toList());
final String cookieValue = urlToHashLookup.get(urls.get(0));
- Cookie stickySessionCookie = new
Cookie(getStickySessionCookieName(), cookieValue);
+ Cookie stickySessionCookie = new
Cookie(getHaConfigurations().getStickySessionCookieName(), cookieValue);
stickySessionCookie.setPath(inboundRequest.getContextPath());
stickySessionCookie.setMaxAge(-1);
stickySessionCookie.setHttpOnly(true);
@@ -129,14 +126,14 @@ public interface LBHaDispatch {
}
}
- default Optional<URI> setBackendFromHaCookie(HttpUriRequest
outboundRequest, HttpServletRequest inboundRequest) {
- if (isLoadBalancingEnabled() && isStickySessionEnabled() &&
inboundRequest.getCookies() != null) {
+ default Optional<URI> getBackendFromHaCookie(HttpUriRequest
outboundRequest, HttpServletRequest inboundRequest) {
+ if (getHaConfigurations().isLoadBalancingEnabled() &&
getHaConfigurations().isStickySessionEnabled() && inboundRequest.getCookies()
!= null) {
for (Cookie cookie : inboundRequest.getCookies()) {
- if (getStickySessionCookieName().equals(cookie.getName())) {
+ if
(getHaConfigurations().getStickySessionCookieName().equals(cookie.getName())) {
String backendURLHash = cookie.getValue();
String backendURL = hashToUrlLookup.get(backendURLHash);
// Make sure that the url provided is actually a valid
backend url
- if
(getHaProvider().getURLs(getServiceRole()).contains(backendURL)) {
+ if
(getHaConfigurations().getHaProvider().getURLs(getServiceRole()).contains(backendURL))
{
try {
return
Optional.of(updateHostURL(outboundRequest.getURI(), backendURL));
} catch (URISyntaxException ignore) {
@@ -149,6 +146,7 @@ public interface LBHaDispatch {
return Optional.empty();
}
+
default String hash(String url) {
return DigestUtils.sha256Hex(url);
}
@@ -172,7 +170,7 @@ public interface LBHaDispatch {
}
default void setupUrlHashLookup() {
- for (String url : getHaProvider().getURLs(getServiceRole())) {
+ for (String url :
getHaConfigurations().getHaProvider().getURLs(getServiceRole())) {
String urlHash = hash(url);
urlToHashLookup.put(url, urlHash);
hashToUrlLookup.put(urlHash, url);
@@ -185,9 +183,9 @@ public interface LBHaDispatch {
boolean userAgentDisabled = false;
/* disable loadbalancing in case a configured user agent is detected
to disable LB */
- if
(getDisableLoadBalancingForUserAgents().stream().anyMatch(userAgentFromBrowser::contains))
{
+ if
(getHaConfigurations().getDisableLoadBalancingForUserAgents().stream().anyMatch(userAgentFromBrowser::contains))
{
userAgentDisabled = true;
- LOG.disableHALoadbalancinguserAgent(userAgentFromBrowser,
getDisableLoadBalancingForUserAgents().toString());
+ LOG.disableHALoadbalancinguserAgent(userAgentFromBrowser,
getHaConfigurations().getDisableLoadBalancingForUserAgents().toString());
}
return userAgentDisabled;
@@ -197,7 +195,7 @@ public interface LBHaDispatch {
/* if disable LB is set don't bother setting backend from cookie */
Optional<URI> backendURI = Optional.empty();
if (!userAgentDisabled) {
- backendURI = setBackendFromHaCookie(outboundRequest,
inboundRequest);
+ backendURI = getBackendFromHaCookie(outboundRequest,
inboundRequest);
backendURI.ifPresent(uri -> ((HttpRequestBase)
outboundRequest).setURI(uri));
}
@@ -206,7 +204,7 @@ public interface LBHaDispatch {
* and we have a HTTP request configured not to use LB
* use the activeURL
*/
- if (isLoadBalancingEnabled() && userAgentDisabled) {
+ if (getHaConfigurations().isLoadBalancingEnabled() &&
userAgentDisabled) {
try {
((HttpRequestBase)
outboundRequest).setURI(updateHostURL(outboundRequest.getURI(),
getActiveURL().get()));
} catch (final URISyntaxException e) {
@@ -225,19 +223,88 @@ public interface LBHaDispatch {
* needs to be loadbalanced. If a request has BACKEND cookie and
Loadbalance=on then
* there should be no loadbalancing.
*/
- if (isLoadBalancingEnabled() && !userAgentDisabled) {
+ if (getHaConfigurations().isLoadBalancingEnabled() &&
!userAgentDisabled) {
/* check sticky session enabled */
- if (isStickySessionEnabled()) {
+ if (getHaConfigurations().isStickySessionEnabled()) {
/* loadbalance only when sticky session enabled and no backend
url cookie */
if (!backendURI.isPresent()) {
-
getHaProvider().makeNextActiveURLAvailable(getServiceRole());
+
getHaConfigurations().getHaProvider().makeNextActiveURLAvailable(getServiceRole());
} else {
/* sticky session enabled and backend url cookie is valid
no need to loadbalance */
/* do nothing */
}
} else {
- getHaProvider().makeNextActiveURLAvailable(getServiceRole());
+
getHaConfigurations().getHaProvider().makeNextActiveURLAvailable(getServiceRole());
+ }
+ }
+ }
+
+ /**
+ * A helper method that marks an endpoint failed.
+ * Changes HA Provider state.
+ * Changes ActiveUrl state.
+ * Changes for inbound urls should be handled by calling functions.
+ *
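+ * @param outboundRequest outbound request whose URI is reported to the HA provider as failed
+ * @param inboundRequest inbound request from which the failover counter attribute is read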
+ * @return current failover counter
+ */
+ default AtomicInteger markEndpointFailed(final HttpUriRequest
outboundRequest, final HttpServletRequest inboundRequest) {
+ synchronized (this) {
+
getHaConfigurations().getHaProvider().markFailedURL(getServiceRole(),
outboundRequest.getURI().toString());
+ AtomicInteger counter = (AtomicInteger)
inboundRequest.getAttribute(FAILOVER_COUNTER_ATTRIBUTE);
+ if (counter == null) {
+ counter = new AtomicInteger(0);
+ }
+
+ if (counter.incrementAndGet() <=
getHaConfigurations().getMaxFailoverAttempts()) {
+ setupUrlHashLookup(); // refresh the url hash after failing a
url
+ /* in case of failover update the activeURL variable */
+ getActiveURL().set(outboundRequest.getURI().toString());
+ }
+ return counter;
+ }
+ }
+
+ default HttpServletRequest prepareForFailover(HttpUriRequest
outboundRequest, HttpServletRequest inboundRequest) {
+ //null out target url so that rewriters run again
+
inboundRequest.setAttribute(AbstractGatewayFilter.TARGET_REQUEST_URL_ATTRIBUTE_NAME,
null);
+ // Make sure to remove the ha cookie from the request
+ inboundRequest = new
StickySessionCookieRemovedRequest(getHaConfigurations().getStickySessionCookieName(),
inboundRequest);
+ URI uri = getDispatchUrl(inboundRequest);
+ ((HttpRequestBase) outboundRequest).setURI(uri);
+ if (getHaConfigurations().getFailoverSleep() > 0) {
+ try {
+ Thread.sleep(getHaConfigurations().getFailoverSleep());
+ } catch (InterruptedException e) {
+ LOG.failoverSleepFailed(getServiceRole(), e);
+ Thread.currentThread().interrupt();
}
}
+ LOG.failingOverRequest(outboundRequest.getURI().toString());
+ return inboundRequest;
+ }
+
+ default boolean disabledFailoverHandled(HttpServletRequest inboundRequest,
HttpServletResponse outboundResponse) throws IOException {
+ // Check whether the session cookie is present
+ Optional<Cookie> sessionCookie = Optional.empty();
+ if (inboundRequest.getCookies() != null) {
+ sessionCookie =
+ Arrays.stream(inboundRequest.getCookies())
+ .filter(cookie ->
getHaConfigurations().getStickySessionCookieName().equals(cookie.getName()))
+ .findFirst();
+ }
+
+ // Check for a case where no fallback is configured
+ if (getHaConfigurations().isStickySessionEnabled() &&
getHaConfigurations().isNoFallbackEnabled() && sessionCookie.isPresent()) {
+ LOG.noFallbackError();
+ outboundResponse.sendError(HttpServletResponse.SC_BAD_GATEWAY,
"Service connection error, HA failover disabled");
+ return true;
+ }
+ return false;
+ }
+
+ default boolean
isNonIdempotentAndNonIdempotentFailoverDisabled(HttpUriRequest outboundRequest)
{
+ return !getHaConfigurations().isFailoverNonIdempotentRequestEnabled()
&&
nonIdempotentRequests.stream().anyMatch(outboundRequest.getMethod()::equalsIgnoreCase);
}
}
diff --git
a/gateway-provider-ha/src/main/java/org/apache/knox/gateway/ha/dispatch/ConfigurableHADispatch.java
b/gateway-provider-ha/src/main/java/org/apache/knox/gateway/ha/dispatch/ConfigurableHADispatch.java
index 0cb6b35b0..40382dd43 100644
---
a/gateway-provider-ha/src/main/java/org/apache/knox/gateway/ha/dispatch/ConfigurableHADispatch.java
+++
b/gateway-provider-ha/src/main/java/org/apache/knox/gateway/ha/dispatch/ConfigurableHADispatch.java
@@ -18,27 +18,20 @@
package org.apache.knox.gateway.ha.dispatch;
import org.apache.http.HttpResponse;
-import org.apache.http.client.methods.HttpRequestBase;
import org.apache.http.client.methods.HttpUriRequest;
import org.apache.knox.gateway.config.Configure;
import org.apache.knox.gateway.config.GatewayConfig;
import org.apache.knox.gateway.dispatch.ConfigurableDispatch;
-import org.apache.knox.gateway.filter.AbstractGatewayFilter;
import org.apache.knox.gateway.ha.dispatch.i18n.HaDispatchMessages;
+import org.apache.knox.gateway.ha.config.CommonHaConfigurations;
+import org.apache.knox.gateway.ha.config.HaConfigurations;
import org.apache.knox.gateway.ha.provider.HaProvider;
-import org.apache.knox.gateway.ha.provider.HaServiceConfig;
-import org.apache.knox.gateway.ha.provider.impl.HaServiceConfigConstants;
import org.apache.knox.gateway.i18n.messages.MessagesFactory;
-import javax.servlet.http.Cookie;
import javax.servlet.http.HttpServletRequest;
-import javax.servlet.http.HttpServletRequestWrapper;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.net.URI;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
@@ -49,106 +42,46 @@ import static
org.apache.knox.gateway.util.HttpUtils.isConnectionError;
* A configurable HA dispatch class that has a very basic failover mechanism
and
* configurable options of ConfigurableDispatch class.
*/
-public class ConfigurableHADispatch extends ConfigurableDispatch implements
LBHaDispatch {
-
- protected static final String FAILOVER_COUNTER_ATTRIBUTE =
"dispatch.ha.failover.counter";
+public class ConfigurableHADispatch extends ConfigurableDispatch implements
CommonHaDispatch {
protected static final HaDispatchMessages LOG =
MessagesFactory.get(HaDispatchMessages.class);
-
- protected int maxFailoverAttempts =
HaServiceConfigConstants.DEFAULT_MAX_FAILOVER_ATTEMPTS;
-
- protected int failoverSleep =
HaServiceConfigConstants.DEFAULT_FAILOVER_SLEEP;
-
- protected HaProvider haProvider;
-
- protected static final List<String> nonIdempotentRequests =
Arrays.asList("POST", "PATCH", "CONNECT");
-
- private boolean loadBalancingEnabled =
HaServiceConfigConstants.DEFAULT_LOAD_BALANCING_ENABLED;
- private boolean stickySessionsEnabled =
HaServiceConfigConstants.DEFAULT_STICKY_SESSIONS_ENABLED;
- private boolean noFallbackEnabled =
HaServiceConfigConstants.DEFAULT_NO_FALLBACK_ENABLED;
- protected boolean failoverNonIdempotentRequestEnabled =
HaServiceConfigConstants.DEFAULT_FAILOVER_NON_IDEMPOTENT;
- private String stickySessionCookieName =
HaServiceConfigConstants.DEFAULT_STICKY_SESSION_COOKIE_NAME;
- private List<String> disableLoadBalancingForUserAgents =
Arrays.asList(HaServiceConfigConstants.DEFAULT_DISABLE_LB_USER_AGENTS);
+ private final HaConfigurations haConfigurations = new
CommonHaConfigurations();
/**
- * This activeURL is used to track urls when LB is turned off for some
clients.
- * The problem we have with selectively turning off LB is that other clients
- * that use LB can change the state from under the current session where LB
is
- * turned off.
- * e.g.
- * ODBC Connection established where LB is off. JDBC connection is
established
- * next where LB is enabled. This changes the active URL under the existing
ODBC
- * connection which will be an issue.
- * This variable keeps track of non-LB'ed url and updated upon failover.
+ * This activeURL is used to track urls when LB is turned off for some
clients.
+ * The problem we have with selectively turning off LB is that other clients
+ * that use LB can change the state from under the current session where LB
is
+ * turned off.
+ * e.g.
+ * ODBC Connection established where LB is off. JDBC connection is
established
+ * next where LB is enabled. This changes the active URL under the existing
ODBC
+ * connection which will be an issue.
+ * This variable keeps track of the non-LB'ed url and is updated upon failover.
*/
- private AtomicReference<String> activeURL = new AtomicReference();
+ private final AtomicReference<String> activeURL = new AtomicReference<>();
+
@Override
public void init() {
super.init();
LOG.initializingForResourceRole(getServiceRole());
- if ( haProvider != null ) {
- HaServiceConfig serviceConfig =
haProvider.getHaDescriptor().getServiceConfig(getServiceRole());
- maxFailoverAttempts = serviceConfig.getMaxFailoverAttempts();
- failoverSleep = serviceConfig.getFailoverSleep();
- failoverNonIdempotentRequestEnabled =
serviceConfig.isFailoverNonIdempotentRequestEnabled();
- initializeLBHaDispatch(serviceConfig);
- noFallbackEnabled = stickySessionsEnabled &&
serviceConfig.isNoFallbackEnabled();
+ if (haConfigurations.getHaProvider() != null) {
+
initializeCommonHaDispatch(haConfigurations.getHaProvider().getHaDescriptor().getServiceConfig(getServiceRole()));
}
}
@Override
- public HaProvider getHaProvider() {
- return haProvider;
+ public HaConfigurations getHaConfigurations() {
+ return haConfigurations;
}
@Configure
public void setHaProvider(HaProvider haProvider) {
- this.haProvider = haProvider;
- }
-
- @Override
- public boolean isStickySessionEnabled() {
- return stickySessionsEnabled;
- }
-
- @Override
- public void setStickySessionsEnabled(boolean enabled) {
- this.stickySessionsEnabled = enabled;
- }
-
- @Override
- public String getStickySessionCookieName() {
- return stickySessionCookieName;
- }
-
- @Override
- public void setStickySessionCookieName(String stickySessionCookieName) {
- this.stickySessionCookieName = stickySessionCookieName;
- }
-
- @Override
- public boolean isLoadBalancingEnabled() {
- return loadBalancingEnabled;
- }
-
- @Override
- public void setLoadBalancingEnabled(boolean enabled) {
- this.loadBalancingEnabled = enabled;
- }
-
- @Override
- public List<String> getDisableLoadBalancingForUserAgents() {
- return disableLoadBalancingForUserAgents;
- }
-
- @Override
- public void setDisableLoadBalancingForUserAgents(List<String>
disableLoadBalancingForUserAgents) {
- this.disableLoadBalancingForUserAgents = disableLoadBalancingForUserAgents;
+ getHaConfigurations().setHaProvider(haProvider);
}
@Override
public AtomicReference<String> getActiveURL() {
- return activeURL;
+ return activeURL;
}
@Override
@@ -195,48 +128,19 @@ public class ConfigurableHADispatch extends
ConfigurableDispatch implements LBHa
}
}
- private boolean
isNonIdempotentAndNonIdempotentFailoverDisabled(HttpUriRequest outboundRequest)
{
- return !failoverNonIdempotentRequestEnabled &&
nonIdempotentRequests.stream().anyMatch(outboundRequest.getMethod()::equalsIgnoreCase);
- }
-
protected void failoverRequest(HttpUriRequest outboundRequest,
HttpServletRequest inboundRequest, HttpServletResponse outboundResponse,
HttpResponse inboundResponse, Exception exception) throws IOException {
- // Check whether the session cookie is present
- Optional<Cookie> sessionCookie = Optional.empty();
- if (inboundRequest.getCookies() != null) {
- sessionCookie =
- Arrays.stream(inboundRequest.getCookies())
- .filter(cookie ->
stickySessionCookieName.equals(cookie.getName()))
- .findFirst();
- }
-
- // Check for a case where no fallback is configured
- if(stickySessionsEnabled && noFallbackEnabled &&
sessionCookie.isPresent()) {
- LOG.noFallbackError();
- outboundResponse.sendError(HttpServletResponse.SC_BAD_GATEWAY, "Service
connection error, HA failover disabled");
+ if (disabledFailoverHandled(inboundRequest, outboundResponse)) {
return;
}
+
/* mark endpoint as failed */
final AtomicInteger counter = markEndpointFailed(outboundRequest,
inboundRequest);
inboundRequest.setAttribute(FAILOVER_COUNTER_ATTRIBUTE, counter);
- if ( counter.get() <= maxFailoverAttempts ) {
- //null out target url so that rewriters run again
-
inboundRequest.setAttribute(AbstractGatewayFilter.TARGET_REQUEST_URL_ATTRIBUTE_NAME,
null);
- // Make sure to remove the cookie ha cookie from the request
- inboundRequest = new
StickySessionCookieRemovedRequest(stickySessionCookieName, inboundRequest);
- URI uri = getDispatchUrl(inboundRequest);
- ((HttpRequestBase) outboundRequest).setURI(uri);
- if ( failoverSleep > 0 ) {
- try {
- Thread.sleep(failoverSleep);
- } catch ( InterruptedException e ) {
- LOG.failoverSleepFailed(getServiceRole(), e);
- Thread.currentThread().interrupt();
- }
- }
- LOG.failingOverRequest(outboundRequest.getURI().toString());
+ if ( counter.get() <= haConfigurations.getMaxFailoverAttempts() ) {
+ inboundRequest = prepareForFailover(outboundRequest, inboundRequest);
executeRequest(outboundRequest, inboundRequest, outboundResponse);
} else {
- LOG.maxFailoverAttemptsReached(maxFailoverAttempts, getServiceRole());
+
LOG.maxFailoverAttemptsReached(haConfigurations.getMaxFailoverAttempts(),
getServiceRole());
if ( inboundResponse != null ) {
writeOutboundResponse(outboundRequest, inboundRequest,
outboundResponse, inboundResponse);
} else {
@@ -244,58 +148,4 @@ public class ConfigurableHADispatch extends
ConfigurableDispatch implements LBHa
}
}
}
-
- /**
- * A helper method that marks an endpoint failed.
- * Changes HA Provider state.
- * Changes ActiveUrl state.
- * Changes for inbound urls should be handled by calling functions.
- * @param outboundRequest
- * @param inboundRequest
- * @return current failover counter
- */
- private synchronized AtomicInteger markEndpointFailed(final HttpUriRequest
outboundRequest, final HttpServletRequest inboundRequest) {
- haProvider.markFailedURL(getServiceRole(),
outboundRequest.getURI().toString());
- AtomicInteger counter = (AtomicInteger)
inboundRequest.getAttribute(FAILOVER_COUNTER_ATTRIBUTE);
- if ( counter == null ) {
- counter = new AtomicInteger(0);
- }
-
- if ( counter.incrementAndGet() <= maxFailoverAttempts ) {
- setupUrlHashLookup(); // refresh the url hash after failing a url
- /* in case of failover update the activeURL variable */
- activeURL.set(outboundRequest.getURI().toString());
- }
- return counter;
- }
-
- /**
- * Strips out the cookies by the cookie name provided
- */
- private static class StickySessionCookieRemovedRequest extends
HttpServletRequestWrapper {
- private final Cookie[] cookies;
-
- StickySessionCookieRemovedRequest(String cookieName, HttpServletRequest
request) {
- super(request);
- this.cookies = filterCookies(cookieName, request.getCookies());
- }
-
- private Cookie[] filterCookies(String cookieName, Cookie[] cookies) {
- if (super.getCookies() == null) {
- return null;
- }
- List<Cookie> cookiesInternal = new ArrayList<>();
- for (Cookie cookie : cookies) {
- if (!cookieName.equals(cookie.getName())) {
- cookiesInternal.add(cookie);
- }
- }
- return cookiesInternal.toArray(new Cookie[0]);
- }
-
- @Override
- public Cookie[] getCookies() {
- return cookies;
- }
- }
}
diff --git
a/gateway-provider-ha/src/main/java/org/apache/knox/gateway/ha/dispatch/SSEHaCallback.java
b/gateway-provider-ha/src/main/java/org/apache/knox/gateway/ha/dispatch/SSEHaCallback.java
new file mode 100644
index 000000000..613e19941
--- /dev/null
+++
b/gateway-provider-ha/src/main/java/org/apache/knox/gateway/ha/dispatch/SSEHaCallback.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.knox.gateway.ha.dispatch;
+
+import org.apache.http.client.methods.HttpUriRequest;
+import org.apache.http.nio.protocol.HttpAsyncRequestProducer;
+import org.apache.knox.gateway.sse.SSECallback;
+
+import javax.servlet.AsyncContext;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import static org.apache.knox.gateway.util.HttpUtils.isConnectionError;
+
+public class SSEHaCallback extends SSECallback {
+
+ private final SSEHaDispatch dispatch;
+ private final HttpUriRequest outboundRequest;
+ private final HttpServletRequest inboundRequest;
+
+ public SSEHaCallback(HttpServletResponse outboundResponse, AsyncContext
asyncContext, HttpAsyncRequestProducer producer,
+ SSEHaDispatch dispatch, HttpUriRequest
outboundRequest, HttpServletRequest inboundRequest) {
+ super(outboundResponse, asyncContext, producer);
+ this.dispatch = dispatch;
+ this.outboundRequest = outboundRequest;
+ this.inboundRequest = inboundRequest;
+ }
+
+ @Override
+ public void failed(final Exception ex) {
+ if(outboundResponse.isCommitted()) {
+ /*
+ If the response was already committed, it means the connection
was closed abruptly, so no failover is needed.
+ The endpoint shouldn't be marked as failed.
+ */
+ super.failed(ex);
+ } else {
+ if(!isConnectionError(ex.getCause() == null ? ex : ex.getCause())
&& dispatch.isNonIdempotentAndNonIdempotentFailoverDisabled(outboundRequest)) {
+ dispatch.markEndpointFailed(outboundRequest, inboundRequest);
+ super.failed(ex);
+ } else {
+ releaseResources(false);
+ LOG.sseConnectionError(ex.getMessage());
+ dispatch.failoverRequest(outboundRequest, outboundResponse,
inboundRequest, asyncContext);
+ }
+ }
+ }
+}
diff --git
a/gateway-provider-ha/src/main/java/org/apache/knox/gateway/ha/dispatch/SSEHaDispatch.java
b/gateway-provider-ha/src/main/java/org/apache/knox/gateway/ha/dispatch/SSEHaDispatch.java
index 50582c2b9..136a1721f 100644
---
a/gateway-provider-ha/src/main/java/org/apache/knox/gateway/ha/dispatch/SSEHaDispatch.java
+++
b/gateway-provider-ha/src/main/java/org/apache/knox/gateway/ha/dispatch/SSEHaDispatch.java
@@ -18,33 +18,36 @@
package org.apache.knox.gateway.ha.dispatch;
import org.apache.http.client.methods.HttpUriRequest;
+import org.apache.http.nio.client.methods.AsyncCharConsumer;
+import org.apache.http.nio.client.methods.HttpAsyncMethods;
+import org.apache.http.nio.protocol.HttpAsyncRequestProducer;
+import org.apache.knox.gateway.audit.api.Action;
+import org.apache.knox.gateway.audit.api.ActionOutcome;
+import org.apache.knox.gateway.audit.api.ResourceType;
import org.apache.knox.gateway.config.Configure;
import org.apache.knox.gateway.config.GatewayConfig;
import org.apache.knox.gateway.ha.dispatch.i18n.HaDispatchMessages;
+import org.apache.knox.gateway.ha.config.CommonHaConfigurations;
+import org.apache.knox.gateway.ha.config.HaConfigurations;
import org.apache.knox.gateway.ha.provider.HaProvider;
-import org.apache.knox.gateway.ha.provider.HaServiceConfig;
-import org.apache.knox.gateway.ha.provider.impl.HaServiceConfigConstants;
import org.apache.knox.gateway.i18n.messages.MessagesFactory;
import org.apache.knox.gateway.sse.SSEDispatch;
+import org.apache.knox.gateway.sse.SSEResponse;
+import javax.servlet.AsyncContext;
import javax.servlet.FilterConfig;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
-import java.net.URI;
-import java.util.Collections;
-import java.util.List;
+import java.io.IOException;
import java.util.Optional;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
-public class SSEHaDispatch extends SSEDispatch implements LBHaDispatch {
+public class SSEHaDispatch extends SSEDispatch implements CommonHaDispatch {
protected static final HaDispatchMessages LOG =
MessagesFactory.get(HaDispatchMessages.class);
- protected HaProvider haProvider;
- private boolean loadBalancingEnabled =
HaServiceConfigConstants.DEFAULT_LOAD_BALANCING_ENABLED;
- private boolean stickySessionsEnabled =
HaServiceConfigConstants.DEFAULT_STICKY_SESSIONS_ENABLED;
- private String stickySessionCookieName =
HaServiceConfigConstants.DEFAULT_STICKY_SESSION_COOKIE_NAME;
- private List<String> disableLoadBalancingForUserAgents =
Collections.singletonList(HaServiceConfigConstants.DEFAULT_DISABLE_LB_USER_AGENTS);
private final boolean sslEnabled;
+ private final HaConfigurations haConfigurations = new
CommonHaConfigurations();
/**
* This activeURL is used to track urls when LB is turned off for some
clients.
@@ -70,60 +73,19 @@ public class SSEHaDispatch extends SSEDispatch implements
LBHaDispatch {
public void init() {
super.init();
LOG.initializingForResourceRole(getServiceRole());
- if (haProvider != null) {
- HaServiceConfig serviceConfig =
haProvider.getHaDescriptor().getServiceConfig(getServiceRole());
- this.initializeLBHaDispatch(serviceConfig);
+ if (haConfigurations.getHaProvider() != null) {
+
initializeCommonHaDispatch(haConfigurations.getHaProvider().getHaDescriptor().getServiceConfig(getServiceRole()));
}
}
- @Override
- public HaProvider getHaProvider() {
- return haProvider;
- }
-
- @Override
- public void setLoadBalancingEnabled(boolean enabled) {
- this.loadBalancingEnabled = enabled;
- }
-
@Configure
public void setHaProvider(HaProvider haProvider) {
- this.haProvider = haProvider;
- }
-
- @Override
- public boolean isStickySessionEnabled() {
- return stickySessionsEnabled;
- }
-
- @Override
- public void setStickySessionsEnabled(boolean enabled) {
- this.stickySessionsEnabled = enabled;
+ getHaConfigurations().setHaProvider(haProvider);
}
@Override
- public String getStickySessionCookieName() {
- return stickySessionCookieName;
- }
-
- @Override
- public void setStickySessionCookieName(String stickySessionCookieName) {
- this.stickySessionCookieName = stickySessionCookieName;
- }
-
- @Override
- public boolean isLoadBalancingEnabled() {
- return loadBalancingEnabled;
- }
-
- @Override
- public List<String> getDisableLoadBalancingForUserAgents() {
- return disableLoadBalancingForUserAgents;
- }
-
- @Override
- public void setDisableLoadBalancingForUserAgents(List<String>
disableLoadBalancingForUserAgents) {
- this.disableLoadBalancingForUserAgents =
disableLoadBalancingForUserAgents;
+ public HaConfigurations getHaConfigurations() {
+ return haConfigurations;
}
@Override
@@ -136,17 +98,60 @@ public class SSEHaDispatch extends SSEDispatch implements
LBHaDispatch {
activeURL.set(url);
}
+ @Override
+ protected void executeAsyncRequest(HttpUriRequest outboundRequest,
HttpServletResponse outboundResponse,
+ AsyncContext asyncContext,
HttpServletRequest inboundRequest) {
+ HttpAsyncRequestProducer producer =
HttpAsyncMethods.create(outboundRequest);
+ AsyncCharConsumer<SSEResponse> consumer = new
SSECharConsumer(outboundResponse, outboundRequest.getURI(), asyncContext,
inboundRequest, outboundRequest);
+ LOG.dispatchRequest(outboundRequest.getMethod(),
outboundRequest.getURI());
+ auditor.audit(Action.DISPATCH, outboundRequest.getURI().toString(),
ResourceType.URI, ActionOutcome.UNAVAILABLE,
RES.requestMethod(outboundRequest.getMethod()));
+ asyncClient.execute(producer, consumer, new
SSEHaCallback(outboundResponse, asyncContext, producer, this, outboundRequest,
inboundRequest));
+ }
+
+ protected void failoverRequest(HttpUriRequest outboundRequest,
HttpServletResponse outboundResponse,
+ HttpServletRequest inboundRequest,
AsyncContext asyncContext) {
+
+ try {
+ if (disabledFailoverHandled(inboundRequest, outboundResponse)) {
+ asyncContext.complete();
+ return;
+ }
+
+ /* mark endpoint as failed */
+ final AtomicInteger counter = markEndpointFailed(outboundRequest,
inboundRequest);
+ inboundRequest.setAttribute(FAILOVER_COUNTER_ATTRIBUTE, counter);
+ if (counter.get() <= haConfigurations.getMaxFailoverAttempts()) {
+ inboundRequest = prepareForFailover(outboundRequest,
inboundRequest);
+ executeAsyncRequest(outboundRequest, outboundResponse,
asyncContext, inboundRequest);
+ } else {
+
LOG.maxFailoverAttemptsReached(haConfigurations.getMaxFailoverAttempts(),
getServiceRole());
+ outboundResponse.sendError(HttpServletResponse.SC_BAD_GATEWAY,
"Service connection error, max failover attempts reached");
+ asyncContext.complete();
+ }
+ } catch (IOException e) {
+ asyncContext.complete();
+ }
+ }
@Override
protected void executeRequestWrapper(HttpUriRequest outboundRequest,
HttpServletRequest inboundRequest, HttpServletResponse outboundResponse) {
boolean userAgentDisabled = isUserAgentDisabled(inboundRequest);
- Optional<URI> backendURI = setBackendUri(outboundRequest,
inboundRequest, userAgentDisabled);
+ setBackendUri(outboundRequest, inboundRequest, userAgentDisabled);
executeRequest(outboundRequest, inboundRequest, outboundResponse);
- shiftActiveURL(userAgentDisabled, backendURI);
}
@Override
protected void outboundResponseWrapper(final HttpUriRequest
outboundRequest, final HttpServletRequest inboundRequest, final
HttpServletResponse outboundResponse) {
setKnoxHaCookie(outboundRequest, inboundRequest, outboundResponse,
sslEnabled);
}
+
+ @Override
+ protected void shiftCallback(HttpUriRequest outboundRequest,
HttpServletRequest inboundRequest) {
+ /*
+ Due to the async behavior, shifting has to take place after a
successful response-received event
+ and not in the executeRequest method. This is the same as in a
sync dispatch.
+ */
+ boolean userAgentDisabled = isUserAgentDisabled(inboundRequest);
+ shiftActiveURL(userAgentDisabled, userAgentDisabled ? Optional.empty()
: getBackendFromHaCookie(outboundRequest, inboundRequest));
+ }
}
diff --git
a/gateway-provider-ha/src/main/java/org/apache/knox/gateway/ha/dispatch/StickySessionCookieRemovedRequest.java
b/gateway-provider-ha/src/main/java/org/apache/knox/gateway/ha/dispatch/StickySessionCookieRemovedRequest.java
new file mode 100644
index 000000000..1a0991519
--- /dev/null
+++
b/gateway-provider-ha/src/main/java/org/apache/knox/gateway/ha/dispatch/StickySessionCookieRemovedRequest.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.knox.gateway.ha.dispatch;
+
+import javax.servlet.http.Cookie;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletRequestWrapper;
+import java.util.ArrayList;
+import java.util.List;
+
+public class StickySessionCookieRemovedRequest extends
HttpServletRequestWrapper {
+ private final Cookie[] cookies;
+
+ StickySessionCookieRemovedRequest(String cookieName, HttpServletRequest
request) {
+ super(request);
+ this.cookies = filterCookies(cookieName, request.getCookies());
+ }
+
+ private Cookie[] filterCookies(String cookieName, Cookie[] cookies) {
+ if (super.getCookies() == null) {
+ return null;
+ }
+ List<Cookie> cookiesInternal = new ArrayList<>();
+ for (Cookie cookie : cookies) {
+ if (!cookieName.equals(cookie.getName())) {
+ cookiesInternal.add(cookie);
+ }
+ }
+ return cookiesInternal.toArray(new Cookie[0]);
+ }
+
+ @Override
+ public Cookie[] getCookies() {
+ return cookies;
+ }
+}
diff --git
a/gateway-provider-ha/src/main/java/org/apache/knox/gateway/ha/dispatch/i18n/HaDispatchMessages.java
b/gateway-provider-ha/src/main/java/org/apache/knox/gateway/ha/dispatch/i18n/HaDispatchMessages.java
index c3a855c6d..201632321 100644
---
a/gateway-provider-ha/src/main/java/org/apache/knox/gateway/ha/dispatch/i18n/HaDispatchMessages.java
+++
b/gateway-provider-ha/src/main/java/org/apache/knox/gateway/ha/dispatch/i18n/HaDispatchMessages.java
@@ -22,6 +22,8 @@ import org.apache.knox.gateway.i18n.messages.MessageLevel;
import org.apache.knox.gateway.i18n.messages.Messages;
import org.apache.knox.gateway.i18n.messages.StackTrace;
+import java.net.URI;
+
@Messages(logger = "org.apache.knox.gateway")
public interface HaDispatchMessages {
@Message(level = MessageLevel.INFO, text = "Initializing Ha Dispatch for:
{0}")
@@ -56,4 +58,7 @@ public interface HaDispatchMessages {
@Message(level = MessageLevel.ERROR, text = "Request is non-idempotent {0},
failover prevented, to allow non-idempotent requests to failover set
'failoverNonIdempotentRequestEnabled=true' in HA config. Non connection related
error: {1}")
void cannotFailoverNonIdempotentRequest(String method, Throwable cause);
+
+ @Message( level = MessageLevel.DEBUG, text = "Dispatch request: {0} {1}" )
+ void dispatchRequest( String method, URI uri );
}
diff --git
a/gateway-provider-ha/src/test/java/org/apache/knox/gateway/ha/dispatch/SSEHaDispatchTest.java
b/gateway-provider-ha/src/test/java/org/apache/knox/gateway/ha/dispatch/SSEHaDispatchTest.java
index 77c34f559..d5cc9dadd 100644
---
a/gateway-provider-ha/src/test/java/org/apache/knox/gateway/ha/dispatch/SSEHaDispatchTest.java
+++
b/gateway-provider-ha/src/test/java/org/apache/knox/gateway/ha/dispatch/SSEHaDispatchTest.java
@@ -22,6 +22,7 @@ import org.apache.http.client.config.CookieSpecs;
import org.apache.knox.gateway.config.GatewayConfig;
import org.apache.knox.gateway.ha.provider.HaDescriptor;
import org.apache.knox.gateway.ha.provider.HaProvider;
+import org.apache.knox.gateway.ha.provider.HaServiceConfig;
import org.apache.knox.gateway.ha.provider.impl.DefaultHaProvider;
import org.apache.knox.gateway.ha.provider.impl.HaDescriptorFactory;
import org.apache.knox.gateway.services.GatewayServices;
@@ -32,8 +33,9 @@ import org.apache.knox.test.mock.MockServletContext;
import org.apache.knox.test.mock.MockServletInputStream;
import org.easymock.Capture;
import org.easymock.EasyMock;
+import org.junit.After;
import org.junit.Assert;
-import org.junit.BeforeClass;
+import org.junit.Before;
import org.junit.Test;
import javax.servlet.AsyncContext;
@@ -53,30 +55,40 @@ import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import static org.easymock.EasyMock.capture;
import static org.easymock.EasyMock.createMock;
import static org.easymock.EasyMock.expect;
import static org.easymock.EasyMock.replay;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
public class SSEHaDispatchTest {
- private static MockServer MOCK_SSE_SERVER;
- private static URI URL;
+ private MockServer MOCK_SSE_SERVER;
+ private URI URL;
+ private final String serviceName = "SSE";
- @BeforeClass
- public static void setUpBeforeClass() throws Exception {
+ @Before
+ public void setUpBeforeClass() throws Exception {
MOCK_SSE_SERVER = new MockServer("SSE", true);
URL = new URI("http://localhost:" + MOCK_SSE_SERVER.getPort() +
"/sse");
}
+ @After
+ public void cleanUp() throws Exception {
+ if (MOCK_SSE_SERVER != null) {
+ MOCK_SSE_SERVER.stop();
+ }
+ }
+
@Test
public void testHADispatchURL() throws Exception {
String serviceName = "SSE";
HaDescriptor descriptor = HaDescriptorFactory.createDescriptor();
-
descriptor.addServiceConfig(HaDescriptorFactory.createServiceConfig(serviceName,
"true", null, null, null, null, "true", "true", null, null, null, null));
+
descriptor.addServiceConfig(HaDescriptorFactory.createServiceConfig(serviceName,
"true", "1", "1000", null, null, "true", "true", null, null, null, null));
HaProvider provider = new DefaultHaProvider(descriptor);
URI uri1 = new URI("http://host1.valid");
URI uri2 = new URI("http://host2.valid");
@@ -130,7 +142,8 @@ public class SSEHaDispatchTest {
@Test
public void testStickySessionCookie() throws Exception {
CountDownLatch latch = new CountDownLatch(1);
- SSEHaDispatch sseHaDispatch = this.createDispatch(null);
+ HaServiceConfig haServiceConfig =
HaDescriptorFactory.createServiceConfig(serviceName, "true", "1", "1000", null,
null, "true", "true", null, null, "agentX,user1,agentY", null);
+ SSEHaDispatch sseHaDispatch = this.createDispatch(false,
haServiceConfig);
PrintWriter printWriter = EasyMock.createNiceMock(PrintWriter.class);
HttpServletResponse outboundResponse =
this.getServletResponse(HttpStatus.SC_OK);
AsyncContext asyncContext = this.getAsyncContext(latch,
outboundResponse);
@@ -161,7 +174,8 @@ public class SSEHaDispatchTest {
@Test
public void testNamedStickySessionCookie() throws Exception {
CountDownLatch latch = new CountDownLatch(1);
- SSEHaDispatch sseHaDispatch = this.createDispatch("COOKIE_NAME");
+ HaServiceConfig haServiceConfig =
HaDescriptorFactory.createServiceConfig(serviceName, "true", "1", "1000", null,
null, "true", "true", "COOKIE_NAME", null, "agentX,user1,agentY", null);
+ SSEHaDispatch sseHaDispatch = this.createDispatch(false,
haServiceConfig);
PrintWriter printWriter = EasyMock.createNiceMock(PrintWriter.class);
HttpServletResponse outboundResponse =
this.getServletResponse(HttpStatus.SC_OK);
AsyncContext asyncContext = this.getAsyncContext(latch,
outboundResponse);
@@ -192,7 +206,8 @@ public class SSEHaDispatchTest {
@Test
public void testLoadBalancingWithoutStickySessionCookie() throws Exception
{
CountDownLatch latch = new CountDownLatch(1);
- SSEHaDispatch sseHaDispatch = this.createDispatch(null);
+ HaServiceConfig haServiceConfig =
HaDescriptorFactory.createServiceConfig(serviceName, "true", "1", "1000", null,
null, "true", "true", null, null, "agentX,user1,agentY", null);
+ SSEHaDispatch sseHaDispatch = this.createDispatch(false,
haServiceConfig);
PrintWriter printWriter = EasyMock.createNiceMock(PrintWriter.class);
HttpServletResponse outboundResponse =
this.getServletResponse(HttpStatus.SC_OK);
AsyncContext asyncContext = this.getAsyncContext(latch,
outboundResponse);
@@ -217,13 +232,14 @@ public class SSEHaDispatchTest {
latch.await(1L, TimeUnit.SECONDS);
EasyMock.verify(asyncContext, outboundResponse, inboundRequest,
printWriter);
assertTrue(MOCK_SSE_SERVER.isEmpty());
- assertEquals("http://host2:3333/sse",
sseHaDispatch.getHaProvider().getActiveURL("SSE"));
+ assertEquals("http://host2.valid:3333/sse",
sseHaDispatch.getHaConfigurations().getHaProvider().getActiveURL("SSE"));
}
@Test
public void testLoadBalancingDisabledWithUserAgent() throws Exception {
CountDownLatch latch = new CountDownLatch(1);
- SSEHaDispatch sseHaDispatch = this.createDispatch(null);
+ HaServiceConfig haServiceConfig =
HaDescriptorFactory.createServiceConfig(serviceName, "true", "1", "1000", null,
null, "true", "true", null, null, "agentX,user1,agentY", null);
+ SSEHaDispatch sseHaDispatch = this.createDispatch(false,
haServiceConfig);
PrintWriter printWriter = EasyMock.createNiceMock(PrintWriter.class);
HttpServletResponse outboundResponse =
this.getServletResponse(HttpStatus.SC_OK);
AsyncContext asyncContext = this.getAsyncContext(latch,
outboundResponse);
@@ -249,13 +265,14 @@ public class SSEHaDispatchTest {
latch.await(1L, TimeUnit.SECONDS);
EasyMock.verify(asyncContext, outboundResponse, inboundRequest,
printWriter);
assertTrue(MOCK_SSE_SERVER.isEmpty());
- assertEquals("http://localhost:" + MOCK_SSE_SERVER.getPort() + "/sse",
sseHaDispatch.getHaProvider().getActiveURL("SSE"));
+ assertEquals("http://localhost:" + MOCK_SSE_SERVER.getPort() + "/sse",
sseHaDispatch.getHaConfigurations().getHaProvider().getActiveURL("SSE"));
}
@Test
public void testLoadBalancingDisabledWithStickySession() throws Exception {
CountDownLatch latch = new CountDownLatch(1);
- SSEHaDispatch sseHaDispatch = this.createDispatch(null);
+ HaServiceConfig haServiceConfig =
HaDescriptorFactory.createServiceConfig(serviceName, "true", "1", "1000", null,
null, "true", "true", null, null, "agentX,user1,agentY", null);
+ SSEHaDispatch sseHaDispatch = this.createDispatch(false,
haServiceConfig);
PrintWriter printWriter = EasyMock.createNiceMock(PrintWriter.class);
HttpServletResponse outboundResponse =
this.getServletResponse(HttpStatus.SC_OK);
AsyncContext asyncContext = this.getAsyncContext(latch,
outboundResponse);
@@ -277,7 +294,7 @@ public class SSEHaDispatchTest {
sseHaDispatch.doGet(URL, inboundRequest, outboundResponse);
latch.await(1L, TimeUnit.SECONDS);
- assertEquals("http://host2:3333/sse",
sseHaDispatch.getHaProvider().getActiveURL("SSE"));
+ assertEquals("http://host2.valid:3333/sse",
sseHaDispatch.getHaConfigurations().getHaProvider().getActiveURL("SSE"));
//Second request, sticky session cookie included
@@ -308,10 +325,357 @@ public class SSEHaDispatchTest {
EasyMock.verify(asyncContext, outboundResponse, inboundRequest,
printWriter, inboundRequest2);
assertTrue(MOCK_SSE_SERVER.isEmpty());
- assertEquals("http://host2:3333/sse",
sseHaDispatch.getHaProvider().getActiveURL("SSE"));
+ assertEquals("http://host2.valid:3333/sse",
sseHaDispatch.getHaConfigurations().getHaProvider().getActiveURL("SSE"));
+ }
+
+ @Test
+ public void testConnectivityFailover() throws Exception {
+ CountDownLatch latch = new CountDownLatch(1);
+ HaServiceConfig haServiceConfig =
HaDescriptorFactory.createServiceConfig(serviceName, "true", "1", "1000", null,
null, "true", "true", null, null, "agentX,user1,agentY", null);
+ SSEHaDispatch sseHaDispatch = this.createDispatch(true,
haServiceConfig);
+ PrintWriter printWriter = EasyMock.createNiceMock(PrintWriter.class);
+ HttpServletResponse outboundResponse =
this.getServletResponse(HttpStatus.SC_OK);
+ AsyncContext asyncContext = this.getAsyncContext(latch,
outboundResponse);
+ HttpServletRequest inboundRequest =
this.getHttpServletRequest(asyncContext);
+ Capture<Cookie> capturedArgument = Capture.newInstance();
+
expect(inboundRequest.getHeader("User-Agent")).andReturn("unknown").anyTimes();
+ expect(inboundRequest.getRequestURL()).andReturn(new
StringBuffer(URL.toString())).once();
+
expect(inboundRequest.getAttribute("dispatch.ha.failover.counter")).andReturn(new
AtomicInteger(0)).once();
+ this.expectResponseBodyAndHeader(printWriter, outboundResponse,
capturedArgument);
+ replay(inboundRequest, asyncContext, outboundResponse, printWriter);
+
+ MOCK_SSE_SERVER.expect()
+ .method("GET")
+ .pathInfo("/sse")
+ .header("Accept", "text/event-stream")
+ .respond()
+ .status(HttpStatus.SC_OK)
+
.content("id:1\ndata:data1\nevent:event1\n\ndata:data2\nevent:event2\nid:2\nretry:1\n:testing\n\n",
StandardCharsets.UTF_8)
+ .header("response", "header")
+ .contentType("text/event-stream");
+ sseHaDispatch.doGet(new URI("http://unknown-host.invalid"),
inboundRequest, outboundResponse);
+
+ latch.await(2L, TimeUnit.SECONDS);
+ EasyMock.verify(asyncContext, outboundResponse, inboundRequest,
printWriter);
+ assertTrue(MOCK_SSE_SERVER.isEmpty());
+ assertEquals("http://host2.valid:3333/sse",
sseHaDispatch.getHaConfigurations().getHaProvider().getActiveURL("SSE"));
}
- private SSEHaDispatch createDispatch(String cookieName) throws Exception {
+ @Test
+ public void testNoLoadBalancingStickyFailoverNoFallback() throws Exception
{
+ CountDownLatch latch = new CountDownLatch(1);
+ HaServiceConfig haServiceConfig =
HaDescriptorFactory.createServiceConfig(serviceName, "true", "1", "1000", null,
null, "false", "true", null, "true", null, null);
+ SSEHaDispatch sseHaDispatch = this.createDispatch(true,
haServiceConfig);
+ PrintWriter printWriter = EasyMock.createNiceMock(PrintWriter.class);
+ HttpServletResponse outboundResponse =
this.getServletResponse(HttpStatus.SC_OK);
+ AsyncContext asyncContext = this.getAsyncContext(latch,
outboundResponse);
+ HttpServletRequest inboundRequest =
this.getHttpServletRequest(asyncContext);
+
expect(inboundRequest.getHeader("User-Agent")).andReturn("unknown").anyTimes();
+ expect(inboundRequest.getRequestURL()).andReturn(new
StringBuffer(URL.toString())).once();
+
expect(inboundRequest.getAttribute("dispatch.ha.failover.counter")).andReturn(new
AtomicInteger(0)).once();
+ this.expectResponseBodyAndHeader(printWriter, outboundResponse, null);
+ replay(inboundRequest, asyncContext, outboundResponse, printWriter);
+
+ MOCK_SSE_SERVER.expect()
+ .method("GET")
+ .pathInfo("/sse")
+ .header("Accept", "text/event-stream")
+ .respond()
+ .status(HttpStatus.SC_OK)
+
.content("id:1\ndata:data1\nevent:event1\n\ndata:data2\nevent:event2\nid:2\nretry:1\n:testing\n\n",
StandardCharsets.UTF_8)
+ .header("response", "header")
+ .contentType("text/event-stream");
+ sseHaDispatch.doGet(new URI("http://unknown-host.invalid"),
inboundRequest, outboundResponse);
+
+ latch.await(2L, TimeUnit.SECONDS);
+ EasyMock.verify(asyncContext, outboundResponse, inboundRequest,
printWriter);
+ assertTrue(MOCK_SSE_SERVER.isEmpty());
+ assertEquals(URL.toString(),
sseHaDispatch.getHaConfigurations().getHaProvider().getActiveURL("SSE"));
+ }
+
+ @Test
+ public void testNoFallbackWhenStickyDisabled() throws Exception {
+ CountDownLatch latch = new CountDownLatch(1);
+ HaServiceConfig haServiceConfig =
HaDescriptorFactory.createServiceConfig(serviceName, "true", "1", "1000", null,
null, "false", "false", null, "true", null, null);
+ SSEHaDispatch sseHaDispatch = this.createDispatch(true,
haServiceConfig);
+ PrintWriter printWriter = EasyMock.createNiceMock(PrintWriter.class);
+ HttpServletResponse outboundResponse =
this.getServletResponse(HttpStatus.SC_OK);
+ AsyncContext asyncContext = this.getAsyncContext(latch,
outboundResponse);
+ HttpServletRequest inboundRequest =
this.getHttpServletRequest(asyncContext);
+
expect(inboundRequest.getHeader("User-Agent")).andReturn("unknown").anyTimes();
+ expect(inboundRequest.getRequestURL()).andReturn(new
StringBuffer(URL.toString())).once();
+
expect(inboundRequest.getAttribute("dispatch.ha.failover.counter")).andReturn(new
AtomicInteger(0)).once();
+ this.expectResponseBodyAndHeader(printWriter, outboundResponse, null);
+ replay(inboundRequest, asyncContext, outboundResponse, printWriter);
+
+ MOCK_SSE_SERVER.expect()
+ .method("GET")
+ .pathInfo("/sse")
+ .header("Accept", "text/event-stream")
+ .respond()
+ .status(HttpStatus.SC_OK)
+
.content("id:1\ndata:data1\nevent:event1\n\ndata:data2\nevent:event2\nid:2\nretry:1\n:testing\n\n",
StandardCharsets.UTF_8)
+ .header("response", "header")
+ .contentType("text/event-stream");
+ sseHaDispatch.doGet(new URI("http://unknown-host.invalid"),
inboundRequest, outboundResponse);
+
+ latch.await(2L, TimeUnit.SECONDS);
+ EasyMock.verify(asyncContext, outboundResponse, inboundRequest,
printWriter);
+ assertTrue(MOCK_SSE_SERVER.isEmpty());
+ assertEquals(URL.toString(),
sseHaDispatch.getHaConfigurations().getHaProvider().getActiveURL("SSE"));
+ }
+
+ @Test
+ public void testMaxFailoverLimitReached() throws Exception {
+ CountDownLatch latch = new CountDownLatch(1);
+ HaServiceConfig haServiceConfig =
HaDescriptorFactory.createServiceConfig(serviceName, "true", "0", "1000", null,
null, "true", "true", null, null, "agentX,user1,agentY", null);
+ SSEHaDispatch sseHaDispatch = this.createDispatch(true,
haServiceConfig);
+ HttpServletResponse outboundResponse = this.getServletResponse(0);
+ AsyncContext asyncContext = this.getAsyncContext(latch,
outboundResponse);
+ HttpServletRequest inboundRequest =
this.getHttpServletRequest(asyncContext);
+
expect(inboundRequest.getHeader("User-Agent")).andReturn("unknown").anyTimes();
+
expect(inboundRequest.getAttribute("dispatch.ha.failover.counter")).andReturn(new
AtomicInteger(0)).once();
+ outboundResponse.sendError(HttpServletResponse.SC_BAD_GATEWAY,
"Service connection error, max failover attempts reached");
+ EasyMock.expectLastCall().once();
+ this.expectResponseBodyAndHeader(null, outboundResponse, null);
+ replay(inboundRequest, asyncContext, outboundResponse);
+
+ MOCK_SSE_SERVER.expect()
+ .method("GET")
+ .pathInfo("/sse")
+ .header("Accept", "text/event-stream")
+ .respond()
+ .status(HttpStatus.SC_OK)
+
.content("id:1\ndata:data1\nevent:event1\n\ndata:data2\nevent:event2\nid:2\nretry:1\n:testing\n\n",
StandardCharsets.UTF_8)
+ .header("response", "header")
+ .contentType("text/event-stream");
+ sseHaDispatch.doGet(new URI("http://unknown-host.invalid"),
inboundRequest, outboundResponse);
+
+ latch.await(2L, TimeUnit.SECONDS);
+ EasyMock.verify(asyncContext, outboundResponse, inboundRequest);
+ assertFalse(MOCK_SSE_SERVER.isEmpty());
+ assertEquals(URL.toString(),
sseHaDispatch.getHaConfigurations().getHaProvider().getActiveURL("SSE"));
+ }
+
+ /**
+ * Sends a GET to an unresolvable host with a service config whose third
+ * argument is "1" and whose last argument is "false" (presumably max
+ * failover attempts and the non-idempotent failover switch - confirm
+ * against HaDescriptorFactory.createServiceConfig).
+ * Expects the SSE body to be written to the client, MOCK_SSE_SERVER to
+ * report isEmpty(), and the provider's active URL to be
+ * http://host2.valid:3333/sse afterwards.
+ */
+ @Test
+ public void testGETFailoverIdempotentDisabled() throws Exception {
+ CountDownLatch latch = new CountDownLatch(1);
+ HaServiceConfig haServiceConfig =
HaDescriptorFactory.createServiceConfig(serviceName, "true", "1", "1000", null,
null, "true", "true", null, null, "agentX,user1,agentY", "false");
+ SSEHaDispatch sseHaDispatch = this.createDispatch(true,
haServiceConfig);
+ PrintWriter printWriter = EasyMock.createNiceMock(PrintWriter.class);
+ HttpServletResponse outboundResponse =
this.getServletResponse(HttpStatus.SC_OK);
+ AsyncContext asyncContext = this.getAsyncContext(latch,
outboundResponse);
+ HttpServletRequest inboundRequest =
this.getHttpServletRequest(asyncContext);
+ Capture<Cookie> capturedArgument = Capture.newInstance();
+
expect(inboundRequest.getHeader("User-Agent")).andReturn("unknown").anyTimes();
+ expect(inboundRequest.getRequestURL()).andReturn(new
StringBuffer(URL.toString())).once();
+
expect(inboundRequest.getAttribute("dispatch.ha.failover.counter")).andReturn(new
AtomicInteger(0)).once();
+ // Registers the cookie capture and the expected SSE writes on the response.
+ this.expectResponseBodyAndHeader(printWriter, outboundResponse,
capturedArgument);
+ replay(inboundRequest, asyncContext, outboundResponse, printWriter);
+
+ MOCK_SSE_SERVER.expect()
+ .method("GET")
+ .pathInfo("/sse")
+ .header("Accept", "text/event-stream")
+ .respond()
+ .status(HttpStatus.SC_OK)
+
.content("id:1\ndata:data1\nevent:event1\n\ndata:data2\nevent:event2\nid:2\nretry:1\n:testing\n\n",
StandardCharsets.UTF_8)
+ .header("response", "header")
+ .contentType("text/event-stream");
+ // The initial target is unresolvable; the mock server is only reachable
+ // through the provider's URL list.
+ sseHaDispatch.doGet(new URI("http://unknown-host.invalid"),
inboundRequest, outboundResponse);
+
+ // NOTE(review): the boolean result of await is ignored, so a timeout is
+ // only detected indirectly through the verify/assert calls below.
+ latch.await(2L, TimeUnit.SECONDS);
+ EasyMock.verify(asyncContext, outboundResponse, inboundRequest,
printWriter);
+ assertTrue(MOCK_SSE_SERVER.isEmpty());
+ assertEquals("http://host2.valid:3333/sse",
sseHaDispatch.getHaConfigurations().getHaProvider().getActiveURL("SSE"));
+ }
+
+ /**
+ * POST counterpart of the GET failover test: the last service-config
+ * argument is "true" (presumably enabling failover of non-idempotent
+ * requests - confirm against HaDescriptorFactory.createServiceConfig).
+ * Sends a POST to an unresolvable host and expects the SSE body to be
+ * written to the client, MOCK_SSE_SERVER to report isEmpty(), and the
+ * provider's active URL to be http://host2.valid:3333/sse afterwards.
+ */
+ @Test
+ public void testPOSTFailoverIdempotentEnabled() throws Exception {
+ CountDownLatch latch = new CountDownLatch(1);
+ HaServiceConfig haServiceConfig =
HaDescriptorFactory.createServiceConfig(serviceName, "true", "1", "1000", null,
null, "true", "true", null, null, "agentX,user1,agentY", "true");
+ SSEHaDispatch sseHaDispatch = this.createDispatch(true,
haServiceConfig);
+ PrintWriter printWriter = EasyMock.createNiceMock(PrintWriter.class);
+ HttpServletResponse outboundResponse =
this.getServletResponse(HttpStatus.SC_OK);
+ AsyncContext asyncContext = this.getAsyncContext(latch,
outboundResponse);
+ HttpServletRequest inboundRequest =
this.getHttpServletRequest(asyncContext);
+ Capture<Cookie> capturedArgument = Capture.newInstance();
+
expect(inboundRequest.getHeader("User-Agent")).andReturn("unknown").anyTimes();
+ expect(inboundRequest.getRequestURL()).andReturn(new
StringBuffer(URL.toString())).once();
+
expect(inboundRequest.getAttribute("dispatch.ha.failover.counter")).andReturn(new
AtomicInteger(0)).once();
+ // Registers the cookie capture and the expected SSE writes on the response.
+ this.expectResponseBodyAndHeader(printWriter, outboundResponse,
capturedArgument);
+ replay(inboundRequest, asyncContext, outboundResponse, printWriter);
+
+ // The mock server only answers a POST to /sse.
+ MOCK_SSE_SERVER.expect()
+ .method("POST")
+ .pathInfo("/sse")
+ .header("Accept", "text/event-stream")
+ .respond()
+ .status(HttpStatus.SC_OK)
+
.content("id:1\ndata:data1\nevent:event1\n\ndata:data2\nevent:event2\nid:2\nretry:1\n:testing\n\n",
StandardCharsets.UTF_8)
+ .header("response", "header")
+ .contentType("text/event-stream");
+ sseHaDispatch.doPost(new URI("http://unknown-host.invalid"),
inboundRequest, outboundResponse);
+
+ // NOTE(review): the boolean result of await is ignored, so a timeout is
+ // only detected indirectly through the verify/assert calls below.
+ latch.await(2L, TimeUnit.SECONDS);
+ EasyMock.verify(asyncContext, outboundResponse, inboundRequest,
printWriter);
+ assertTrue(MOCK_SSE_SERVER.isEmpty());
+ assertEquals("http://host2.valid:3333/sse",
sseHaDispatch.getHaConfigurations().getHaProvider().getActiveURL("SSE"));
+ }
+
+ /**
+ * Same POST scenario as testPOSTFailoverIdempotentEnabled, but with the
+ * last service-config argument set to "false". The expectations are
+ * identical: the SSE body is still written and the mock server's POST
+ * expectation is asserted as consumed.
+ * NOTE(review): judging by the test name, a connect error is presumably
+ * allowed to fail over even when non-idempotent failover is disabled -
+ * confirm against SSEHaDispatch/SSEHaCallback.
+ */
+ @Test
+ public void testPOSTConnectErrorFailoverIdempotentDisabled() throws
Exception {
+ CountDownLatch latch = new CountDownLatch(1);
+ HaServiceConfig haServiceConfig =
HaDescriptorFactory.createServiceConfig(serviceName, "true", "1", "1000", null,
null, "true", "true", null, null, "agentX,user1,agentY", "false");
+ SSEHaDispatch sseHaDispatch = this.createDispatch(true,
haServiceConfig);
+ PrintWriter printWriter = EasyMock.createNiceMock(PrintWriter.class);
+ HttpServletResponse outboundResponse =
this.getServletResponse(HttpStatus.SC_OK);
+ AsyncContext asyncContext = this.getAsyncContext(latch,
outboundResponse);
+ HttpServletRequest inboundRequest =
this.getHttpServletRequest(asyncContext);
+ Capture<Cookie> capturedArgument = Capture.newInstance();
+
expect(inboundRequest.getHeader("User-Agent")).andReturn("unknown").anyTimes();
+ expect(inboundRequest.getRequestURL()).andReturn(new
StringBuffer(URL.toString())).once();
+
expect(inboundRequest.getAttribute("dispatch.ha.failover.counter")).andReturn(new
AtomicInteger(0)).once();
+ // Registers the cookie capture and the expected SSE writes on the response.
+ this.expectResponseBodyAndHeader(printWriter, outboundResponse,
capturedArgument);
+ replay(inboundRequest, asyncContext, outboundResponse, printWriter);
+
+ MOCK_SSE_SERVER.expect()
+ .method("POST")
+ .pathInfo("/sse")
+ .header("Accept", "text/event-stream")
+ .respond()
+ .status(HttpStatus.SC_OK)
+
.content("id:1\ndata:data1\nevent:event1\n\ndata:data2\nevent:event2\nid:2\nretry:1\n:testing\n\n",
StandardCharsets.UTF_8)
+ .header("response", "header")
+ .contentType("text/event-stream");
+ // The initial target cannot be resolved, which produces the connect error.
+ sseHaDispatch.doPost(new URI("http://unknown-host.invalid"),
inboundRequest, outboundResponse);
+
+ // NOTE(review): the boolean result of await is ignored, so a timeout is
+ // only detected indirectly through the verify/assert calls below.
+ latch.await(2L, TimeUnit.SECONDS);
+ EasyMock.verify(asyncContext, outboundResponse, inboundRequest,
printWriter);
+ assertTrue(MOCK_SSE_SERVER.isEmpty());
+ assertEquals("http://host2.valid:3333/sse",
sseHaDispatch.getHaConfigurations().getHaProvider().getActiveURL("SSE"));
+ }
+
+ /**
+ * Two-request scenario.
+ * First request: GET to an unresolvable host; expects the SSE body to be
+ * written, captures the cookie added to the response, and asserts the
+ * active URL is http://host2.valid:3333/sse.
+ * Second request: the mock server is stopped and a GET carrying the
+ * captured cookie is sent to URL; the test then asserts that the active
+ * URL is unchanged (presumably because a sticky-session request must not
+ * fail over - confirm against SSEHaDispatch).
+ */
+ @Test
+ public void testFailoverDisabledWithStickySession() throws Exception {
+ CountDownLatch latch = new CountDownLatch(1);
+ HaServiceConfig haServiceConfig =
HaDescriptorFactory.createServiceConfig(serviceName, "true", "1", "1000", null,
null, "true", "true", null, null, "agentX,user1,agentY", null);
+ SSEHaDispatch sseHaDispatch = this.createDispatch(true,
haServiceConfig);
+ PrintWriter printWriter = EasyMock.createNiceMock(PrintWriter.class);
+ HttpServletResponse outboundResponse =
this.getServletResponse(HttpStatus.SC_OK);
+ AsyncContext asyncContext = this.getAsyncContext(latch,
outboundResponse);
+ HttpServletRequest inboundRequest =
this.getHttpServletRequest(asyncContext);
+ Capture<Cookie> capturedArgument = Capture.newInstance();
+
expect(inboundRequest.getHeader("User-Agent")).andReturn("unknown").anyTimes();
+ expect(inboundRequest.getRequestURL()).andReturn(new
StringBuffer(URL.toString())).once();
+
expect(inboundRequest.getAttribute("dispatch.ha.failover.counter")).andReturn(new
AtomicInteger(0)).once();
+ this.expectResponseBodyAndHeader(printWriter, outboundResponse,
capturedArgument);
+ replay(inboundRequest, asyncContext, outboundResponse, printWriter);
+
+ MOCK_SSE_SERVER.expect()
+ .method("GET")
+ .pathInfo("/sse")
+ .header("Accept", "text/event-stream")
+ .respond()
+ .status(HttpStatus.SC_OK)
+
.content("id:1\ndata:data1\nevent:event1\n\ndata:data2\nevent:event2\nid:2\nretry:1\n:testing\n\n",
StandardCharsets.UTF_8)
+ .header("response", "header")
+ .contentType("text/event-stream");
+ sseHaDispatch.doGet(new URI("http://unknown-host.invalid"),
inboundRequest, outboundResponse);
+
+ // NOTE(review): the boolean result of await is ignored, so a timeout is
+ // only detected indirectly through the verify/assert calls below.
+ latch.await(2L, TimeUnit.SECONDS);
+ EasyMock.verify(asyncContext, outboundResponse, inboundRequest,
printWriter);
+ assertTrue(MOCK_SSE_SERVER.isEmpty());
+ assertEquals("http://host2.valid:3333/sse",
sseHaDispatch.getHaConfigurations().getHaProvider().getActiveURL("SSE"));
+
+ //Second request, sticky session cookie included
+ // The mock server is stopped first, so the second dispatch to URL cannot
+ // be answered by it.
+ MOCK_SSE_SERVER.stop();
+ CountDownLatch latch2 = new CountDownLatch(1);
+ // Replays the cookie captured from the first response on the new request.
+ Cookie[] cookies = new Cookie[1];
+ cookies[0] = new Cookie(capturedArgument.getValue().getName(),
capturedArgument.getValue().getValue());
+ PrintWriter printWriter2 = EasyMock.createNiceMock(PrintWriter.class);
+ HttpServletResponse outboundResponse2 =
this.getServletResponse(HttpStatus.SC_OK);
+ AsyncContext asyncContext2 = this.getAsyncContext(latch2,
outboundResponse2);
+ Capture<Cookie> capturedArgument2 = Capture.newInstance();
+ this.expectResponseBodyAndHeader(printWriter2, outboundResponse2,
capturedArgument2);
+ HttpServletRequest inboundRequest2 =
this.getHttpServletRequest(asyncContext2);
+ expect(inboundRequest2.getCookies()).andReturn(cookies).anyTimes();
+ replay(inboundRequest2, asyncContext2, outboundResponse2,
printWriter2);
+
+ sseHaDispatch.doGet(URL, inboundRequest2, outboundResponse2);
+ latch2.await(1L, TimeUnit.SECONDS);
+
+ // NOTE(review): this re-verifies the first request's mocks plus
+ // inboundRequest2 only; asyncContext2, outboundResponse2 and printWriter2
+ // are never verified, so the body/cookie expectations registered for the
+ // second response are not enforced. Presumably intentional because the
+ // second request cannot succeed - confirm.
+ EasyMock.verify(asyncContext, outboundResponse, inboundRequest,
printWriter, inboundRequest2);
+ assertEquals("http://host2.valid:3333/sse",
sseHaDispatch.getHaConfigurations().getHaProvider().getActiveURL("SSE"));
+ }
+
+ /**
+ * GET to an unresolvable host where the request's User-Agent is "agentX",
+ * one of the values in the "agentX,user1,agentY" service-config argument
+ * (presumably the list of user agents for which load balancing is
+ * disabled - confirm against HaDescriptorFactory.createServiceConfig).
+ * Expects the SSE body to be written and, unlike the tests using the
+ * "unknown" agent, asserts that the active URL equals URL rather than
+ * http://host2.valid:3333/sse.
+ */
+ @Test
+ public void testFailoverWithUserAgentLBDisabled() throws Exception {
+ CountDownLatch latch = new CountDownLatch(1);
+ HaServiceConfig haServiceConfig =
HaDescriptorFactory.createServiceConfig(serviceName, "true", "1", "1000", null,
null, "true", "true", null, null, "agentX,user1,agentY", null);
+ SSEHaDispatch sseHaDispatch = this.createDispatch(true,
haServiceConfig);
+ PrintWriter printWriter = EasyMock.createNiceMock(PrintWriter.class);
+ HttpServletResponse outboundResponse =
this.getServletResponse(HttpStatus.SC_OK);
+ AsyncContext asyncContext = this.getAsyncContext(latch,
outboundResponse);
+ HttpServletRequest inboundRequest =
this.getHttpServletRequest(asyncContext);
+ Capture<Cookie> capturedArgument = Capture.newInstance();
+
expect(inboundRequest.getHeader("User-Agent")).andReturn("agentX").anyTimes();
+ expect(inboundRequest.getRequestURL()).andReturn(new
StringBuffer(URL.toString())).once();
+
expect(inboundRequest.getAttribute("dispatch.ha.failover.counter")).andReturn(new
AtomicInteger(0)).once();
+ // Registers the cookie capture and the expected SSE writes on the response.
+ this.expectResponseBodyAndHeader(printWriter, outboundResponse,
capturedArgument);
+ replay(inboundRequest, asyncContext, outboundResponse, printWriter);
+
+ MOCK_SSE_SERVER.expect()
+ .method("GET")
+ .pathInfo("/sse")
+ .header("Accept", "text/event-stream")
+ .respond()
+ .status(HttpStatus.SC_OK)
+
.content("id:1\ndata:data1\nevent:event1\n\ndata:data2\nevent:event2\nid:2\nretry:1\n:testing\n\n",
StandardCharsets.UTF_8)
+ .header("response", "header")
+ .contentType("text/event-stream");
+ sseHaDispatch.doGet(new URI("http://unknown-host.invalid"),
inboundRequest, outboundResponse);
+
+ // NOTE(review): the boolean result of await is ignored, so a timeout is
+ // only detected indirectly through the verify/assert calls below.
+ latch.await(2L, TimeUnit.SECONDS);
+ EasyMock.verify(asyncContext, outboundResponse, inboundRequest,
printWriter);
+ assertTrue(MOCK_SSE_SERVER.isEmpty());
+ assertEquals(URL.toString(),
sseHaDispatch.getHaConfigurations().getHaProvider().getActiveURL("SSE"));
+ }
+
+ /**
+ * Dispatches a GET straight to URL (the dispatch is created with
+ * failoverNeeded == false, so no unresolvable host is put in front of the
+ * URL list) while the response mock reports isCommitted() == true and the
+ * second PrintWriter.write call throws IndexOutOfBoundsException.
+ * Asserts that MOCK_SSE_SERVER reports isEmpty() and that the active URL
+ * is http://host2.valid:3333/sse afterwards.
+ * NOTE(review): no sendError expectation is registered, but the response
+ * is a nice mock, so an unexpected sendError call would not fail verify.
+ */
+ @Test
+ public void testNoFailoverAfterResponseIsCommitted() throws Exception {
+ CountDownLatch latch = new CountDownLatch(1);
+ HaServiceConfig haServiceConfig =
HaDescriptorFactory.createServiceConfig(serviceName, "true", "1", "1000", null,
null, "true", "true", null, null, "agentX,user1,agentY", null);
+ SSEHaDispatch sseHaDispatch = this.createDispatch(false,
haServiceConfig);
+ PrintWriter printWriter = EasyMock.createNiceMock(PrintWriter.class);
+ HttpServletResponse outboundResponse =
this.getServletResponse(HttpStatus.SC_OK);
+ AsyncContext asyncContext = this.getAsyncContext(latch,
outboundResponse);
+ HttpServletRequest inboundRequest =
this.getHttpServletRequest(asyncContext);
+ Capture<Cookie> capturedArgument = Capture.newInstance();
+
expect(inboundRequest.getHeader("User-Agent")).andReturn("unknown").anyTimes();
+ // The response always reports itself as committed in this scenario.
+ expect(outboundResponse.isCommitted()).andReturn(true).anyTimes();
+ // Makes the second body write throw, simulating a failure mid-stream.
+ this.expectResponseBodyAndHeaderWithError(printWriter,
outboundResponse, capturedArgument);
+ replay(inboundRequest, asyncContext, outboundResponse, printWriter);
+
+ MOCK_SSE_SERVER.expect()
+ .method("GET")
+ .pathInfo("/sse")
+ .header("Accept", "text/event-stream")
+ .respond()
+ .status(HttpStatus.SC_OK)
+
.content("id:1\ndata:data1\nevent:event1\n\ndata:data2\nevent:event2\nid:2\nretry:1\n:testing\n\n",
StandardCharsets.UTF_8)
+ .header("response", "header")
+ .contentType("text/event-stream");
+
+ sseHaDispatch.doGet(URL, inboundRequest, outboundResponse);
+
+ // NOTE(review): the boolean result of await is ignored, so a timeout is
+ // only detected indirectly through the verify/assert calls below.
+ latch.await(2L, TimeUnit.SECONDS);
+ EasyMock.verify(asyncContext, outboundResponse, inboundRequest,
printWriter);
+ assertTrue(MOCK_SSE_SERVER.isEmpty());
+ assertEquals("http://host2.valid:3333/sse",
sseHaDispatch.getHaConfigurations().getHaProvider().getActiveURL("SSE"));
+ }
+
+ private SSEHaDispatch createDispatch(boolean failoverNeeded,
HaServiceConfig serviceConfig) throws Exception {
KeystoreService keystoreService = createMock(KeystoreService.class);
expect(keystoreService.getTruststoreForHttpClient()).andReturn(null).once();
@@ -339,7 +703,7 @@ public class SSEHaDispatchTest {
expect(filterConfig.getInitParameter("serviceRole")).andReturn(null).once();
expect(filterConfig.getInitParameter("httpclient.cookieSpec")).andReturn(null).anyTimes();
- HaProvider provider = this.createProvider(cookieName);
+ HaProvider provider = this.createProvider(failoverNeeded,
serviceConfig);
replay(keystoreService, gatewayConfig, gatewayServices,
servletContext, filterConfig);
@@ -354,8 +718,10 @@ public class SSEHaDispatchTest {
+ // Creates a nice mock HttpServletResponse (not yet replayed). A setStatus
+ // expectation is recorded only when statusCode is non-zero; callers pass 0
+ // when no status is expected to be set on the response.
private HttpServletResponse getServletResponse(int statusCode) {
HttpServletResponse outboundResponse =
EasyMock.createNiceMock(HttpServletResponse.class);
- outboundResponse.setStatus(statusCode);
- EasyMock.expectLastCall();
+ if (statusCode != 0) {
+ outboundResponse.setStatus(statusCode);
+ EasyMock.expectLastCall();
+ }
return outboundResponse;
}
@@ -391,31 +757,54 @@ public class SSEHaDispatchTest {
return inboundRequest;
}
- private void expectResponseBodyAndHeader(PrintWriter printWriter,
HttpServletResponse outboundResponse, Capture<Cookie> capturedArgument) throws
Exception {
- outboundResponse.addCookie(capture(capturedArgument));
- EasyMock.expectLastCall();
+ /**
+ * Records the expectations for a successfully streamed SSE response.
+ * Both optional arguments may be null, in which case the corresponding
+ * expectations are simply not recorded.
+ *
+ * @param printWriter writer mock returned from getWriter(); when non-null,
+ * two write calls (one per event) and two println('\n') calls are expected
+ * @param outboundResponse response mock the expectations are recorded on
+ * @param capturedArgument when non-null, captures the Cookie passed to
+ * addCookie
+ */
+ private void expectResponseBodyAndHeader(PrintWriter printWriter,
HttpServletResponse outboundResponse,
+ Capture<Cookie> capturedArgument)
throws Exception {
+ if (capturedArgument != null) {
+ outboundResponse.addCookie(capture(capturedArgument));
+ EasyMock.expectLastCall();
+ }
+
+ if (printWriter != null) {
+
EasyMock.expect(outboundResponse.getWriter()).andReturn(printWriter).anyTimes();
+ printWriter.write("id:1\nevent:event1\ndata:data1");
+ EasyMock.expectLastCall();
+
printWriter.write("id:2\nevent:event2\ndata:data2\nretry:1\n:testing");
+ EasyMock.expectLastCall();
+ printWriter.println('\n');
+ EasyMock.expectLastCall().times(2);
+ }
+ }
+
+ /**
+ * Variant of expectResponseBodyAndHeader that simulates a failure while
+ * the body is being streamed: the first event write is expected normally,
+ * the second write throws IndexOutOfBoundsException("Custom exception"),
+ * and no println calls are expected.
+ * NOTE(review): unlike expectResponseBodyAndHeader, printWriter is not
+ * null-checked here, so callers must pass a mock.
+ *
+ * @param printWriter writer mock returned from getWriter()
+ * @param outboundResponse response mock the expectations are recorded on
+ * @param capturedArgument when non-null, captures the Cookie passed to
+ * addCookie
+ */
+ private void expectResponseBodyAndHeaderWithError(PrintWriter printWriter,
HttpServletResponse outboundResponse,
+ Capture<Cookie> capturedArgument)
throws Exception {
+ if (capturedArgument != null) {
+ outboundResponse.addCookie(capture(capturedArgument));
+ EasyMock.expectLastCall();
+ }
+
EasyMock.expect(outboundResponse.getWriter()).andReturn(printWriter).anyTimes();
printWriter.write("id:1\nevent:event1\ndata:data1");
EasyMock.expectLastCall();
printWriter.write("id:2\nevent:event2\ndata:data2\nretry:1\n:testing");
- EasyMock.expectLastCall();
- printWriter.println('\n');
- EasyMock.expectLastCall().times(2);
+ EasyMock.expectLastCall().andThrow(new
IndexOutOfBoundsException("Custom exception"));
}
- private HaProvider createProvider(String cookieName) throws Exception {
- String serviceName = "SSE";
+ /**
+ * Builds a DefaultHaProvider for the test service from the given service
+ * config and registers its URL list under serviceName.
+ * The list is, in order: http://unknown-host.invalid (only when
+ * failoverNeeded is true), the local mock SSE server,
+ * http://host2.valid:3333/sse and http://host3.valid:3333/sse.
+ *
+ * @param failoverNeeded whether an unresolvable host is placed first so
+ * that the first dispatch attempt fails
+ * @param serviceConfig HA service configuration added to the descriptor
+ */
+ private HaProvider createProvider(boolean failoverNeeded, HaServiceConfig
serviceConfig) throws Exception {
HaDescriptor descriptor = HaDescriptorFactory.createDescriptor();
-
descriptor.addServiceConfig(HaDescriptorFactory.createServiceConfig(serviceName,
"true", null, null, null, null, "true", "true", cookieName, null,
"agentX,user1,agentY", null));
+ descriptor.addServiceConfig(serviceConfig);
HaProvider provider = new DefaultHaProvider(descriptor);
URI uri1 = new URI("http://localhost:" + MOCK_SSE_SERVER.getPort() +
"/sse");
- URI uri2 = new URI("http://host2:3333/sse");
- URI uri3 = new URI("http://host3:3333/sse");
+ URI uri2 = new URI("http://host2.valid:3333/sse");
+ URI uri3 = new URI("http://host3.valid:3333/sse");
ArrayList<String> urlList = new ArrayList<>();
+ if (failoverNeeded) {
+ urlList.add(new URI("http://unknown-host.invalid").toString());
+ }
urlList.add(uri1.toString());
urlList.add(uri2.toString());
urlList.add(uri3.toString());
provider.addHaService(serviceName, urlList);
return provider;
}
-}
\ No newline at end of file
+
+}
diff --git
a/gateway-service-webhdfs/src/main/java/org/apache/knox/gateway/hdfs/dispatch/AbstractHdfsHaDispatch.java
b/gateway-service-webhdfs/src/main/java/org/apache/knox/gateway/hdfs/dispatch/AbstractHdfsHaDispatch.java
index b284ed4cd..506217887 100644
---
a/gateway-service-webhdfs/src/main/java/org/apache/knox/gateway/hdfs/dispatch/AbstractHdfsHaDispatch.java
+++
b/gateway-service-webhdfs/src/main/java/org/apache/knox/gateway/hdfs/dispatch/AbstractHdfsHaDispatch.java
@@ -61,7 +61,7 @@ public abstract class AbstractHdfsHaDispatch extends
ConfigurableHADispatch {
writeOutboundResponse(outboundRequest, inboundRequest,
outboundResponse, inboundResponse);
} catch (StandbyException | SafeModeException | IOException e) {
/* if non-idempotent requests are not allowed to failover */
- if(!failoverNonIdempotentRequestEnabled &&
nonIdempotentRequests.stream().anyMatch(outboundRequest.getMethod()::equalsIgnoreCase))
{
+ if(!getHaConfigurations().isFailoverNonIdempotentRequestEnabled() &&
nonIdempotentRequests.stream().anyMatch(outboundRequest.getMethod()::equalsIgnoreCase))
{
LOG.cannotFailoverNonIdempotentRequest(outboundRequest.getMethod(),
e.getCause());
throw e;
} else {
@@ -103,15 +103,15 @@ public abstract class AbstractHdfsHaDispatch extends
ConfigurableHADispatch {
counter = new AtomicInteger(0);
}
inboundRequest.setAttribute(FAILOVER_COUNTER_ATTRIBUTE, counter);
- if (counter.incrementAndGet() <= maxFailoverAttempts) {
- haProvider.markFailedURL(getResourceRole(),
outboundRequest.getURI().toString());
+ if (counter.incrementAndGet() <=
getHaConfigurations().getMaxFailoverAttempts()) {
+
getHaConfigurations().getHaProvider().markFailedURL(getResourceRole(),
outboundRequest.getURI().toString());
//null out target url so that rewriters run again
inboundRequest.setAttribute(AbstractGatewayFilter.TARGET_REQUEST_URL_ATTRIBUTE_NAME,
null);
URI uri = getDispatchUrl(inboundRequest);
((HttpRequestBase) outboundRequest).setURI(uri);
- if (failoverSleep > 0) {
+ if (getHaConfigurations().getFailoverSleep() > 0) {
try {
- Thread.sleep(failoverSleep);
+ Thread.sleep(getHaConfigurations().getFailoverSleep());
} catch (InterruptedException e) {
LOG.failoverSleepFailed(getResourceRole(), e);
Thread.currentThread().interrupt();
@@ -120,7 +120,7 @@ public abstract class AbstractHdfsHaDispatch extends
ConfigurableHADispatch {
LOG.failingOverRequest(outboundRequest.getURI().toString());
executeRequest(outboundRequest, inboundRequest, outboundResponse);
} else {
- LOG.maxFailoverAttemptsReached(maxFailoverAttempts,
getResourceRole());
+
LOG.maxFailoverAttemptsReached(getHaConfigurations().getMaxFailoverAttempts(),
getResourceRole());
if (inboundResponse != null) {
writeOutboundResponse(outboundRequest, inboundRequest,
outboundResponse, inboundResponse);
} else {
diff --git a/gateway-spi/pom.xml b/gateway-spi/pom.xml
index d8f232e75..dc4c88aeb 100644
--- a/gateway-spi/pom.xml
+++ b/gateway-spi/pom.xml
@@ -58,16 +58,6 @@
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpasyncclient</artifactId>
- <exclusions>
- <exclusion>
- <groupId>org.apache.httpcomponents</groupId>
- <artifactId>httpcore</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.httpcomponents</groupId>
- <artifactId>httpcore-nio</artifactId>
- </exclusion>
- </exclusions>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
diff --git
a/gateway-spi/src/main/java/org/apache/knox/gateway/SpiGatewayMessages.java
b/gateway-spi/src/main/java/org/apache/knox/gateway/SpiGatewayMessages.java
index 1037bffc4..9992602b6 100644
--- a/gateway-spi/src/main/java/org/apache/knox/gateway/SpiGatewayMessages.java
+++ b/gateway-spi/src/main/java/org/apache/knox/gateway/SpiGatewayMessages.java
@@ -144,4 +144,7 @@ public interface SpiGatewayMessages {
@Message(level = MessageLevel.INFO, text = "FIPS environment, configuring
intercepting socket")
void configureInterceptingSocket();
+
+ /**
+ * Logs, at ERROR level with stack trace, that an error response could not
+ * be sent to the client. SSECallback.failed calls this when sending the
+ * 502 error itself throws.
+ *
+ * @param e the exception raised while sending the error
+ */
+ @Message( level = MessageLevel.ERROR, text = "Failed to send error to
client" )
+ void failedToSendErrorToClient(@StackTrace(level=MessageLevel.ERROR)
Exception e);
}
diff --git
a/gateway-spi/src/main/java/org/apache/knox/gateway/dispatch/AsyncDispatch.java
b/gateway-spi/src/main/java/org/apache/knox/gateway/dispatch/AsyncDispatch.java
new file mode 100644
index 000000000..c855501bd
--- /dev/null
+++
b/gateway-spi/src/main/java/org/apache/knox/gateway/dispatch/AsyncDispatch.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.knox.gateway.dispatch;
+
+/**
+ * Marker interface with no members. In this change it is implemented by
+ * SSEDispatch; presumably it lets callers tell asynchronous dispatches
+ * apart from synchronous ones (see SyncDispatch) - confirm against the
+ * code that checks for it.
+ */
+public interface AsyncDispatch {
+}
diff --git
a/gateway-spi/src/main/java/org/apache/knox/gateway/dispatch/ConfigurableDispatch.java
b/gateway-spi/src/main/java/org/apache/knox/gateway/dispatch/ConfigurableDispatch.java
index 6f1d240e5..d9a469783 100644
---
a/gateway-spi/src/main/java/org/apache/knox/gateway/dispatch/ConfigurableDispatch.java
+++
b/gateway-spi/src/main/java/org/apache/knox/gateway/dispatch/ConfigurableDispatch.java
@@ -52,7 +52,7 @@ import java.util.stream.Collectors;
* make request/response exclude headers configurable
* make url encoding configurable
*/
-public class ConfigurableDispatch extends DefaultDispatch {
+public class ConfigurableDispatch extends DefaultDispatch implements
SyncDispatch{
private Set<String> requestExcludeHeaders =
super.getOutboundRequestExcludeHeaders();
private Set<String> responseExcludeHeaders =
super.getOutboundResponseExcludeHeaders();
private Map<String, String> requestAppendHeaders = Collections.emptyMap();
diff --git
a/gateway-spi/src/main/java/org/apache/knox/gateway/dispatch/SyncDispatch.java
b/gateway-spi/src/main/java/org/apache/knox/gateway/dispatch/SyncDispatch.java
new file mode 100644
index 000000000..271ba0e85
--- /dev/null
+++
b/gateway-spi/src/main/java/org/apache/knox/gateway/dispatch/SyncDispatch.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.knox.gateway.dispatch;
+
+/**
+ * Marker interface with no members. In this change it is implemented by
+ * ConfigurableDispatch.
+ * NOTE(review): SSEDispatch extends ConfigurableDispatch and also
+ * implements AsyncDispatch, so an SSEDispatch instance is both a
+ * SyncDispatch and an AsyncDispatch; code that distinguishes the two
+ * should test for AsyncDispatch first - confirm this is intended.
+ */
+public interface SyncDispatch {
+}
diff --git
a/gateway-spi/src/main/java/org/apache/knox/gateway/sse/SSECallback.java
b/gateway-spi/src/main/java/org/apache/knox/gateway/sse/SSECallback.java
new file mode 100644
index 000000000..6a9197091
--- /dev/null
+++ b/gateway-spi/src/main/java/org/apache/knox/gateway/sse/SSECallback.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.knox.gateway.sse;
+
+import org.apache.http.concurrent.FutureCallback;
+import org.apache.http.nio.protocol.HttpAsyncRequestProducer;
+import org.apache.knox.gateway.SpiGatewayMessages;
+import org.apache.knox.gateway.i18n.messages.MessagesFactory;
+
+import javax.servlet.AsyncContext;
+import javax.servlet.http.HttpServletResponse;
+import java.io.IOException;
+
+/**
+ * FutureCallback handed to the async HTTP client by
+ * SSEDispatch.executeAsyncRequest. On every terminal outcome (completed,
+ * failed, cancelled) it closes the request producer and completes the
+ * servlet AsyncContext; on failure it additionally tries to send a 502 to
+ * the client if the response has not been committed yet.
+ */
+public class SSECallback implements FutureCallback<SSEResponse> {
+
+ protected static final SpiGatewayMessages LOG =
MessagesFactory.get(SpiGatewayMessages.class);
+ protected final AsyncContext asyncContext;
+ private final HttpAsyncRequestProducer producer;
+ protected final HttpServletResponse outboundResponse;
+
+
+ /**
+ * @param outboundResponse response to the gateway client, used to report errors
+ * @param asyncContext servlet async context completed when the exchange ends
+ * @param producer request producer closed when the exchange ends
+ */
+ public SSECallback(HttpServletResponse outboundResponse, AsyncContext
asyncContext, HttpAsyncRequestProducer producer) {
+ this.outboundResponse = outboundResponse;
+ this.asyncContext = asyncContext;
+ this.producer = producer;
+ }
+
+ /** Releases the producer and the async context, then logs that the SSE connection is done. */
+ @Override
+ public void completed(final SSEResponse response) {
+ releaseResources(true);
+ LOG.sseConnectionDone();
+ }
+
+ /**
+ * Sends a 502 "Service connection error" to the client unless the
+ * response is already committed. A failure to send that error is logged
+ * and not propagated. Resources are released and the original failure's
+ * message is logged in all cases.
+ */
+ @Override
+ public void failed(final Exception ex) {
+ try {
+ // sendError is only legal before the response has been committed.
+ if(!outboundResponse.isCommitted()) {
+ outboundResponse.sendError(HttpServletResponse.SC_BAD_GATEWAY,
"Service connection error");
+ }
+ } catch (Exception e) {
+ LOG.failedToSendErrorToClient(e);
+ } finally {
+ releaseResources(true);
+ // NOTE(review): only the message of ex is logged, not its stack trace.
+ LOG.sseConnectionError(ex.getMessage());
+ }
+ }
+
+ /** Releases the producer and the async context, then logs the cancellation. */
+ @Override
+ public void cancelled() {
+ releaseResources(true);
+ LOG.sseConnectionCancelled();
+ }
+
+ /**
+ * Closes the request producer, logging (not propagating) an IOException
+ * from close.
+ *
+ * @param releaseContext when true, the AsyncContext is completed as well,
+ * even if closing the producer failed; presumably subclasses pass false
+ * to keep the context open for a retry - confirm against SSEHaCallback
+ */
+ protected void releaseResources(boolean releaseContext) {
+ try {
+ producer.close();
+ } catch (IOException e) {
+ LOG.sseProducerCloseError(e);
+ } finally {
+ if (releaseContext) {
+ asyncContext.complete();
+ }
+ }
+ }
+}
diff --git
a/gateway-spi/src/main/java/org/apache/knox/gateway/sse/SSEDispatch.java
b/gateway-spi/src/main/java/org/apache/knox/gateway/sse/SSEDispatch.java
index 767cbccef..532503df5 100644
--- a/gateway-spi/src/main/java/org/apache/knox/gateway/sse/SSEDispatch.java
+++ b/gateway-spi/src/main/java/org/apache/knox/gateway/sse/SSEDispatch.java
@@ -25,7 +25,6 @@ import org.apache.http.client.methods.HttpPatch;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.client.methods.HttpUriRequest;
-import org.apache.http.concurrent.FutureCallback;
import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
import org.apache.http.nio.IOControl;
import org.apache.http.nio.client.HttpAsyncClient;
@@ -36,6 +35,7 @@ import org.apache.http.protocol.HttpContext;
import org.apache.knox.gateway.audit.api.Action;
import org.apache.knox.gateway.audit.api.ActionOutcome;
import org.apache.knox.gateway.audit.api.ResourceType;
+import org.apache.knox.gateway.dispatch.AsyncDispatch;
import org.apache.knox.gateway.dispatch.ConfigurableDispatch;
import org.apache.knox.gateway.dispatch.DefaultHttpAsyncClientFactory;
import org.apache.knox.gateway.dispatch.HttpAsyncClientFactory;
@@ -50,9 +50,9 @@ import java.net.URISyntaxException;
import java.nio.CharBuffer;
import java.nio.charset.StandardCharsets;
-public class SSEDispatch extends ConfigurableDispatch {
+public class SSEDispatch extends ConfigurableDispatch implements AsyncDispatch
{
- private final HttpAsyncClient asyncClient;
+ protected final HttpAsyncClient asyncClient;
private static final String TEXT_EVENT_STREAM_VALUE = "text/event-stream";
public SSEDispatch(FilterConfig filterConfig) {
@@ -106,35 +106,16 @@ public class SSEDispatch extends ConfigurableDispatch {
AsyncContext asyncContext = inboundRequest.startAsync();
//No timeout
asyncContext.setTimeout(0L);
+ this.executeAsyncRequest(outboundRequest, outboundResponse,
asyncContext, inboundRequest);
+ }
+ /**
+ * Builds the async producer and the SSE char consumer for the outbound
+ * request, logs and audits the dispatch, and executes the request on
+ * asyncClient with an SSECallback that releases the producer and the
+ * async context when the exchange ends.
+ * This replaces the former private overload and its anonymous
+ * FutureCallback; it is protected, presumably so that subclasses can
+ * supply their own callback - confirm against SSEHaDispatch.
+ */
+ protected void executeAsyncRequest(HttpUriRequest outboundRequest,
HttpServletResponse outboundResponse,
+ AsyncContext asyncContext,
HttpServletRequest inboundRequest) {
HttpAsyncRequestProducer producer =
HttpAsyncMethods.create(outboundRequest);
AsyncCharConsumer<SSEResponse> consumer = new
SSECharConsumer(outboundResponse, outboundRequest.getURI(), asyncContext,
inboundRequest, outboundRequest);
- this.executeAsyncRequest(producer, consumer, outboundRequest);
- }
-
- private void executeAsyncRequest(HttpAsyncRequestProducer producer,
AsyncCharConsumer<SSEResponse> consumer, HttpUriRequest outboundRequest) {
LOG.dispatchRequest(outboundRequest.getMethod(),
outboundRequest.getURI());
auditor.audit(Action.DISPATCH, outboundRequest.getURI().toString(),
ResourceType.URI, ActionOutcome.UNAVAILABLE,
RES.requestMethod(outboundRequest.getMethod()));
- asyncClient.execute(producer, consumer, new
FutureCallback<SSEResponse>() {
-
- @Override
- public void completed(final SSEResponse response) {
- closeProducer(producer);
- LOG.sseConnectionDone();
- }
-
- @Override
- public void failed(final Exception ex) {
- closeProducer(producer);
- LOG.sseConnectionError(ex.getMessage());
- }
-
- @Override
- public void cancelled() {
- closeProducer(producer);
- LOG.sseConnectionCancelled();
- }
- });
+ asyncClient.execute(producer, consumer, new
SSECallback(outboundResponse, asyncContext, producer));
}
private void addAcceptHeader(HttpUriRequest outboundRequest) {
@@ -164,15 +145,11 @@ public class SSEDispatch extends ConfigurableDispatch {
return (statusCode >= HttpStatus.SC_OK && statusCode < 300);
}
- private void closeProducer(HttpAsyncRequestProducer producer) {
- try {
- producer.close();
- } catch (IOException e) {
- LOG.sseProducerCloseError(e);
- }
+ /**
+ * Hook called by SSECharConsumer after the inbound response has been
+ * handled. It is a no-op in this class; presumably HA subclasses
+ * override it to shift the active URL - confirm against SSEHaDispatch.
+ */
+ protected void shiftCallback(HttpUriRequest outboundRequest,
HttpServletRequest inboundRequest) {
+ // No need to shift the URL for non-HA SSE requests
}
- private class SSECharConsumer extends AsyncCharConsumer<SSEResponse> {
+ protected class SSECharConsumer extends AsyncCharConsumer<SSEResponse> {
private SSEResponse sseResponse;
private final HttpServletResponse outboundResponse;
private final HttpUriRequest outboundRequest;
@@ -180,7 +157,7 @@ public class SSEDispatch extends ConfigurableDispatch {
private final URI url;
private final AsyncContext asyncContext;
- SSECharConsumer(HttpServletResponse outboundResponse, URI url,
AsyncContext asyncContext, HttpServletRequest inboundRequest, HttpUriRequest
outboundRequest) {
+ public SSECharConsumer(HttpServletResponse outboundResponse, URI url,
AsyncContext asyncContext, HttpServletRequest inboundRequest, HttpUriRequest
outboundRequest) {
this.outboundResponse = outboundResponse;
this.outboundRequest = outboundRequest;
this.inboundRequest = inboundRequest;
@@ -197,6 +174,7 @@ public class SSEDispatch extends ConfigurableDispatch {
} else {
handleErrorResponse(outboundResponse, url, inboundResponse);
}
+ shiftCallback(outboundRequest, inboundRequest);
}
@Override
@@ -211,11 +189,6 @@ public class SSEDispatch extends ConfigurableDispatch {
}
}
- @Override
- protected void releaseResources() {
- this.asyncContext.complete();
- }
-
@Override
protected SSEResponse buildResult(final HttpContext context) {
return this.sseResponse;
diff --git
a/gateway-spi/src/test/java/org/apache/knox/gateway/sse/SSEDispatchTest.java
b/gateway-spi/src/test/java/org/apache/knox/gateway/sse/SSEDispatchTest.java
index e8ed44ebf..e6fac3cb4 100644
--- a/gateway-spi/src/test/java/org/apache/knox/gateway/sse/SSEDispatchTest.java
+++ b/gateway-spi/src/test/java/org/apache/knox/gateway/sse/SSEDispatchTest.java
@@ -386,6 +386,8 @@ public class SSEDispatchTest {
CountDownLatch latch = new CountDownLatch(1);
SSEDispatch sseDispatch = this.createDispatch();
HttpServletResponse outboundResponse =
EasyMock.createNiceMock(HttpServletResponse.class);
+ outboundResponse.sendError(HttpServletResponse.SC_BAD_GATEWAY,
"Service connection error");
+ EasyMock.expectLastCall().once();
AsyncContext asyncContext = this.getAsyncContext(latch,
outboundResponse);
HttpServletRequest inboundRequest =
this.getHttpServletRequest(asyncContext);
diff --git a/pom.xml b/pom.xml
index 6a1422a2c..fe1d9c136 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1597,6 +1597,16 @@
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpasyncclient</artifactId>
<version>${asynchttpclient.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.httpcomponents</groupId>
+ <artifactId>httpcore</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.httpcomponents</groupId>
+ <artifactId>httpcore-nio</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>