Repository: knox Updated Branches: refs/heads/master 1bcfe47aa -> 0f207409c
KNOX-936 - On websocket error properly close all the sessions and containers. Project: http://git-wip-us.apache.org/repos/asf/knox/repo Commit: http://git-wip-us.apache.org/repos/asf/knox/commit/0f207409 Tree: http://git-wip-us.apache.org/repos/asf/knox/tree/0f207409 Diff: http://git-wip-us.apache.org/repos/asf/knox/diff/0f207409 Branch: refs/heads/master Commit: 0f207409c02fff54e3fb3a5a1c0b3865ef91e7db Parents: 1bcfe47 Author: Sandeep More <[email protected]> Authored: Tue May 9 09:50:58 2017 -0400 Committer: Sandeep More <[email protected]> Committed: Tue May 9 09:50:58 2017 -0400 ---------------------------------------------------------------------- .../websockets/GatewayWebsocketHandler.java | 16 +++++- .../gateway/websockets/ProxyInboundSocket.java | 2 - .../websockets/ProxyWebSocketAdapter.java | 60 +++++++++++++++----- .../gateway/websockets/BadBackendTest.java | 3 +- .../hadoop/gateway/websockets/BadUrlTest.java | 2 + .../websockets/ConnectionDroppedTest.java | 3 +- .../gateway/websockets/MessageFailureTest.java | 3 +- 7 files changed, 68 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/knox/blob/0f207409/gateway-server/src/main/java/org/apache/hadoop/gateway/websockets/GatewayWebsocketHandler.java ---------------------------------------------------------------------- diff --git a/gateway-server/src/main/java/org/apache/hadoop/gateway/websockets/GatewayWebsocketHandler.java b/gateway-server/src/main/java/org/apache/hadoop/gateway/websockets/GatewayWebsocketHandler.java index 8faa4f4..a0c7f5f 100644 --- a/gateway-server/src/main/java/org/apache/hadoop/gateway/websockets/GatewayWebsocketHandler.java +++ b/gateway-server/src/main/java/org/apache/hadoop/gateway/websockets/GatewayWebsocketHandler.java @@ -22,6 +22,8 @@ import java.net.MalformedURLException; import java.net.URI; import java.net.URL; import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.gateway.config.GatewayConfig; @@ -57,6 +59,14 @@ public class GatewayWebsocketHandler extends WebSocketHandler static final String REGEX_SPLIT_CONTEXT = "^((?:[^/]*/){2}[^/]*)"; + private static final int POOL_SIZE = 10; + + /** + * Manage the threads that are spawned + * @since 0.13 + */ + private final ExecutorService pool; + final GatewayConfig config; final GatewayServices services; @@ -72,6 +82,7 @@ public class GatewayWebsocketHandler extends WebSocketHandler this.config = config; this.services = services; + pool = Executors.newFixedThreadPool(POOL_SIZE); } @@ -124,7 +135,7 @@ public class GatewayWebsocketHandler extends WebSocketHandler final String backendURL = getMatchedBackendURL(path); /* Upgrade happens here */ - return new ProxyWebSocketAdapter(URI.create(backendURL)); + return new ProxyWebSocketAdapter(URI.create(backendURL), pool); } catch (final Exception e) { LOG.failedCreatingWebSocket(e); throw e; @@ -136,8 +147,7 @@ public class GatewayWebsocketHandler extends WebSocketHandler * url. If websocket url is found it is used as is, or we default to * ws://{host}:{port} which might or might not be right. * - * @param The - * context path + * @param The context path * @return Websocket backend url */ private synchronized String getMatchedBackendURL(final String path) { http://git-wip-us.apache.org/repos/asf/knox/blob/0f207409/gateway-server/src/main/java/org/apache/hadoop/gateway/websockets/ProxyInboundSocket.java ---------------------------------------------------------------------- diff --git a/gateway-server/src/main/java/org/apache/hadoop/gateway/websockets/ProxyInboundSocket.java b/gateway-server/src/main/java/org/apache/hadoop/gateway/websockets/ProxyInboundSocket.java index 3c0edaf..6de3f9c 100644 --- a/gateway-server/src/main/java/org/apache/hadoop/gateway/websockets/ProxyInboundSocket.java +++ b/gateway-server/src/main/java/org/apache/hadoop/gateway/websockets/ProxyInboundSocket.java @@ -59,14 +59,12 @@ public class ProxyInboundSocket { @OnError public void onClientError(Throwable cause) { - cause.printStackTrace(System.err); callback.onError(cause); } @OnMessage(maxMessageSize = Integer.MAX_VALUE) public void onBackendMessage(final String message, final javax.websocket.Session session) { - callback.onMessageText(message, session); } http://git-wip-us.apache.org/repos/asf/knox/blob/0f207409/gateway-server/src/main/java/org/apache/hadoop/gateway/websockets/ProxyWebSocketAdapter.java ---------------------------------------------------------------------- diff --git a/gateway-server/src/main/java/org/apache/hadoop/gateway/websockets/ProxyWebSocketAdapter.java b/gateway-server/src/main/java/org/apache/hadoop/gateway/websockets/ProxyWebSocketAdapter.java index 629f63d..1e7f583 100644 --- a/gateway-server/src/main/java/org/apache/hadoop/gateway/websockets/ProxyWebSocketAdapter.java +++ b/gateway-server/src/main/java/org/apache/hadoop/gateway/websockets/ProxyWebSocketAdapter.java @@ -19,6 +19,8 @@ package org.apache.hadoop.gateway.websockets; import java.io.IOException; import java.net.URI; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import javax.websocket.CloseReason; import javax.websocket.ContainerProvider; @@ -55,12 +57,15 @@ public class ProxyWebSocketAdapter extends WebSocketAdapter { private WebSocketContainer container; + private ExecutorService pool; + /** * Create an instance */ - public ProxyWebSocketAdapter(URI backend) { + public ProxyWebSocketAdapter(final URI backend, final ExecutorService pool) { super(); this.backend = backend; + this.pool = pool; } @Override @@ -129,7 +134,13 @@ public class ProxyWebSocketAdapter extends WebSocketAdapter { public void onWebSocketClose(int statusCode, String reason) { super.onWebSocketClose(statusCode, reason); - closeQuietly(); + /* do the cleaning business in seperate thread so we don't block */ + pool.execute(new Runnable() { + @Override + public void run() { + closeQuietly(); + } + }); LOG.onConnectionClose(backend.toString()); @@ -137,23 +148,34 @@ public class ProxyWebSocketAdapter extends WebSocketAdapter { @Override public void onWebSocketError(final Throwable t) { - passErrorToInboundConnection(t); + cleanupOnError(t); } /** - * A helper function to pass errors to Inbound connection (browser/client) + * Cleanup sessions */ - private void passErrorToInboundConnection(final Throwable t) { + private void cleanupOnError(final Throwable t) { LOG.onError(t.toString()); if (t.toString().contains("exceeds maximum size")) { - frontendSession.close(StatusCode.MESSAGE_TOO_LARGE, t.getMessage()); + if(frontendSession != null && !frontendSession.isOpen()) { + frontendSession.close(StatusCode.MESSAGE_TOO_LARGE, t.getMessage()); + } } else { - frontendSession.close(StatusCode.SERVER_ERROR, t.getMessage()); - closeQuietly(); - throw new RuntimeException(t); + if(frontendSession != null && !frontendSession.isOpen()) { + frontendSession.close(StatusCode.SERVER_ERROR, t.getMessage()); + } + + /* do the cleaning business in seperate thread so we don't block */ + pool.execute(new Runnable() { + @Override + public void run() { + closeQuietly(); + } + }); + } } @@ -179,14 +201,22 @@ public class ProxyWebSocketAdapter extends WebSocketAdapter { frontendSession.close(reason.getCloseCode().getCode(), reason.getReasonPhrase()); } finally { - closeQuietly(); + + /* do the cleaning business in seperate thread so we don't block */ + pool.execute(new Runnable() { + @Override + public void run() { + closeQuietly(); + } + }); + } } @Override public void onError(Throwable cause) { - passErrorToInboundConnection(cause); + cleanupOnError(cause); } @Override @@ -223,7 +253,9 @@ public class ProxyWebSocketAdapter extends WebSocketAdapter { private void closeQuietly() { try { - backendSession.close(); + if(backendSession != null && !backendSession.isOpen()) { + backendSession.close(); + } } catch (IOException e) { LOG.connectionFailed(e); } @@ -236,7 +268,9 @@ public class ProxyWebSocketAdapter extends WebSocketAdapter { } } - frontendSession.close(); + if(frontendSession != null && !frontendSession.isOpen()) { + frontendSession.close(); + } } http://git-wip-us.apache.org/repos/asf/knox/blob/0f207409/gateway-server/src/test/java/org/apache/hadoop/gateway/websockets/BadBackendTest.java ---------------------------------------------------------------------- diff --git a/gateway-server/src/test/java/org/apache/hadoop/gateway/websockets/BadBackendTest.java b/gateway-server/src/test/java/org/apache/hadoop/gateway/websockets/BadBackendTest.java index 1e4b86f..88d4f5b 100644 --- a/gateway-server/src/test/java/org/apache/hadoop/gateway/websockets/BadBackendTest.java +++ b/gateway-server/src/test/java/org/apache/hadoop/gateway/websockets/BadBackendTest.java @@ -19,6 +19,7 @@ package org.apache.hadoop.gateway.websockets; import java.io.IOException; import java.net.URI; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import javax.websocket.CloseReason; @@ -94,7 +95,7 @@ public class BadBackendTest { /* start Knox with WebsocketAdapter to test */ final BigEchoSocketHandler wsHandler = new BigEchoSocketHandler( - new ProxyWebSocketAdapter(new URI(BAD_BACKEND))); + new ProxyWebSocketAdapter(new URI(BAD_BACKEND), Executors.newFixedThreadPool(10))); ContextHandler context = new ContextHandler(); context.setContextPath("/"); http://git-wip-us.apache.org/repos/asf/knox/blob/0f207409/gateway-server/src/test/java/org/apache/hadoop/gateway/websockets/BadUrlTest.java ---------------------------------------------------------------------- diff --git a/gateway-server/src/test/java/org/apache/hadoop/gateway/websockets/BadUrlTest.java b/gateway-server/src/test/java/org/apache/hadoop/gateway/websockets/BadUrlTest.java index 4b9f836..135b385 100644 --- a/gateway-server/src/test/java/org/apache/hadoop/gateway/websockets/BadUrlTest.java +++ b/gateway-server/src/test/java/org/apache/hadoop/gateway/websockets/BadUrlTest.java @@ -119,6 +119,7 @@ public class BadUrlTest { * * @throws Exception */ + @Test public void testBadUrl() throws Exception { WebSocketContainer container = ContainerProvider.getWebSocketContainer(); @@ -136,6 +137,7 @@ public class BadUrlTest { } + /** * Start Gateway Server. * http://git-wip-us.apache.org/repos/asf/knox/blob/0f207409/gateway-server/src/test/java/org/apache/hadoop/gateway/websockets/ConnectionDroppedTest.java ---------------------------------------------------------------------- diff --git a/gateway-server/src/test/java/org/apache/hadoop/gateway/websockets/ConnectionDroppedTest.java b/gateway-server/src/test/java/org/apache/hadoop/gateway/websockets/ConnectionDroppedTest.java index 6562e5c..3c3f40d 100644 --- a/gateway-server/src/test/java/org/apache/hadoop/gateway/websockets/ConnectionDroppedTest.java +++ b/gateway-server/src/test/java/org/apache/hadoop/gateway/websockets/ConnectionDroppedTest.java @@ -19,6 +19,7 @@ package org.apache.hadoop.gateway.websockets; import java.io.IOException; import java.net.URI; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import javax.websocket.CloseReason; @@ -135,7 +136,7 @@ public class ConnectionDroppedTest { /* start Knox with WebsocketAdapter to test */ final BigEchoSocketHandler wsHandler = new BigEchoSocketHandler( - new ProxyWebSocketAdapter(serverUri)); + new ProxyWebSocketAdapter(serverUri, Executors.newFixedThreadPool(10))); ContextHandler context = new ContextHandler(); context.setContextPath("/"); http://git-wip-us.apache.org/repos/asf/knox/blob/0f207409/gateway-server/src/test/java/org/apache/hadoop/gateway/websockets/MessageFailureTest.java ---------------------------------------------------------------------- diff --git a/gateway-server/src/test/java/org/apache/hadoop/gateway/websockets/MessageFailureTest.java b/gateway-server/src/test/java/org/apache/hadoop/gateway/websockets/MessageFailureTest.java index f98b7e1..bbb6f13 100644 --- a/gateway-server/src/test/java/org/apache/hadoop/gateway/websockets/MessageFailureTest.java +++ b/gateway-server/src/test/java/org/apache/hadoop/gateway/websockets/MessageFailureTest.java @@ -19,6 +19,7 @@ package org.apache.hadoop.gateway.websockets; import java.io.IOException; import java.net.URI; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import javax.websocket.CloseReason; @@ -155,7 +156,7 @@ public class MessageFailureTest { /* start Knox with WebsocketAdapter to test */ final BigEchoSocketHandler wsHandler = new BigEchoSocketHandler( - new ProxyWebSocketAdapter(serverUri)); + new ProxyWebSocketAdapter(serverUri, Executors.newFixedThreadPool(10))); ContextHandler context = new ContextHandler(); context.setContextPath("/");
