This is an automated email from the ASF dual-hosted git repository.
smolnar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/knox.git
The following commit(s) were added to refs/heads/master by this push:
new df9e7912c KNOX-3115: HA Dispatch for SSE (#1010)
df9e7912c is described below
commit df9e7912c00f6a840e51b7526c035f47fc14e777
Author: hanicz <[email protected]>
AuthorDate: Tue Apr 8 16:18:03 2025 +0200
KNOX-3115: HA Dispatch for SSE (#1010)
---
.../ha/dispatch/ConfigurableHADispatch.java | 240 +++--------
.../knox/gateway/ha/dispatch/LBHaDispatch.java | 243 +++++++++++
.../knox/gateway/ha/dispatch/SSEHaDispatch.java | 152 +++++++
.../gateway/ha/dispatch/SSEHaDispatchTest.java | 460 +++++++++------------
.../org/apache/knox/gateway/sse/SSEDispatch.java | 14 +-
.../org/apache/knox/gateway/sse/SSEEntity.java | 39 +-
.../java/org/apache/knox/gateway/sse/SSEvent.java | 86 ++--
.../apache/knox/gateway/sse/SSEDispatchTest.java | 2 +-
.../org/apache/knox/gateway/sse/SSEEntityTest.java | 25 +-
.../org/apache/knox/gateway/sse/SSEventTest.java | 18 +-
10 files changed, 751 insertions(+), 528 deletions(-)
diff --git
a/gateway-provider-ha/src/main/java/org/apache/knox/gateway/ha/dispatch/ConfigurableHADispatch.java
b/gateway-provider-ha/src/main/java/org/apache/knox/gateway/ha/dispatch/ConfigurableHADispatch.java
index 0f2bb23cb..0cb6b35b0 100644
---
a/gateway-provider-ha/src/main/java/org/apache/knox/gateway/ha/dispatch/ConfigurableHADispatch.java
+++
b/gateway-provider-ha/src/main/java/org/apache/knox/gateway/ha/dispatch/ConfigurableHADispatch.java
@@ -17,14 +17,9 @@
*/
package org.apache.knox.gateway.ha.dispatch;
-import static org.apache.knox.gateway.util.HttpUtils.isConnectionError;
-
-import org.apache.commons.codec.digest.DigestUtils;
-import org.apache.commons.lang3.StringUtils;
import org.apache.http.HttpResponse;
import org.apache.http.client.methods.HttpRequestBase;
import org.apache.http.client.methods.HttpUriRequest;
-import org.apache.http.client.utils.URIBuilder;
import org.apache.knox.gateway.config.Configure;
import org.apache.knox.gateway.config.GatewayConfig;
import org.apache.knox.gateway.dispatch.ConfigurableDispatch;
@@ -41,23 +36,20 @@ import javax.servlet.http.HttpServletRequestWrapper;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.net.URI;
-import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
-import java.util.stream.Collectors;
+
+import static org.apache.knox.gateway.util.HttpUtils.isConnectionError;
/**
* A configurable HA dispatch class that has a very basic failover mechanism
and
* configurable options of ConfigurableDispatch class.
*/
-public class ConfigurableHADispatch extends ConfigurableDispatch {
+public class ConfigurableHADispatch extends ConfigurableDispatch implements
LBHaDispatch {
protected static final String FAILOVER_COUNTER_ATTRIBUTE =
"dispatch.ha.failover.counter";
@@ -69,8 +61,6 @@ public class ConfigurableHADispatch extends
ConfigurableDispatch {
protected HaProvider haProvider;
- private static final Map<String, String> urlToHashLookup = new HashMap<>();
- private static final Map<String, String> hashToUrlLookup = new HashMap<>();
protected static final List<String> nonIdempotentRequests =
Arrays.asList("POST", "PATCH", "CONNECT");
private boolean loadBalancingEnabled =
HaServiceConfigConstants.DEFAULT_LOAD_BALANCING_ENABLED;
@@ -100,40 +90,13 @@ public class ConfigurableHADispatch extends
ConfigurableDispatch {
HaServiceConfig serviceConfig =
haProvider.getHaDescriptor().getServiceConfig(getServiceRole());
maxFailoverAttempts = serviceConfig.getMaxFailoverAttempts();
failoverSleep = serviceConfig.getFailoverSleep();
- loadBalancingEnabled = serviceConfig.isLoadBalancingEnabled();
failoverNonIdempotentRequestEnabled =
serviceConfig.isFailoverNonIdempotentRequestEnabled();
-
- /* enforce dependency */
- stickySessionsEnabled = loadBalancingEnabled &&
serviceConfig.isStickySessionEnabled();
+ initializeLBHaDispatch(serviceConfig);
noFallbackEnabled = stickySessionsEnabled &&
serviceConfig.isNoFallbackEnabled();
- if(stickySessionsEnabled) {
- stickySessionCookieName = serviceConfig.getStickySessionCookieName();
- }
-
-
if(StringUtils.isNotBlank(serviceConfig.getStickySessionDisabledUserAgents())) {
- disableLoadBalancingForUserAgents =
Arrays.asList(serviceConfig.getStickySessionDisabledUserAgents()
- .trim()
- .split("\\s*,\\s*"));
- }
- setupUrlHashLookup();
- }
-
- /* setup the active URL for non-LB case */
- activeURL.set(haProvider.getActiveURL(getServiceRole()));
-
- // Suffix the cookie name by the service to make it unique
- // The cookie path is NOT unique since Knox is stripping the service name.
- stickySessionCookieName = stickySessionCookieName + '-' + getServiceRole();
- }
-
- private void setupUrlHashLookup() {
- for (String url : haProvider.getURLs(getServiceRole())) {
- String urlHash = hash(url);
- urlToHashLookup.put(url, urlHash);
- hashToUrlLookup.put(urlHash, url);
}
}
+ @Override
public HaProvider getHaProvider() {
return haProvider;
}
@@ -144,70 +107,71 @@ public class ConfigurableHADispatch extends
ConfigurableDispatch {
}
@Override
- protected void executeRequestWrapper(HttpUriRequest outboundRequest,
- HttpServletRequest inboundRequest, HttpServletResponse
outboundResponse)
- throws IOException {
+ public boolean isStickySessionEnabled() {
+ return stickySessionsEnabled;
+ }
- final String userAgentFromBrowser =
StringUtils.isBlank(inboundRequest.getHeader("User-Agent")) ? "" :
inboundRequest.getHeader("User-Agent");
+ @Override
+ public void setStickySessionsEnabled(boolean enabled) {
+ this.stickySessionsEnabled = enabled;
+ }
- /* disable loadblancing override */
- boolean userAgentDisabled = false;
+ @Override
+ public String getStickySessionCookieName() {
+ return stickySessionCookieName;
+ }
- /* disable loadbalancing in case a configured user agent is detected to
disable LB */
- if(disableLoadBalancingForUserAgents.stream().anyMatch(c ->
userAgentFromBrowser.contains(c)) ) {
- userAgentDisabled = true;
- LOG.disableHALoadbalancinguserAgent(userAgentFromBrowser,
disableLoadBalancingForUserAgents.toString());
- }
+ @Override
+ public void setStickySessionCookieName(String stickySessionCookieName) {
+ this.stickySessionCookieName = stickySessionCookieName;
+ }
- /* if disable LB is set don't bother setting backend from cookie */
- Optional<URI> backendURI = Optional.empty();
- if(!userAgentDisabled) {
- backendURI = setBackendfromHaCookie(outboundRequest, inboundRequest);
- if(backendURI.isPresent()) {
- ((HttpRequestBase) outboundRequest).setURI(backendURI.get());
- }
- }
+ @Override
+ public boolean isLoadBalancingEnabled() {
+ return loadBalancingEnabled;
+ }
- /**
- * case where loadbalancing is enabled
- * and we have a HTTP request configured not to use LB
- * use the activeURL
- */
- if(loadBalancingEnabled && userAgentDisabled) {
- try {
- ((HttpRequestBase)
outboundRequest).setURI(updateHostURL(outboundRequest.getURI(),
activeURL.get()));
- } catch (final URISyntaxException e) {
- LOG.errorSettingActiveUrl();
- }
- }
+ @Override
+ public void setLoadBalancingEnabled(boolean enabled) {
+ this.loadBalancingEnabled = enabled;
+ }
+
+ @Override
+ public List<String> getDisableLoadBalancingForUserAgents() {
+ return disableLoadBalancingForUserAgents;
+ }
+ @Override
+ public void setDisableLoadBalancingForUserAgents(List<String>
disableLoadBalancingForUserAgents) {
+ this.disableLoadBalancingForUserAgents = disableLoadBalancingForUserAgents;
+ }
+
+ @Override
+ public AtomicReference<String> getActiveURL() {
+ return activeURL;
+ }
+
+ @Override
+ public void setActiveURL(String url) {
+ activeURL.set(url);
+ }
+
+ @Override
+ protected void executeRequestWrapper(HttpUriRequest outboundRequest,
HttpServletRequest inboundRequest, HttpServletResponse outboundResponse) throws
IOException {
+ boolean userAgentDisabled = isUserAgentDisabled(inboundRequest);
+ Optional<URI> backendURI = setBackendUri(outboundRequest,
inboundRequest, userAgentDisabled);
executeRequest(outboundRequest, inboundRequest, outboundResponse);
- /**
- * 1. Load balance when loadbalancing is enabled and there are no
overrides (disableLB)
- * 2. Loadbalance only when sticky session is enabled but cookie not
detected
- * i.e. when loadbalancing is enabled every request that does not
have BACKEND cookie
- * needs to be loadbalanced. If a request has BACKEND coookie and
Loadbalance=on then
- * there should be no loadbalancing.
- */
- if (loadBalancingEnabled && !userAgentDisabled) {
- /* check sticky session enabled */
- if(stickySessionsEnabled) {
- /* loadbalance only when sticky session enabled and no backend url
cookie */
- if(!backendURI.isPresent()) {
- haProvider.makeNextActiveURLAvailable(getServiceRole());
- } else{
- /* sticky session enabled and backend url cookie is valid no need
to loadbalance */
- /* do nothing */
- }
- } else {
- haProvider.makeNextActiveURLAvailable(getServiceRole());
- }
- }
+ shiftActiveURL(userAgentDisabled, backendURI);
}
@Override
protected void outboundResponseWrapper(final HttpUriRequest outboundRequest,
final HttpServletRequest inboundRequest, final HttpServletResponse
outboundResponse) {
- setKnoxHaCookie(outboundRequest, inboundRequest, outboundResponse);
+ GatewayConfig config = (GatewayConfig) inboundRequest
+ .getServletContext()
+ .getAttribute(GatewayConfig.GATEWAY_CONFIG_ATTRIBUTE);
+ boolean sslEnabled = config != null && config.isSSLEnabled();
+
+ setKnoxHaCookie(outboundRequest, inboundRequest, outboundResponse,
sslEnabled);
}
@Override
@@ -235,71 +199,6 @@ public class ConfigurableHADispatch extends
ConfigurableDispatch {
return !failoverNonIdempotentRequestEnabled &&
nonIdempotentRequests.stream().anyMatch(outboundRequest.getMethod()::equalsIgnoreCase);
}
- private Optional<URI> setBackendfromHaCookie(HttpUriRequest outboundRequest,
HttpServletRequest inboundRequest) {
- if (loadBalancingEnabled && stickySessionsEnabled &&
inboundRequest.getCookies() != null) {
- for (Cookie cookie : inboundRequest.getCookies()) {
- if (stickySessionCookieName.equals(cookie.getName())) {
- String backendURLHash = cookie.getValue();
- String backendURL = hashToUrlLookup.get(backendURLHash);
- // Make sure that the url provided is actually a valid
backend url
- if
(haProvider.getURLs(getServiceRole()).contains(backendURL)) {
- try {
- return
Optional.of(updateHostURL(outboundRequest.getURI(), backendURL));
- } catch (URISyntaxException ignore) {
- // The cookie was invalid so we just don't set it.
Knox will pick a backend automatically
- }
- }
- }
- }
- }
- return Optional.empty();
- }
-
- private void setKnoxHaCookie(final HttpUriRequest outboundRequest, final
HttpServletRequest inboundRequest,
- final HttpServletResponse outboundResponse) {
- if (stickySessionsEnabled) {
- List<Cookie> serviceHaCookies = Collections.emptyList();
- if(inboundRequest.getCookies() != null) {
- serviceHaCookies = Arrays
- .stream(inboundRequest.getCookies())
- .filter(cookie ->
stickySessionCookieName.equals(cookie.getName()))
- .collect(Collectors.toList());
- }
- /* if the inbound request has a valid hash then no need to set a
different hash */
- if (serviceHaCookies != null && !serviceHaCookies.isEmpty()
- &&
hashToUrlLookup.containsKey(serviceHaCookies.get(0).getValue())) {
- return;
- } else {
-
- /**
- * Due to concurrency issues haProvider.getActiveURL() will not
return the accurate list
- * This will cause issues where original request goes to host-1
and cookie is set for host-2 - because
- * haProvider.getActiveURL() returned host-2. To prevent this
from happening we need to make sure
- * we set cookie for the endpoint that was served and not rely on
haProvider.getActiveURL().
- * let LBing logic take care of rotating urls.
- **/
- final List<String> urls = haProvider.getURLs(getServiceRole())
- .stream()
- .filter(u ->
u.contains(outboundRequest.getURI().getHost()))
- .collect(Collectors.toList());
-
- final String cookieValue = urlToHashLookup.get(urls.get(0));
-
- Cookie stickySessionCookie = new Cookie(stickySessionCookieName,
cookieValue);
- stickySessionCookie.setPath(inboundRequest.getContextPath());
- stickySessionCookie.setMaxAge(-1);
- stickySessionCookie.setHttpOnly(true);
- GatewayConfig config = (GatewayConfig) inboundRequest
- .getServletContext()
- .getAttribute(GatewayConfig.GATEWAY_CONFIG_ATTRIBUTE);
- if (config != null) {
- stickySessionCookie.setSecure(config.isSSLEnabled());
- }
- outboundResponse.addCookie(stickySessionCookie);
- }
- }
- }
-
protected void failoverRequest(HttpUriRequest outboundRequest,
HttpServletRequest inboundRequest, HttpServletResponse outboundResponse,
HttpResponse inboundResponse, Exception exception) throws IOException {
// Check whether the session cookie is present
Optional<Cookie> sessionCookie = Optional.empty();
@@ -370,10 +269,6 @@ public class ConfigurableHADispatch extends
ConfigurableDispatch {
return counter;
}
- private String hash(String url) {
- return DigestUtils.sha256Hex(url);
- }
-
/**
* Strips out the cookies by the cookie name provided
*/
@@ -403,21 +298,4 @@ public class ConfigurableHADispatch extends
ConfigurableDispatch {
return cookies;
}
}
-
- /**
- * A helper function that updates the schema, host and port
- * of the URI with the provided string URL and returnes a new
- * URI object
- * @param source
- * @param host
- * @return
- */
- private URI updateHostURL(final URI source, final String host) throws
URISyntaxException {
- final URI newUri = new URI(host);
- final URIBuilder uriBuilder = new URIBuilder(source);
- uriBuilder.setScheme(newUri.getScheme());
- uriBuilder.setHost(newUri.getHost());
- uriBuilder.setPort(newUri.getPort());
- return uriBuilder.build();
- }
}
diff --git
a/gateway-provider-ha/src/main/java/org/apache/knox/gateway/ha/dispatch/LBHaDispatch.java
b/gateway-provider-ha/src/main/java/org/apache/knox/gateway/ha/dispatch/LBHaDispatch.java
new file mode 100644
index 000000000..433e9bfe5
--- /dev/null
+++
b/gateway-provider-ha/src/main/java/org/apache/knox/gateway/ha/dispatch/LBHaDispatch.java
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.knox.gateway.ha.dispatch;
+
+import org.apache.commons.codec.digest.DigestUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.http.client.methods.HttpRequestBase;
+import org.apache.http.client.methods.HttpUriRequest;
+import org.apache.http.client.utils.URIBuilder;
+import org.apache.knox.gateway.ha.dispatch.i18n.HaDispatchMessages;
+import org.apache.knox.gateway.ha.provider.HaProvider;
+import org.apache.knox.gateway.ha.provider.HaServiceConfig;
+import org.apache.knox.gateway.i18n.messages.MessagesFactory;
+
+import javax.servlet.http.Cookie;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+
+public interface LBHaDispatch {
+
+ HaDispatchMessages LOG = MessagesFactory.get(HaDispatchMessages.class);
+ Map<String, String> urlToHashLookup = new HashMap<>();
+ Map<String, String> hashToUrlLookup = new HashMap<>();
+
+ boolean isStickySessionEnabled();
+
+ void setStickySessionsEnabled(boolean enabled);
+
+ String getStickySessionCookieName();
+
+ void setStickySessionCookieName(String stickySessionCookieName);
+
+ HaProvider getHaProvider();
+
+ String getServiceRole();
+
+ void setLoadBalancingEnabled(boolean enabled);
+
+ boolean isLoadBalancingEnabled();
+
+ List<String> getDisableLoadBalancingForUserAgents();
+
+ void setDisableLoadBalancingForUserAgents(List<String>
disableLoadBalancingForUserAgents);
+
+ AtomicReference<String> getActiveURL();
+
+ void setActiveURL(String url);
+
+ default void initializeLBHaDispatch(HaServiceConfig serviceConfig) {
+ setLoadBalancingEnabled(serviceConfig.isLoadBalancingEnabled());
+ setStickySessionsEnabled(isLoadBalancingEnabled() &&
serviceConfig.isStickySessionEnabled());
+
+ if (isStickySessionEnabled()) {
+
setStickySessionCookieName(serviceConfig.getStickySessionCookieName());
+ }
+
+ if
(StringUtils.isNotBlank(serviceConfig.getStickySessionDisabledUserAgents())) {
+
setDisableLoadBalancingForUserAgents(Arrays.asList(serviceConfig.getStickySessionDisabledUserAgents()
+ .trim()
+ .split("\\s*,\\s*")));
+ }
+ setupUrlHashLookup();
+
+ /* setup the active URL for non-LB case */
+ setActiveURL(getHaProvider().getActiveURL(getServiceRole()));
+
+ // Suffix the cookie name by the service to make it unique
+ // The cookie path is NOT unique since Knox is stripping the service
name.
+ setStickySessionCookieName(getStickySessionCookieName() + '-' +
getServiceRole());
+ }
+
+ default void setKnoxHaCookie(final HttpUriRequest outboundRequest, final
HttpServletRequest inboundRequest,
+ final HttpServletResponse outboundResponse,
boolean sslEnabled) {
+ if (isStickySessionEnabled()) {
+ List<Cookie> serviceHaCookies = Collections.emptyList();
+ if (inboundRequest.getCookies() != null) {
+ serviceHaCookies = Arrays
+ .stream(inboundRequest.getCookies())
+ .filter(cookie ->
getStickySessionCookieName().equals(cookie.getName()))
+ .collect(Collectors.toList());
+ }
+
+ /* if the inbound request has a valid hash then no need to set a
different hash */
+ if (serviceHaCookies.isEmpty() ||
!hashToUrlLookup.containsKey(serviceHaCookies.get(0).getValue())) {
+ /**
+ * Due to concurrency issues haProvider.getActiveURL() will
not return the accurate list
+ * This will cause issues where original request goes to
host-1 and cookie is set for host-2 - because
+ * haProvider.getActiveURL() returned host-2. To prevent this
from happening we need to make sure
+ * we set cookie for the endpoint that was served and not rely
on haProvider.getActiveURL().
+ * let LBing logic take care of rotating urls.
+ **/
+ final List<String> urls =
getHaProvider().getURLs(getServiceRole())
+ .stream()
+ .filter(u ->
u.contains(outboundRequest.getURI().getHost()))
+ .collect(Collectors.toList());
+ final String cookieValue = urlToHashLookup.get(urls.get(0));
+ Cookie stickySessionCookie = new
Cookie(getStickySessionCookieName(), cookieValue);
+ stickySessionCookie.setPath(inboundRequest.getContextPath());
+ stickySessionCookie.setMaxAge(-1);
+ stickySessionCookie.setHttpOnly(true);
+ stickySessionCookie.setSecure(sslEnabled);
+ outboundResponse.addCookie(stickySessionCookie);
+ }
+ }
+ }
+
+ default Optional<URI> setBackendFromHaCookie(HttpUriRequest
outboundRequest, HttpServletRequest inboundRequest) {
+ if (isLoadBalancingEnabled() && isStickySessionEnabled() &&
inboundRequest.getCookies() != null) {
+ for (Cookie cookie : inboundRequest.getCookies()) {
+ if (getStickySessionCookieName().equals(cookie.getName())) {
+ String backendURLHash = cookie.getValue();
+ String backendURL = hashToUrlLookup.get(backendURLHash);
+ // Make sure that the url provided is actually a valid
backend url
+ if
(getHaProvider().getURLs(getServiceRole()).contains(backendURL)) {
+ try {
+ return
Optional.of(updateHostURL(outboundRequest.getURI(), backendURL));
+ } catch (URISyntaxException ignore) {
+ // The cookie was invalid so we just don't set it.
Knox will pick a backend automatically
+ }
+ }
+ }
+ }
+ }
+ return Optional.empty();
+ }
+
+ default String hash(String url) {
+ return DigestUtils.sha256Hex(url);
+ }
+
+ /**
+ * A helper function that updates the scheme, host and port
+ * of the URI with the provided string URL and returns a new
+ * URI object
+ *
+ * @param source
+ * @param host
+ * @return
+ */
+ default URI updateHostURL(final URI source, final String host) throws
URISyntaxException {
+ final URI newUri = new URI(host);
+ final URIBuilder uriBuilder = new URIBuilder(source);
+ uriBuilder.setScheme(newUri.getScheme());
+ uriBuilder.setHost(newUri.getHost());
+ uriBuilder.setPort(newUri.getPort());
+ return uriBuilder.build();
+ }
+
+ default void setupUrlHashLookup() {
+ for (String url : getHaProvider().getURLs(getServiceRole())) {
+ String urlHash = hash(url);
+ urlToHashLookup.put(url, urlHash);
+ hashToUrlLookup.put(urlHash, url);
+ }
+ }
+
+ default boolean isUserAgentDisabled(HttpServletRequest inboundRequest) {
+ final String userAgentFromBrowser =
StringUtils.isBlank(inboundRequest.getHeader("User-Agent")) ? "" :
inboundRequest.getHeader("User-Agent");
+ /* disable load balancing override */
+ boolean userAgentDisabled = false;
+
+ /* disable loadbalancing in case a configured user agent is detected
to disable LB */
+ if
(getDisableLoadBalancingForUserAgents().stream().anyMatch(userAgentFromBrowser::contains))
{
+ userAgentDisabled = true;
+ LOG.disableHALoadbalancinguserAgent(userAgentFromBrowser,
getDisableLoadBalancingForUserAgents().toString());
+ }
+
+ return userAgentDisabled;
+ }
+
+ default Optional<URI> setBackendUri(HttpUriRequest outboundRequest,
HttpServletRequest inboundRequest, boolean userAgentDisabled) {
+ /* if disable LB is set don't bother setting backend from cookie */
+ Optional<URI> backendURI = Optional.empty();
+ if (!userAgentDisabled) {
+ backendURI = setBackendFromHaCookie(outboundRequest,
inboundRequest);
+ backendURI.ifPresent(uri -> ((HttpRequestBase)
outboundRequest).setURI(uri));
+ }
+
+ /**
+ * case where loadbalancing is enabled
+ * and we have a HTTP request configured not to use LB
+ * use the activeURL
+ */
+ if (isLoadBalancingEnabled() && userAgentDisabled) {
+ try {
+ ((HttpRequestBase)
outboundRequest).setURI(updateHostURL(outboundRequest.getURI(),
getActiveURL().get()));
+ } catch (final URISyntaxException e) {
+ LOG.errorSettingActiveUrl();
+ }
+ }
+
+ return backendURI;
+ }
+
+ default void shiftActiveURL(boolean userAgentDisabled, Optional<URI>
backendURI) {
+ /**
+ * 1. Load balance when loadbalancing is enabled and there are no
overrides (disableLB)
+ * 2. Loadbalance only when sticky session is enabled but cookie not
detected
+ * i.e. when loadbalancing is enabled every request that does not
have BACKEND cookie
+ * needs to be loadbalanced. If a request has BACKEND cookie and
Loadbalance=on then
+ * there should be no loadbalancing.
+ */
+ if (isLoadBalancingEnabled() && !userAgentDisabled) {
+ /* check sticky session enabled */
+ if (isStickySessionEnabled()) {
+ /* loadbalance only when sticky session enabled and no backend
url cookie */
+ if (!backendURI.isPresent()) {
+
getHaProvider().makeNextActiveURLAvailable(getServiceRole());
+ } else {
+ /* sticky session enabled and backend url cookie is valid
no need to loadbalance */
+ /* do nothing */
+ }
+ } else {
+ getHaProvider().makeNextActiveURLAvailable(getServiceRole());
+ }
+ }
+ }
+}
diff --git
a/gateway-provider-ha/src/main/java/org/apache/knox/gateway/ha/dispatch/SSEHaDispatch.java
b/gateway-provider-ha/src/main/java/org/apache/knox/gateway/ha/dispatch/SSEHaDispatch.java
new file mode 100644
index 000000000..50582c2b9
--- /dev/null
+++
b/gateway-provider-ha/src/main/java/org/apache/knox/gateway/ha/dispatch/SSEHaDispatch.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.knox.gateway.ha.dispatch;
+
+import org.apache.http.client.methods.HttpUriRequest;
+import org.apache.knox.gateway.config.Configure;
+import org.apache.knox.gateway.config.GatewayConfig;
+import org.apache.knox.gateway.ha.dispatch.i18n.HaDispatchMessages;
+import org.apache.knox.gateway.ha.provider.HaProvider;
+import org.apache.knox.gateway.ha.provider.HaServiceConfig;
+import org.apache.knox.gateway.ha.provider.impl.HaServiceConfigConstants;
+import org.apache.knox.gateway.i18n.messages.MessagesFactory;
+import org.apache.knox.gateway.sse.SSEDispatch;
+
+import javax.servlet.FilterConfig;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import java.net.URI;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicReference;
+
+public class SSEHaDispatch extends SSEDispatch implements LBHaDispatch {
+
+ protected static final HaDispatchMessages LOG =
MessagesFactory.get(HaDispatchMessages.class);
+ protected HaProvider haProvider;
+ private boolean loadBalancingEnabled =
HaServiceConfigConstants.DEFAULT_LOAD_BALANCING_ENABLED;
+ private boolean stickySessionsEnabled =
HaServiceConfigConstants.DEFAULT_STICKY_SESSIONS_ENABLED;
+ private String stickySessionCookieName =
HaServiceConfigConstants.DEFAULT_STICKY_SESSION_COOKIE_NAME;
+ private List<String> disableLoadBalancingForUserAgents =
Collections.singletonList(HaServiceConfigConstants.DEFAULT_DISABLE_LB_USER_AGENTS);
+ private final boolean sslEnabled;
+
+ /**
+ * This activeURL is used to track urls when LB is turned off for some
clients.
+ * The problem we have with selectively turning off LB is that other
clients
+ * that use LB can change the state from under the current session where
LB is
+ * turned off.
+ * e.g.
+ * ODBC Connection established where LB is off. JDBC connection is
established
+ * next where LB is enabled. This changes the active URL under the
existing ODBC
+ * connection which will be an issue.
+ * This variable keeps track of non-LB'ed url and updated upon failover.
+ */
+ private final AtomicReference<String> activeURL = new AtomicReference<>();
+
+ public SSEHaDispatch(FilterConfig filterConfig) {
+ super(filterConfig);
+
+ GatewayConfig gatewayConfig = (GatewayConfig)
filterConfig.getServletContext().getAttribute(GatewayConfig.GATEWAY_CONFIG_ATTRIBUTE);
+ sslEnabled = gatewayConfig.isSSLEnabled();
+ }
+
+ @Override
+ public void init() {
+ super.init();
+ LOG.initializingForResourceRole(getServiceRole());
+ if (haProvider != null) {
+ HaServiceConfig serviceConfig =
haProvider.getHaDescriptor().getServiceConfig(getServiceRole());
+ this.initializeLBHaDispatch(serviceConfig);
+ }
+ }
+
+ @Override
+ public HaProvider getHaProvider() {
+ return haProvider;
+ }
+
+ @Override
+ public void setLoadBalancingEnabled(boolean enabled) {
+ this.loadBalancingEnabled = enabled;
+ }
+
+ @Configure
+ public void setHaProvider(HaProvider haProvider) {
+ this.haProvider = haProvider;
+ }
+
+ @Override
+ public boolean isStickySessionEnabled() {
+ return stickySessionsEnabled;
+ }
+
+ @Override
+ public void setStickySessionsEnabled(boolean enabled) {
+ this.stickySessionsEnabled = enabled;
+ }
+
+ @Override
+ public String getStickySessionCookieName() {
+ return stickySessionCookieName;
+ }
+
+ @Override
+ public void setStickySessionCookieName(String stickySessionCookieName) {
+ this.stickySessionCookieName = stickySessionCookieName;
+ }
+
+ @Override
+ public boolean isLoadBalancingEnabled() {
+ return loadBalancingEnabled;
+ }
+
+ @Override
+ public List<String> getDisableLoadBalancingForUserAgents() {
+ return disableLoadBalancingForUserAgents;
+ }
+
+ @Override
+ public void setDisableLoadBalancingForUserAgents(List<String>
disableLoadBalancingForUserAgents) {
+ this.disableLoadBalancingForUserAgents =
disableLoadBalancingForUserAgents;
+ }
+
+ @Override
+ public AtomicReference<String> getActiveURL() {
+ return activeURL;
+ }
+
+ @Override
+ public void setActiveURL(String url) {
+ activeURL.set(url);
+ }
+
+
+ @Override
+ protected void executeRequestWrapper(HttpUriRequest outboundRequest,
HttpServletRequest inboundRequest, HttpServletResponse outboundResponse) {
+ boolean userAgentDisabled = isUserAgentDisabled(inboundRequest);
+ Optional<URI> backendURI = setBackendUri(outboundRequest,
inboundRequest, userAgentDisabled);
+ executeRequest(outboundRequest, inboundRequest, outboundResponse);
+ shiftActiveURL(userAgentDisabled, backendURI);
+ }
+
+ @Override
+ protected void outboundResponseWrapper(final HttpUriRequest
outboundRequest, final HttpServletRequest inboundRequest, final
HttpServletResponse outboundResponse) {
+ setKnoxHaCookie(outboundRequest, inboundRequest, outboundResponse,
sslEnabled);
+ }
+}
diff --git
a/gateway-spi/src/test/java/org/apache/knox/gateway/sse/SSEDispatchTest.java
b/gateway-provider-ha/src/test/java/org/apache/knox/gateway/ha/dispatch/SSEHaDispatchTest.java
similarity index 55%
copy from
gateway-spi/src/test/java/org/apache/knox/gateway/sse/SSEDispatchTest.java
copy to
gateway-provider-ha/src/test/java/org/apache/knox/gateway/ha/dispatch/SSEHaDispatchTest.java
index 66abcb791..77c34f559 100644
--- a/gateway-spi/src/test/java/org/apache/knox/gateway/sse/SSEDispatchTest.java
+++
b/gateway-provider-ha/src/test/java/org/apache/knox/gateway/ha/dispatch/SSEHaDispatchTest.java
@@ -6,34 +6,40 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.knox.gateway.sse;
+package org.apache.knox.gateway.ha.dispatch;
import org.apache.http.HttpStatus;
import org.apache.http.client.config.CookieSpecs;
-import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
import org.apache.knox.gateway.config.GatewayConfig;
+import org.apache.knox.gateway.ha.provider.HaDescriptor;
+import org.apache.knox.gateway.ha.provider.HaProvider;
+import org.apache.knox.gateway.ha.provider.impl.DefaultHaProvider;
+import org.apache.knox.gateway.ha.provider.impl.HaDescriptorFactory;
import org.apache.knox.gateway.services.GatewayServices;
import org.apache.knox.gateway.services.ServiceType;
import org.apache.knox.gateway.services.security.KeystoreService;
import org.apache.knox.test.mock.MockServer;
import org.apache.knox.test.mock.MockServletContext;
import org.apache.knox.test.mock.MockServletInputStream;
+import org.easymock.Capture;
import org.easymock.EasyMock;
+import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import javax.servlet.AsyncContext;
import javax.servlet.FilterConfig;
import javax.servlet.ServletContext;
+import javax.servlet.http.Cookie;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.ByteArrayInputStream;
@@ -41,20 +47,21 @@ import java.io.InputStream;
import java.io.PrintWriter;
import java.net.URI;
import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import static org.easymock.EasyMock.capture;
import static org.easymock.EasyMock.createMock;
import static org.easymock.EasyMock.expect;
import static org.easymock.EasyMock.replay;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
-public class SSEDispatchTest {
+public class SSEHaDispatchTest {
private static MockServer MOCK_SSE_SERVER;
private static URI URL;
@@ -66,30 +73,76 @@ public class SSEDispatchTest {
}
@Test
- public void testCreateAndDestroyClient() throws Exception {
- SSEDispatch sseDispatch = this.createDispatch();
- assertNotNull(sseDispatch.getAsyncClient());
+ public void testHADispatchURL() throws Exception {
+ String serviceName = "SSE";
+ HaDescriptor descriptor = HaDescriptorFactory.createDescriptor();
+
descriptor.addServiceConfig(HaDescriptorFactory.createServiceConfig(serviceName,
"true", null, null, null, null, "true", "true", null, null, null, null));
+ HaProvider provider = new DefaultHaProvider(descriptor);
+ URI uri1 = new URI("http://host1.valid");
+ URI uri2 = new URI("http://host2.valid");
+ URI uri3 = new URI("http://host3.valid");
+ ArrayList<String> urlList = new ArrayList<>();
+ urlList.add(uri1.toString());
+ urlList.add(uri2.toString());
+ urlList.add(uri3.toString());
+ provider.addHaService(serviceName, urlList);
- sseDispatch.destroy();
- assertFalse(((CloseableHttpAsyncClient)
sseDispatch.getAsyncClient()).isRunning());
+ KeystoreService keystoreService = createMock(KeystoreService.class);
+
expect(keystoreService.getTruststoreForHttpClient()).andReturn(null).once();
+
+ GatewayConfig gatewayConfig = createMock(GatewayConfig.class);
+ expect(gatewayConfig.isMetricsEnabled()).andReturn(false).once();
+ expect(gatewayConfig.isSSLEnabled()).andReturn(false).once();
+
expect(gatewayConfig.getHttpClientMaxConnections()).andReturn(32).once();
+
expect(gatewayConfig.getHttpClientConnectionTimeout()).andReturn(20000).once();
+
expect(gatewayConfig.getHttpClientSocketTimeout()).andReturn(20000).once();
+
expect(gatewayConfig.getHttpClientCookieSpec()).andReturn(CookieSpecs.STANDARD).anyTimes();
+
+ GatewayServices gatewayServices = createMock(GatewayServices.class);
+
expect(gatewayServices.getService(ServiceType.KEYSTORE_SERVICE)).andReturn(keystoreService).once();
+
+ ServletContext servletContext = createMock(ServletContext.class);
+
expect(servletContext.getAttribute(GatewayConfig.GATEWAY_CONFIG_ATTRIBUTE)).andReturn(gatewayConfig).atLeastOnce();
+
expect(servletContext.getAttribute(GatewayServices.GATEWAY_SERVICES_ATTRIBUTE)).andReturn(gatewayServices).atLeastOnce();
+
+ FilterConfig filterConfig = createMock(FilterConfig.class);
+
expect(filterConfig.getServletContext()).andReturn(servletContext).atLeastOnce();
+
expect(filterConfig.getInitParameter("useTwoWaySsl")).andReturn("false").once();
+
expect(filterConfig.getInitParameter("httpclient.maxConnections")).andReturn(null).once();
+
expect(filterConfig.getInitParameter("httpclient.connectionTimeout")).andReturn(null).once();
+
expect(filterConfig.getInitParameter("httpclient.socketTimeout")).andReturn(null).once();
+
expect(filterConfig.getInitParameter("serviceRole")).andReturn(null).once();
+
expect(filterConfig.getInitParameter("httpclient.cookieSpec")).andReturn(null).anyTimes();
+
+ HttpServletRequest inboundRequest =
EasyMock.createNiceMock(HttpServletRequest.class);
+ EasyMock.expect(inboundRequest.getRequestURL()).andReturn(new
StringBuffer(provider.getActiveURL(serviceName))).anyTimes();
+ EasyMock.replay(inboundRequest, gatewayConfig, filterConfig,
servletContext, keystoreService, gatewayServices);
+
+ SSEHaDispatch dispatch = new SSEHaDispatch(filterConfig);
+ dispatch.setHaProvider(provider);
+ dispatch.setServiceRole(serviceName);
+ dispatch.init();
+
+ /* make sure the dispatch URL is always active URL */
+ Assert.assertEquals(provider.getActiveURL(serviceName),
dispatch.getDispatchUrl(inboundRequest).toString());
}
@Test
- public void testGet2xx() throws Exception {
+ public void testStickySessionCookie() throws Exception {
CountDownLatch latch = new CountDownLatch(1);
- SSEDispatch sseDispatch = this.createDispatch();
+ SSEHaDispatch sseHaDispatch = this.createDispatch(null);
PrintWriter printWriter = EasyMock.createNiceMock(PrintWriter.class);
HttpServletResponse outboundResponse =
this.getServletResponse(HttpStatus.SC_OK);
AsyncContext asyncContext = this.getAsyncContext(latch,
outboundResponse);
HttpServletRequest inboundRequest =
this.getHttpServletRequest(asyncContext);
+ Capture<Cookie> capturedArgument = Capture.newInstance();
- this.expectResponseBodyAndHeader(printWriter, outboundResponse);
+ this.expectResponseBodyAndHeader(printWriter, outboundResponse,
capturedArgument);
replay(inboundRequest, asyncContext, outboundResponse, printWriter);
MOCK_SSE_SERVER.expect()
.method("GET")
.pathInfo("/sse")
- .header("request", "header")
.header("Accept", "text/event-stream")
.respond()
.status(HttpStatus.SC_OK)
@@ -97,322 +150,214 @@ public class SSEDispatchTest {
.header("response", "header")
.contentType("text/event-stream");
- sseDispatch.doGet(URL, inboundRequest, outboundResponse);
+ sseHaDispatch.doGet(URL, inboundRequest, outboundResponse);
latch.await(1L, TimeUnit.SECONDS);
EasyMock.verify(asyncContext, outboundResponse, inboundRequest,
printWriter);
assertTrue(MOCK_SSE_SERVER.isEmpty());
+ assertEquals("KNOX_BACKEND-SSE",
capturedArgument.getValue().getName());
}
@Test
- public void testGet4xx() throws Exception {
- CountDownLatch latch = new CountDownLatch(1);
- SSEDispatch sseDispatch = this.createDispatch();
- HttpServletResponse outboundResponse =
this.getServletResponse(HttpStatus.SC_BAD_REQUEST);
- AsyncContext asyncContext = this.getAsyncContext(latch,
outboundResponse);
- HttpServletRequest inboundRequest =
this.getHttpServletRequest(asyncContext);
-
- replay(inboundRequest, asyncContext, outboundResponse);
-
- MOCK_SSE_SERVER.expect()
- .method("GET")
- .pathInfo("/sse")
- .respond()
- .status(HttpStatus.SC_BAD_REQUEST);
-
- sseDispatch.doGet(URL, inboundRequest, outboundResponse);
-
- latch.await(1L, TimeUnit.SECONDS);
- EasyMock.verify(asyncContext, outboundResponse, inboundRequest);
- assertTrue(MOCK_SSE_SERVER.isEmpty());
- }
-
- @Test
- public void testGet5xx() throws Exception {
+ public void testNamedStickySessionCookie() throws Exception {
CountDownLatch latch = new CountDownLatch(1);
- SSEDispatch sseDispatch = this.createDispatch();
- HttpServletResponse outboundResponse =
this.getServletResponse(HttpStatus.SC_INTERNAL_SERVER_ERROR);
+ SSEHaDispatch sseHaDispatch = this.createDispatch("COOKIE_NAME");
+ PrintWriter printWriter = EasyMock.createNiceMock(PrintWriter.class);
+ HttpServletResponse outboundResponse =
this.getServletResponse(HttpStatus.SC_OK);
AsyncContext asyncContext = this.getAsyncContext(latch,
outboundResponse);
HttpServletRequest inboundRequest =
this.getHttpServletRequest(asyncContext);
+ Capture<Cookie> capturedArgument = Capture.newInstance();
- replay(inboundRequest, asyncContext, outboundResponse);
+ this.expectResponseBodyAndHeader(printWriter, outboundResponse,
capturedArgument);
+ replay(inboundRequest, asyncContext, outboundResponse, printWriter);
MOCK_SSE_SERVER.expect()
.method("GET")
.pathInfo("/sse")
+ .header("Accept", "text/event-stream")
.respond()
- .status(HttpStatus.SC_INTERNAL_SERVER_ERROR);
+ .status(HttpStatus.SC_OK)
+
.content("id:1\ndata:data1\nevent:event1\n\ndata:data2\nevent:event2\nid:2\nretry:1\n:testing\n\n",
StandardCharsets.UTF_8)
+ .header("response", "header")
+ .contentType("text/event-stream");
- sseDispatch.doGet(URL, inboundRequest, outboundResponse);
+ sseHaDispatch.doGet(URL, inboundRequest, outboundResponse);
latch.await(1L, TimeUnit.SECONDS);
- EasyMock.verify(asyncContext, outboundResponse, inboundRequest);
+ EasyMock.verify(asyncContext, outboundResponse, inboundRequest,
printWriter);
assertTrue(MOCK_SSE_SERVER.isEmpty());
+ assertEquals("COOKIE_NAME-SSE", capturedArgument.getValue().getName());
}
@Test
- public void testPost2xx() throws Exception {
+ public void testLoadBalancingWithoutStickySessionCookie() throws Exception
{
CountDownLatch latch = new CountDownLatch(1);
- SSEDispatch sseDispatch = this.createDispatch();
+ SSEHaDispatch sseHaDispatch = this.createDispatch(null);
PrintWriter printWriter = EasyMock.createNiceMock(PrintWriter.class);
HttpServletResponse outboundResponse =
this.getServletResponse(HttpStatus.SC_OK);
AsyncContext asyncContext = this.getAsyncContext(latch,
outboundResponse);
HttpServletRequest inboundRequest =
this.getHttpServletRequest(asyncContext);
-
- this.expectResponseBodyAndHeader(printWriter, outboundResponse);
+ Capture<Cookie> capturedArgument = Capture.newInstance();
+
expect(inboundRequest.getHeader("User-Agent")).andReturn("unknown").anyTimes();
+ this.expectResponseBodyAndHeader(printWriter, outboundResponse,
capturedArgument);
replay(inboundRequest, asyncContext, outboundResponse, printWriter);
MOCK_SSE_SERVER.expect()
- .method("POST")
+ .method("GET")
.pathInfo("/sse")
- .header("request", "header")
.header("Accept", "text/event-stream")
- .content("{\"request\":\"body\"}", StandardCharsets.UTF_8)
.respond()
.status(HttpStatus.SC_OK)
.content("id:1\ndata:data1\nevent:event1\n\ndata:data2\nevent:event2\nid:2\nretry:1\n:testing\n\n",
StandardCharsets.UTF_8)
.header("response", "header")
.contentType("text/event-stream");
- sseDispatch.doPost(URL, inboundRequest, outboundResponse);
+ sseHaDispatch.doGet(URL, inboundRequest, outboundResponse);
latch.await(1L, TimeUnit.SECONDS);
EasyMock.verify(asyncContext, outboundResponse, inboundRequest,
printWriter);
assertTrue(MOCK_SSE_SERVER.isEmpty());
+ assertEquals("http://host2:3333/sse",
sseHaDispatch.getHaProvider().getActiveURL("SSE"));
}
@Test
- public void testPost4xx() throws Exception {
+ public void testLoadBalancingDisabledWithUserAgent() throws Exception {
CountDownLatch latch = new CountDownLatch(1);
- SSEDispatch sseDispatch = this.createDispatch();
- HttpServletResponse outboundResponse =
this.getServletResponse(HttpStatus.SC_NOT_FOUND);
- AsyncContext asyncContext = this.getAsyncContext(latch,
outboundResponse);
- HttpServletRequest inboundRequest =
this.getHttpServletRequest(asyncContext);
-
- replay(inboundRequest, asyncContext, outboundResponse);
-
- MOCK_SSE_SERVER.expect()
- .method("POST")
- .pathInfo("/sse")
- .respond()
- .status(HttpStatus.SC_NOT_FOUND);
-
- sseDispatch.doPost(URL, inboundRequest, outboundResponse);
-
- latch.await(1L, TimeUnit.SECONDS);
- EasyMock.verify(asyncContext, outboundResponse, inboundRequest);
- assertTrue(MOCK_SSE_SERVER.isEmpty());
- }
-
- @Test
- public void testPost5xx() throws Exception {
- CountDownLatch latch = new CountDownLatch(1);
- SSEDispatch sseDispatch = this.createDispatch();
- HttpServletResponse outboundResponse =
this.getServletResponse(HttpStatus.SC_INTERNAL_SERVER_ERROR);
- AsyncContext asyncContext = this.getAsyncContext(latch,
outboundResponse);
- HttpServletRequest inboundRequest =
this.getHttpServletRequest(asyncContext);
-
- replay(inboundRequest, asyncContext, outboundResponse);
-
- MOCK_SSE_SERVER.expect()
- .method("POST")
- .pathInfo("/sse")
- .respond()
- .status(HttpStatus.SC_INTERNAL_SERVER_ERROR);
-
- sseDispatch.doPost(URL, inboundRequest, outboundResponse);
-
- latch.await(1L, TimeUnit.SECONDS);
- EasyMock.verify(asyncContext, outboundResponse, inboundRequest);
- assertTrue(MOCK_SSE_SERVER.isEmpty());
- }
-
- @Test
- public void testPut2xx() throws Exception {
- CountDownLatch latch = new CountDownLatch(1);
- SSEDispatch sseDispatch = this.createDispatch();
+ SSEHaDispatch sseHaDispatch = this.createDispatch(null);
PrintWriter printWriter = EasyMock.createNiceMock(PrintWriter.class);
HttpServletResponse outboundResponse =
this.getServletResponse(HttpStatus.SC_OK);
AsyncContext asyncContext = this.getAsyncContext(latch,
outboundResponse);
HttpServletRequest inboundRequest =
this.getHttpServletRequest(asyncContext);
+
expect(inboundRequest.getHeader("User-Agent")).andReturn("user1").anyTimes();
+ Capture<Cookie> capturedArgument = Capture.newInstance();
- this.expectResponseBodyAndHeader(printWriter, outboundResponse);
+ this.expectResponseBodyAndHeader(printWriter, outboundResponse,
capturedArgument);
replay(inboundRequest, asyncContext, outboundResponse, printWriter);
MOCK_SSE_SERVER.expect()
- .method("PUT")
+ .method("GET")
.pathInfo("/sse")
- .header("request", "header")
.header("Accept", "text/event-stream")
- .content("{\"request\":\"body\"}", StandardCharsets.UTF_8)
.respond()
.status(HttpStatus.SC_OK)
.content("id:1\ndata:data1\nevent:event1\n\ndata:data2\nevent:event2\nid:2\nretry:1\n:testing\n\n",
StandardCharsets.UTF_8)
.header("response", "header")
.contentType("text/event-stream");
- sseDispatch.doPut(URL, inboundRequest, outboundResponse);
+ sseHaDispatch.doGet(URL, inboundRequest, outboundResponse);
latch.await(1L, TimeUnit.SECONDS);
EasyMock.verify(asyncContext, outboundResponse, inboundRequest,
printWriter);
assertTrue(MOCK_SSE_SERVER.isEmpty());
+ assertEquals("http://localhost:" + MOCK_SSE_SERVER.getPort() + "/sse",
sseHaDispatch.getHaProvider().getActiveURL("SSE"));
}
@Test
- public void testPut4xx() throws Exception {
+ public void testLoadBalancingDisabledWithStickySession() throws Exception {
CountDownLatch latch = new CountDownLatch(1);
- SSEDispatch sseDispatch = this.createDispatch();
- HttpServletResponse outboundResponse =
this.getServletResponse(HttpStatus.SC_NOT_FOUND);
- AsyncContext asyncContext = this.getAsyncContext(latch,
outboundResponse);
- HttpServletRequest inboundRequest =
this.getHttpServletRequest(asyncContext);
-
- replay(inboundRequest, asyncContext, outboundResponse);
-
- MOCK_SSE_SERVER.expect()
- .method("PUT")
- .pathInfo("/sse")
- .respond()
- .status(HttpStatus.SC_NOT_FOUND);
-
- sseDispatch.doPut(URL, inboundRequest, outboundResponse);
-
- latch.await(1L, TimeUnit.SECONDS);
- EasyMock.verify(asyncContext, outboundResponse, inboundRequest);
- assertTrue(MOCK_SSE_SERVER.isEmpty());
- }
-
- @Test
- public void testPut5xx() throws Exception {
- CountDownLatch latch = new CountDownLatch(1);
- SSEDispatch sseDispatch = this.createDispatch();
- HttpServletResponse outboundResponse =
this.getServletResponse(HttpStatus.SC_INTERNAL_SERVER_ERROR);
- AsyncContext asyncContext = this.getAsyncContext(latch,
outboundResponse);
- HttpServletRequest inboundRequest =
this.getHttpServletRequest(asyncContext);
-
- replay(inboundRequest, asyncContext, outboundResponse);
-
- MOCK_SSE_SERVER.expect()
- .method("PUT")
- .pathInfo("/sse")
- .respond()
- .status(HttpStatus.SC_INTERNAL_SERVER_ERROR);
-
- sseDispatch.doPut(URL, inboundRequest, outboundResponse);
-
- latch.await(1L, TimeUnit.SECONDS);
- EasyMock.verify(asyncContext, outboundResponse, inboundRequest);
- assertTrue(MOCK_SSE_SERVER.isEmpty());
- }
-
- @Test
- public void testPatch2xx() throws Exception {
- CountDownLatch latch = new CountDownLatch(1);
- SSEDispatch sseDispatch = this.createDispatch();
+ SSEHaDispatch sseHaDispatch = this.createDispatch(null);
PrintWriter printWriter = EasyMock.createNiceMock(PrintWriter.class);
HttpServletResponse outboundResponse =
this.getServletResponse(HttpStatus.SC_OK);
AsyncContext asyncContext = this.getAsyncContext(latch,
outboundResponse);
HttpServletRequest inboundRequest =
this.getHttpServletRequest(asyncContext);
+ Capture<Cookie> capturedArgument = Capture.newInstance();
- this.expectResponseBodyAndHeader(printWriter, outboundResponse);
+ this.expectResponseBodyAndHeader(printWriter, outboundResponse,
capturedArgument);
replay(inboundRequest, asyncContext, outboundResponse, printWriter);
MOCK_SSE_SERVER.expect()
- .method("PATCH")
+ .method("GET")
.pathInfo("/sse")
- .header("request", "header")
.header("Accept", "text/event-stream")
- .content("{\"request\":\"body\"}", StandardCharsets.UTF_8)
.respond()
.status(HttpStatus.SC_OK)
.content("id:1\ndata:data1\nevent:event1\n\ndata:data2\nevent:event2\nid:2\nretry:1\n:testing\n\n",
StandardCharsets.UTF_8)
.header("response", "header")
.contentType("text/event-stream");
- sseDispatch.doPatch(URL, inboundRequest, outboundResponse);
-
+ sseHaDispatch.doGet(URL, inboundRequest, outboundResponse);
latch.await(1L, TimeUnit.SECONDS);
- EasyMock.verify(asyncContext, outboundResponse, inboundRequest,
printWriter);
- assertTrue(MOCK_SSE_SERVER.isEmpty());
- }
-
- @Test
- public void testPatch4xx() throws Exception {
- CountDownLatch latch = new CountDownLatch(1);
- SSEDispatch sseDispatch = this.createDispatch();
- HttpServletResponse outboundResponse =
this.getServletResponse(HttpStatus.SC_NOT_FOUND);
- AsyncContext asyncContext = this.getAsyncContext(latch,
outboundResponse);
- HttpServletRequest inboundRequest =
this.getHttpServletRequest(asyncContext);
-
- replay(inboundRequest, asyncContext, outboundResponse);
+ assertEquals("http://host2:3333/sse",
sseHaDispatch.getHaProvider().getActiveURL("SSE"));
+
+
+ //Second request, sticky session cookie included
+ CountDownLatch latch2 = new CountDownLatch(1);
+ Cookie[] cookies = new Cookie[1];
+ cookies[0] = new Cookie(capturedArgument.getValue().getName(),
capturedArgument.getValue().getValue());
+ PrintWriter printWriter2 = EasyMock.createNiceMock(PrintWriter.class);
+ HttpServletResponse outboundResponse2 =
this.getServletResponse(HttpStatus.SC_OK);
+ AsyncContext asyncContext2 = this.getAsyncContext(latch2,
outboundResponse2);
+ Capture<Cookie> capturedArgument2 = Capture.newInstance();
+ this.expectResponseBodyAndHeader(printWriter2, outboundResponse2,
capturedArgument2);
+ HttpServletRequest inboundRequest2 =
this.getHttpServletRequest(asyncContext2);
+ expect(inboundRequest2.getCookies()).andReturn(cookies).anyTimes();
+ replay(inboundRequest2, asyncContext2, outboundResponse2,
printWriter2);
MOCK_SSE_SERVER.expect()
- .method("PATCH")
+ .method("GET")
.pathInfo("/sse")
+ .header("Accept", "text/event-stream")
.respond()
- .status(HttpStatus.SC_NOT_FOUND);
+ .status(HttpStatus.SC_OK)
+
.content("id:1\ndata:data1\nevent:event1\n\ndata:data2\nevent:event2\nid:2\nretry:1\n:testing\n\n",
StandardCharsets.UTF_8)
+ .header("response", "header")
+ .contentType("text/event-stream");
- sseDispatch.doPatch(URL, inboundRequest, outboundResponse);
+ sseHaDispatch.doGet(URL, inboundRequest2, outboundResponse2);
+ latch2.await(1L, TimeUnit.SECONDS);
- latch.await(1L, TimeUnit.SECONDS);
- EasyMock.verify(asyncContext, outboundResponse, inboundRequest);
+ EasyMock.verify(asyncContext, outboundResponse, inboundRequest,
printWriter, inboundRequest2);
assertTrue(MOCK_SSE_SERVER.isEmpty());
+ assertEquals("http://host2:3333/sse",
sseHaDispatch.getHaProvider().getActiveURL("SSE"));
}
- @Test
- public void testPatch5xx() throws Exception {
- CountDownLatch latch = new CountDownLatch(1);
- SSEDispatch sseDispatch = this.createDispatch();
- HttpServletResponse outboundResponse =
this.getServletResponse(HttpStatus.SC_INTERNAL_SERVER_ERROR);
- AsyncContext asyncContext = this.getAsyncContext(latch,
outboundResponse);
- HttpServletRequest inboundRequest =
this.getHttpServletRequest(asyncContext);
+ private SSEHaDispatch createDispatch(String cookieName) throws Exception {
+ KeystoreService keystoreService = createMock(KeystoreService.class);
+
expect(keystoreService.getTruststoreForHttpClient()).andReturn(null).once();
- replay(inboundRequest, asyncContext, outboundResponse);
+ GatewayConfig gatewayConfig = createMock(GatewayConfig.class);
+ expect(gatewayConfig.isMetricsEnabled()).andReturn(false).once();
+ expect(gatewayConfig.isSSLEnabled()).andReturn(true).once();
+
expect(gatewayConfig.getHttpClientMaxConnections()).andReturn(32).once();
+
expect(gatewayConfig.getHttpClientConnectionTimeout()).andReturn(20000).once();
+
expect(gatewayConfig.getHttpClientSocketTimeout()).andReturn(20000).once();
+
expect(gatewayConfig.getHttpClientCookieSpec()).andReturn(CookieSpecs.STANDARD).anyTimes();
- MOCK_SSE_SERVER.expect()
- .method("PATCH")
- .pathInfo("/sse")
- .respond()
- .status(HttpStatus.SC_INTERNAL_SERVER_ERROR);
+ GatewayServices gatewayServices = createMock(GatewayServices.class);
+
expect(gatewayServices.getService(ServiceType.KEYSTORE_SERVICE)).andReturn(keystoreService).once();
- sseDispatch.doPatch(URL, inboundRequest, outboundResponse);
+ ServletContext servletContext = createMock(ServletContext.class);
+
expect(servletContext.getAttribute(GatewayConfig.GATEWAY_CONFIG_ATTRIBUTE)).andReturn(gatewayConfig).atLeastOnce();
+
expect(servletContext.getAttribute(GatewayServices.GATEWAY_SERVICES_ATTRIBUTE)).andReturn(gatewayServices).atLeastOnce();
- latch.await(1L, TimeUnit.SECONDS);
- EasyMock.verify(asyncContext, outboundResponse, inboundRequest);
- assertTrue(MOCK_SSE_SERVER.isEmpty());
- }
+ FilterConfig filterConfig = createMock(FilterConfig.class);
+
expect(filterConfig.getServletContext()).andReturn(servletContext).atLeastOnce();
+
expect(filterConfig.getInitParameter("useTwoWaySsl")).andReturn("false").once();
+
expect(filterConfig.getInitParameter("httpclient.maxConnections")).andReturn(null).once();
+
expect(filterConfig.getInitParameter("httpclient.connectionTimeout")).andReturn(null).once();
+
expect(filterConfig.getInitParameter("httpclient.socketTimeout")).andReturn(null).once();
+
expect(filterConfig.getInitParameter("serviceRole")).andReturn(null).once();
+
expect(filterConfig.getInitParameter("httpclient.cookieSpec")).andReturn(null).anyTimes();
- @Test
- public void testServerNotAvailable() throws Exception {
- CountDownLatch latch = new CountDownLatch(1);
- SSEDispatch sseDispatch = this.createDispatch();
- HttpServletResponse outboundResponse =
EasyMock.createNiceMock(HttpServletResponse.class);
- AsyncContext asyncContext = this.getAsyncContext(latch,
outboundResponse);
- HttpServletRequest inboundRequest =
this.getHttpServletRequest(asyncContext);
+ HaProvider provider = this.createProvider(cookieName);
- replay(inboundRequest, asyncContext, outboundResponse);
+ replay(keystoreService, gatewayConfig, gatewayServices,
servletContext, filterConfig);
- sseDispatch.doGet(new URI("http://localhost:11223/sse"),
inboundRequest, outboundResponse);
+ SSEHaDispatch dispatch = new SSEHaDispatch(filterConfig);
+ dispatch.setHaProvider(provider);
+ dispatch.setServiceRole("SSE");
+ dispatch.init();
- latch.await(1L, TimeUnit.SECONDS);
- EasyMock.verify(asyncContext, outboundResponse, inboundRequest);
+ return dispatch;
}
- private HttpServletRequest getHttpServletRequest(AsyncContext
asyncContext) throws Exception {
- Map<String, String> headers = new HashMap<>();
- headers.put("request", "header");
- InputStream stream = new
ByteArrayInputStream("{\"request\":\"body\"}".getBytes(StandardCharsets.UTF_8));
- MockServletInputStream mockServletInputStream = new
MockServletInputStream(stream);
- HttpServletRequest inboundRequest =
EasyMock.createNiceMock(HttpServletRequest.class);
+ private HttpServletResponse getServletResponse(int statusCode) {
+ HttpServletResponse outboundResponse =
EasyMock.createNiceMock(HttpServletResponse.class);
-
EasyMock.expect(inboundRequest.getHeaderNames()).andReturn(Collections.enumeration(headers.keySet())).anyTimes();
-
EasyMock.expect(inboundRequest.startAsync()).andReturn(asyncContext).once();
-
EasyMock.expect(inboundRequest.getHeader("request")).andReturn("header").once();
-
EasyMock.expect(inboundRequest.getContentType()).andReturn("application/json").anyTimes();
-
EasyMock.expect(inboundRequest.getInputStream()).andReturn(mockServletInputStream).anyTimes();
-
EasyMock.expect(inboundRequest.getContentLength()).andReturn(mockServletInputStream.available()).anyTimes();
- EasyMock.expect(inboundRequest.getServletContext()).andReturn(new
MockServletContext()).anyTimes();
+ outboundResponse.setStatus(statusCode);
+ EasyMock.expectLastCall();
- return inboundRequest;
+ return outboundResponse;
}
private AsyncContext getAsyncContext(CountDownLatch latch,
HttpServletResponse outboundResponse) {
@@ -428,17 +373,26 @@ public class SSEDispatchTest {
return asyncContext;
}
- private HttpServletResponse getServletResponse(int statusCode) {
- HttpServletResponse outboundResponse =
EasyMock.createNiceMock(HttpServletResponse.class);
+ private HttpServletRequest getHttpServletRequest(AsyncContext
asyncContext) throws Exception {
+ Map<String, String> headers = new HashMap<>();
+ headers.put("request", "header");
+ InputStream stream = new
ByteArrayInputStream("{\"request\":\"body\"}".getBytes(StandardCharsets.UTF_8));
+ MockServletInputStream mockServletInputStream = new
MockServletInputStream(stream);
+ HttpServletRequest inboundRequest =
EasyMock.createNiceMock(HttpServletRequest.class);
- outboundResponse.setStatus(statusCode);
- EasyMock.expectLastCall();
+
EasyMock.expect(inboundRequest.getHeaderNames()).andReturn(Collections.enumeration(headers.keySet())).anyTimes();
+
EasyMock.expect(inboundRequest.startAsync()).andReturn(asyncContext).anyTimes();
+
EasyMock.expect(inboundRequest.getHeader("request")).andReturn("header").once();
+
EasyMock.expect(inboundRequest.getContentType()).andReturn("application/json").anyTimes();
+
EasyMock.expect(inboundRequest.getInputStream()).andReturn(mockServletInputStream).anyTimes();
+
EasyMock.expect(inboundRequest.getContentLength()).andReturn(mockServletInputStream.available()).anyTimes();
+ EasyMock.expect(inboundRequest.getServletContext()).andReturn(new
MockServletContext()).anyTimes();
- return outboundResponse;
+ return inboundRequest;
}
- private void expectResponseBodyAndHeader(PrintWriter printWriter,
HttpServletResponse outboundResponse) throws Exception {
- outboundResponse.addHeader("response", "header");
+ private void expectResponseBodyAndHeader(PrintWriter printWriter,
HttpServletResponse outboundResponse, Capture<Cookie> capturedArgument) throws
Exception {
+ outboundResponse.addCookie(capture(capturedArgument));
EasyMock.expectLastCall();
EasyMock.expect(outboundResponse.getWriter()).andReturn(printWriter).anyTimes();
printWriter.write("id:1\nevent:event1\ndata:data1");
@@ -449,35 +403,19 @@ public class SSEDispatchTest {
EasyMock.expectLastCall().times(2);
}
- private SSEDispatch createDispatch() throws Exception {
- KeystoreService keystoreService = createMock(KeystoreService.class);
-
expect(keystoreService.getTruststoreForHttpClient()).andReturn(null).once();
-
- GatewayConfig gatewayConfig = createMock(GatewayConfig.class);
- expect(gatewayConfig.isMetricsEnabled()).andReturn(false).once();
-
expect(gatewayConfig.getHttpClientMaxConnections()).andReturn(32).once();
-
expect(gatewayConfig.getHttpClientConnectionTimeout()).andReturn(20000).once();
-
expect(gatewayConfig.getHttpClientSocketTimeout()).andReturn(20000).once();
-
expect(gatewayConfig.getHttpClientCookieSpec()).andReturn(CookieSpecs.STANDARD).anyTimes();
-
- GatewayServices gatewayServices = createMock(GatewayServices.class);
-
expect(gatewayServices.getService(ServiceType.KEYSTORE_SERVICE)).andReturn(keystoreService).once();
-
- ServletContext servletContext = createMock(ServletContext.class);
-
expect(servletContext.getAttribute(GatewayConfig.GATEWAY_CONFIG_ATTRIBUTE)).andReturn(gatewayConfig).atLeastOnce();
-
expect(servletContext.getAttribute(GatewayServices.GATEWAY_SERVICES_ATTRIBUTE)).andReturn(gatewayServices).atLeastOnce();
-
- FilterConfig filterConfig = createMock(FilterConfig.class);
-
expect(filterConfig.getServletContext()).andReturn(servletContext).atLeastOnce();
-
expect(filterConfig.getInitParameter("useTwoWaySsl")).andReturn("false").once();
-
expect(filterConfig.getInitParameter("httpclient.maxConnections")).andReturn(null).once();
-
expect(filterConfig.getInitParameter("httpclient.connectionTimeout")).andReturn(null).once();
-
expect(filterConfig.getInitParameter("httpclient.socketTimeout")).andReturn(null).once();
-
expect(filterConfig.getInitParameter("serviceRole")).andReturn(null).once();
-
expect(filterConfig.getInitParameter("httpclient.cookieSpec")).andReturn(null).anyTimes();
-
- replay(keystoreService, gatewayConfig, gatewayServices,
servletContext, filterConfig);
-
- return new SSEDispatch(filterConfig);
+ private HaProvider createProvider(String cookieName) throws Exception {
+ String serviceName = "SSE";
+ HaDescriptor descriptor = HaDescriptorFactory.createDescriptor();
+
descriptor.addServiceConfig(HaDescriptorFactory.createServiceConfig(serviceName,
"true", null, null, null, null, "true", "true", cookieName, null,
"agentX,user1,agentY", null));
+ HaProvider provider = new DefaultHaProvider(descriptor);
+ URI uri1 = new URI("http://localhost:" + MOCK_SSE_SERVER.getPort() +
"/sse");
+ URI uri2 = new URI("http://host2:3333/sse");
+ URI uri3 = new URI("http://host3:3333/sse");
+ ArrayList<String> urlList = new ArrayList<>();
+ urlList.add(uri1.toString());
+ urlList.add(uri2.toString());
+ urlList.add(uri3.toString());
+ provider.addHaService(serviceName, urlList);
+ return provider;
}
}
\ No newline at end of file
diff --git
a/gateway-spi/src/main/java/org/apache/knox/gateway/sse/SSEDispatch.java
b/gateway-spi/src/main/java/org/apache/knox/gateway/sse/SSEDispatch.java
index 318c4bb82..767cbccef 100644
--- a/gateway-spi/src/main/java/org/apache/knox/gateway/sse/SSEDispatch.java
+++ b/gateway-spi/src/main/java/org/apache/knox/gateway/sse/SSEDispatch.java
@@ -98,16 +98,17 @@ public class SSEDispatch extends ConfigurableDispatch {
private void doHttpMethod(HttpUriRequest httpMethod, HttpServletRequest
inboundRequest, HttpServletResponse outboundResponse) throws IOException {
this.addAcceptHeader(httpMethod);
this.copyRequestHeaderFields(httpMethod, inboundRequest);
- this.executeRequest(httpMethod, outboundResponse, inboundRequest);
+ this.executeRequestWrapper(httpMethod, inboundRequest,
outboundResponse);
}
- private void executeRequest(HttpUriRequest outboundRequest,
HttpServletResponse outboundResponse, HttpServletRequest inboundRequest) {
+ @Override
+ protected void executeRequest(HttpUriRequest outboundRequest,
HttpServletRequest inboundRequest, HttpServletResponse outboundResponse) {
AsyncContext asyncContext = inboundRequest.startAsync();
//No timeout
asyncContext.setTimeout(0L);
HttpAsyncRequestProducer producer =
HttpAsyncMethods.create(outboundRequest);
- AsyncCharConsumer<SSEResponse> consumer = new
SSECharConsumer(outboundResponse, outboundRequest.getURI(), asyncContext);
+ AsyncCharConsumer<SSEResponse> consumer = new
SSECharConsumer(outboundResponse, outboundRequest.getURI(), asyncContext,
inboundRequest, outboundRequest);
this.executeAsyncRequest(producer, consumer, outboundRequest);
}
@@ -174,11 +175,15 @@ public class SSEDispatch extends ConfigurableDispatch {
private class SSECharConsumer extends AsyncCharConsumer<SSEResponse> {
private SSEResponse sseResponse;
private final HttpServletResponse outboundResponse;
+ private final HttpUriRequest outboundRequest;
+ private final HttpServletRequest inboundRequest;
private final URI url;
private final AsyncContext asyncContext;
- SSECharConsumer(HttpServletResponse outboundResponse, URI url,
AsyncContext asyncContext) {
+ SSECharConsumer(HttpServletResponse outboundResponse, URI url,
AsyncContext asyncContext, HttpServletRequest inboundRequest, HttpUriRequest
outboundRequest) {
this.outboundResponse = outboundResponse;
+ this.outboundRequest = outboundRequest;
+ this.inboundRequest = inboundRequest;
this.url = url;
this.asyncContext = asyncContext;
}
@@ -187,6 +192,7 @@ public class SSEDispatch extends ConfigurableDispatch {
protected void onResponseReceived(final HttpResponse inboundResponse) {
this.sseResponse = new SSEResponse(inboundResponse);
if (isSuccessful(inboundResponse.getStatusLine().getStatusCode()))
{
+ outboundResponseWrapper(outboundRequest, inboundRequest,
outboundResponse);
handleSuccessResponse(outboundResponse, url, inboundResponse);
} else {
handleErrorResponse(outboundResponse, url, inboundResponse);
diff --git a/gateway-spi/src/main/java/org/apache/knox/gateway/sse/SSEEntity.java b/gateway-spi/src/main/java/org/apache/knox/gateway/sse/SSEEntity.java
index 8b598c631..8cfbda6a6 100644
--- a/gateway-spi/src/main/java/org/apache/knox/gateway/sse/SSEEntity.java
+++ b/gateway-spi/src/main/java/org/apache/knox/gateway/sse/SSEEntity.java
@@ -30,8 +30,6 @@ import java.util.concurrent.LinkedBlockingQueue;
public class SSEEntity extends AbstractHttpEntity {
- private static final String SSE_DELIMITER = ":";
-
private final BlockingQueue<SSEvent> eventQueue;
private final StringBuilder eventBuilder = new StringBuilder();
private final HttpEntity httpEntity;
@@ -68,45 +66,10 @@ public class SSEEntity extends AbstractHttpEntity {
private void processEvent() {
String unprocessedEvent = eventBuilder.toString();
- SSEvent ssEvent = new SSEvent();
-
- for (String line : unprocessedEvent.split("\\R")) {
- String[] lineTokens = this.parseLine(line);
- switch (lineTokens[0]) {
- case "id":
- ssEvent.setId(lineTokens[1].trim());
- break;
- case "event":
- ssEvent.setEvent(lineTokens[1].trim());
- break;
- case "data":
- ssEvent.setData(lineTokens[1].trim());
- break;
- case "comment":
- ssEvent.setComment(lineTokens[1].trim());
- break;
- case "retry":
- ssEvent.setRetry(Long.parseLong(lineTokens[1].trim()));
- break;
- default:
- break;
- }
- }
+ SSEvent ssEvent = SSEvent.fromString(unprocessedEvent);
eventQueue.add(ssEvent);
}
- private String[] parseLine(String line) {
- String[] lineTokens = new String[2];
- if(line.startsWith(SSE_DELIMITER)) {
- lineTokens[0] = "comment";
- lineTokens[1] = line;
- } else {
- lineTokens = line.split(SSE_DELIMITER, 2);
- }
-
- return lineTokens;
- }
-
public void sendEvent(AsyncContext asyncContext) throws IOException,
InterruptedException {
while (!eventQueue.isEmpty()) {
SSEvent event = eventQueue.take();
diff --git a/gateway-spi/src/main/java/org/apache/knox/gateway/sse/SSEvent.java b/gateway-spi/src/main/java/org/apache/knox/gateway/sse/SSEvent.java
index dc643ed0e..98991a588 100644
--- a/gateway-spi/src/main/java/org/apache/knox/gateway/sse/SSEvent.java
+++ b/gateway-spi/src/main/java/org/apache/knox/gateway/sse/SSEvent.java
@@ -17,16 +17,17 @@
*/
package org.apache.knox.gateway.sse;
+import java.util.Arrays;
+
public class SSEvent {
- private String data;
- private String event;
- private String id;
- private String comment;
- private Long retry;
+ private static final String SSE_DELIMITER = ":";
- public SSEvent() {
- }
+ private final String data;
+ private final String event;
+ private final String id;
+ private final String comment;
+ private final Long retry;
public SSEvent(String data, String event, String id, String comment, Long
retry) {
this.data = data;
@@ -36,26 +37,6 @@ public class SSEvent {
this.retry = retry;
}
- public void setData(String data) {
- this.data = data;
- }
-
- public void setEvent(String event) {
- this.event = event;
- }
-
- public void setId(String id) {
- this.id = id;
- }
-
- public void setComment(String comment) {
- this.comment = comment;
- }
-
- public void setRetry(Long retry) {
- this.retry = retry;
- }
-
public String getData() {
return this.data;
}
@@ -76,14 +57,56 @@ public class SSEvent {
return this.retry;
}
+ public static SSEvent fromString(String unprocessedEvent) {
+ String id = null;
+ String event = null;
+ String data = null;
+ String comment = null;
+ Long retry = null;
+
+ for (String line : unprocessedEvent.split("\\R")) {
+ String[] lineTokens = parseLine(line);
+ switch (lineTokens[0]) {
+ case "id":
+ id = lineTokens[1];
+ break;
+ case "event":
+ event = lineTokens[1];
+ break;
+ case "data":
+ data = lineTokens[1];
+ break;
+ case "comment":
+ comment = lineTokens[1];
+ break;
+ case "retry":
+ retry = Long.parseLong(lineTokens[1]);
+ break;
+ default:
+ break;
+ }
+ }
+ return new SSEvent(data, event, id, comment, retry);
+ }
+
+ private static String[] parseLine(String line) {
+ String[] lineTokens = Arrays.stream(line.split(SSE_DELIMITER,
2)).map(String::trim).toArray(String[]::new);
+
+ if (lineTokens[0].isEmpty()) {
+ lineTokens[0] = "comment";
+ }
+
+ return lineTokens;
+ }
+
@Override
public String toString() {
StringBuilder eventString = new StringBuilder();
- this.appendField(eventString, this.id, "id:");
- this.appendField(eventString, this.event, "event:");
- this.appendField(eventString, this.data, "data:");
- this.appendField(eventString, this.retry, "retry:");
+ this.appendField(eventString, this.id, "id");
+ this.appendField(eventString, this.event, "event");
+ this.appendField(eventString, this.data, "data");
+ this.appendField(eventString, this.retry, "retry");
this.appendField(eventString, this.comment, "");
return eventString.toString();
@@ -95,6 +118,7 @@ public class SSEvent {
eventString.append('\n');
}
eventString.append(prefix);
+ eventString.append(SSE_DELIMITER);
eventString.append(field);
}
}
diff --git a/gateway-spi/src/test/java/org/apache/knox/gateway/sse/SSEDispatchTest.java b/gateway-spi/src/test/java/org/apache/knox/gateway/sse/SSEDispatchTest.java
index 66abcb791..e8ed44ebf 100644
--- a/gateway-spi/src/test/java/org/apache/knox/gateway/sse/SSEDispatchTest.java
+++ b/gateway-spi/src/test/java/org/apache/knox/gateway/sse/SSEDispatchTest.java
@@ -480,4 +480,4 @@ public class SSEDispatchTest {
return new SSEDispatch(filterConfig);
}
-}
\ No newline at end of file
+}
diff --git a/gateway-spi/src/test/java/org/apache/knox/gateway/sse/SSEEntityTest.java b/gateway-spi/src/test/java/org/apache/knox/gateway/sse/SSEEntityTest.java
index 1c0375d9a..2e36b8bdb 100644
--- a/gateway-spi/src/test/java/org/apache/knox/gateway/sse/SSEEntityTest.java
+++ b/gateway-spi/src/test/java/org/apache/knox/gateway/sse/SSEEntityTest.java
@@ -50,7 +50,7 @@ public class SSEEntityTest {
assertEquals("event", actualSSEvent.getEvent());
assertEquals("data", actualSSEvent.getData());
assertEquals(1L, actualSSEvent.getRetry().longValue());
- assertEquals(":testing", actualSSEvent.getComment());
+ assertEquals("testing", actualSSEvent.getComment());
}
@Test
@@ -83,7 +83,7 @@ public class SSEEntityTest {
assertEquals("event3", actualSSEvent.getEvent());
assertEquals("data3", actualSSEvent.getData());
assertEquals(1045L, actualSSEvent.getRetry().longValue());
- assertEquals(":TEST", actualSSEvent.getComment());
+ assertEquals("TEST", actualSSEvent.getComment());
}
@Test
@@ -114,7 +114,7 @@ public class SSEEntityTest {
assertNull(actualSSEvent.getEvent());
assertEquals("data:{\"records\":[{\"col_str\":\"0e01eeef73f6833a98e1df6a5a00ea46f5b52dbee27ee89ebce894aaa555c90130b08fae8aaf600ef845b774ab0082fcaf8c\",\"col_int\":-580163093,\"col_ts\":\"2024-08-14T07:41:15.125\"}],\"job_status\":\"RUNNING\",\"end_of_samples\":false}",
actualSSEvent.getData());
assertEquals(33L, actualSSEvent.getRetry().longValue());
- assertEquals("::::test", actualSSEvent.getComment());
+ assertEquals(":::test", actualSSEvent.getComment());
}
@Test
@@ -160,4 +160,23 @@ public class SSEEntityTest {
assertEquals("event4", actualSSEvent.getEvent());
assertEquals("data4", actualSSEvent.getData());
}
+
+ @Test
+ public void testParseSingleEventTrim() {
+ SSEEntity sseEntity = new SSEEntity(entityMock);
+ BlockingQueue<SSEvent> eventQueue = sseEntity.getEventQueue();
+ String unprocessedEvent = " id : 1 \n event : event \n data : data \n
retry : 1 \n : testing \n\n";
+ CharBuffer cb = CharBuffer.wrap(unprocessedEvent);
+
+ sseEntity.readCharBuffer(cb);
+
+ assertFalse(eventQueue.isEmpty());
+
+ SSEvent actualSSEvent = eventQueue.peek();
+ assertEquals("1", actualSSEvent.getId());
+ assertEquals("event", actualSSEvent.getEvent());
+ assertEquals("data", actualSSEvent.getData());
+ assertEquals(1L, actualSSEvent.getRetry().longValue());
+ assertEquals("testing", actualSSEvent.getComment());
+ }
}
diff --git a/gateway-spi/src/test/java/org/apache/knox/gateway/sse/SSEventTest.java b/gateway-spi/src/test/java/org/apache/knox/gateway/sse/SSEventTest.java
index 378726ab3..6921cb559 100644
--- a/gateway-spi/src/test/java/org/apache/knox/gateway/sse/SSEventTest.java
+++ b/gateway-spi/src/test/java/org/apache/knox/gateway/sse/SSEventTest.java
@@ -26,7 +26,7 @@ public class SSEventTest {
@Test
public void testToStringWithAll() {
- SSEvent ssEvent = new SSEvent("data", "event", "id", ":comment", 5L);
+ SSEvent ssEvent = new SSEvent("data", "event", "id", "comment", 5L);
String expected = "id:id\nevent:event\ndata:data\nretry:5\n:comment";
assertEquals(expected, ssEvent.toString());
@@ -34,8 +34,8 @@ public class SSEventTest {
@Test
public void testToStringNoId() {
- SSEvent ssEventNull = new SSEvent("data", "event", null, ":test", 2L);
- SSEvent ssEventEmpty = new SSEvent("data", "event", "", ":new
comment", 1L);
+ SSEvent ssEventNull = new SSEvent("data", "event", null, "test", 2L);
+ SSEvent ssEventEmpty = new SSEvent("data", "event", "", "new comment",
1L);
String expectedForNull = "event:event\ndata:data\nretry:2\n:test";
String expectedForEmpty = "id:\nevent:event\ndata:data\nretry:1\n:new
comment";
@@ -45,8 +45,8 @@ public class SSEventTest {
@Test
public void testToStringNoEvent() {
- SSEvent ssEventNull = new SSEvent("data", null, "id", ":comment", 11L);
- SSEvent ssEventEmpty = new SSEvent("data", "", "id", ":test comment",
30L);
+ SSEvent ssEventNull = new SSEvent("data", null, "id", "comment", 11L);
+ SSEvent ssEventEmpty = new SSEvent("data", "", "id", "test comment",
30L);
String expectedForNull = "id:id\ndata:data\nretry:11\n:comment";
String expectedForEmpty = "id:id\nevent:\ndata:data\nretry:30\n:test
comment";
@@ -56,8 +56,8 @@ public class SSEventTest {
@Test
public void testToStringNoData() {
- SSEvent ssEventNull = new SSEvent(null, "event", "id", ":comment", 2L);
- SSEvent ssEventEmpty = new SSEvent("", "event", "id", ":testing", 1L);
+ SSEvent ssEventNull = new SSEvent(null, "event", "id", "comment", 2L);
+ SSEvent ssEventEmpty = new SSEvent("", "event", "id", "testing", 1L);
String expectedForNull = "id:id\nevent:event\nretry:2\n:comment";
String expectedForEmpty =
"id:id\nevent:event\ndata:\nretry:1\n:testing";
@@ -75,9 +75,9 @@ public class SSEventTest {
@Test
public void testToStringNoRetry() {
- SSEvent ssEventNull = new SSEvent("data", "event", "id", ":TEST",
null);
+ SSEvent ssEventNull = new SSEvent("data", "event", "id", "TEST", null);
String expected = "id:id\nevent:event\ndata:data\n:TEST";
assertEquals(expected, ssEventNull.toString());
}
-}
\ No newline at end of file
+}