http://git-wip-us.apache.org/repos/asf/knox/blob/7d0bff16/gateway-server/src/main/java/org/apache/knox/gateway/websockets/ProxyInboundClient.java ---------------------------------------------------------------------- diff --git a/gateway-server/src/main/java/org/apache/knox/gateway/websockets/ProxyInboundClient.java b/gateway-server/src/main/java/org/apache/knox/gateway/websockets/ProxyInboundClient.java new file mode 100644 index 0000000..c12ee53 --- /dev/null +++ b/gateway-server/src/main/java/org/apache/knox/gateway/websockets/ProxyInboundClient.java @@ -0,0 +1,107 @@ +package org.apache.knox.gateway.websockets; + +import javax.websocket.CloseReason; +import javax.websocket.Endpoint; +import javax.websocket.EndpointConfig; +import javax.websocket.MessageHandler; +import javax.websocket.Session; + +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +/** + * A Websocket client with callback which is not annotation based. + * This handler accepts String and binary messages. + * @since 0.14.0 + */ +public class ProxyInboundClient extends Endpoint { + + /** + * Callback to be called once we have events on our socket. 
+ */ + private MessageEventCallback callback; + + protected Session session; + protected EndpointConfig config; + + + public ProxyInboundClient(final MessageEventCallback callback) { + super(); + this.callback = callback; + } + + /** + * Developers must implement this method to be notified when a new + * conversation has just begun. + * + * @param backendSession the session that has just been activated. + * @param config the configuration used to configure this endpoint. + */ + @Override + public void onOpen(final javax.websocket.Session backendSession, final EndpointConfig config) { + this.session = backendSession; + this.config = config; + + /* Set the max message size */ + session.setMaxBinaryMessageBufferSize(Integer.MAX_VALUE); + session.setMaxTextMessageBufferSize(Integer.MAX_VALUE); + + /* Add message handler for binary data */ + session.addMessageHandler(new MessageHandler.Whole<byte[]>() { + + /** + * Called when the message has been fully received. + * + * @param message the message data. + */ + @Override + public void onMessage(final byte[] message) { + callback.onMessageBinary(message, true, session); + } + + }); + + /* Add message handler for text data */ + session.addMessageHandler(new MessageHandler.Whole<String>() { + + /** + * Called when the message has been fully received. + * + * @param message the message data. + */ + @Override + public void onMessage(final String message) { + callback.onMessageText(message, session); + } + + }); + + callback.onConnectionOpen(backendSession); + } + + @Override + public void onClose(final javax.websocket.Session backendSession, final CloseReason closeReason) { + callback.onConnectionClose(closeReason); + this.session = null; + } + + @Override + public void onError(final javax.websocket.Session backendSession, final Throwable cause) { + callback.onError(cause); + this.session = null; + } + +}
// Source: gateway-server/src/test/java/org/apache/knox/gateway/websockets/ProxyInboundClientTest.java
// (replaces the otherwise identical test formerly in package org.apache.hadoop.gateway.websockets)
package org.apache.knox.gateway.websockets;

/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with this
 * work for additional information regarding copyright ownership. The ASF
 * licenses this file to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 * <p>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */

import org.apache.commons.lang.RandomStringUtils;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.server.handler.ContextHandler;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;

import javax.websocket.CloseReason;
import javax.websocket.ContainerProvider;
import javax.websocket.DeploymentException;
import javax.websocket.Session;
import javax.websocket.WebSocketContainer;
import java.io.IOException;
import java.net.URI;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import static org.hamcrest.Matchers.instanceOf;

/**
 * Test {@link ProxyInboundClient} class.
 * <p>
 * Each test connects a {@link ProxyInboundClient} to a local server running a
 * {@code WebsocketEchoHandler}, sends one message and checks what the client's
 * callback received.
 *
 * @since 0.14.0
 */
public class ProxyInboundClientTest {

  /** Upper bound, in seconds, on how long a test waits for a websocket event. */
  private static final long EVENT_WAIT_SECONDS = 10;

  private static Server server;
  private static URI serverUri;
  private static Handler handler;

  /* Last text / binary payload seen by a test; names kept as in the original. */
  String recievedMessage = null;

  byte[] recievedBinaryMessage = null;


  /* create an instance */
  public ProxyInboundClientTest() {
    super();
  }

  /**
   * Starts a Jetty server with the echo handler mounted at "/" and derives the
   * ws:// URI the tests connect to from the connector's host and local port.
   */
  @BeforeClass
  public static void startWSServer() throws Exception
  {
    server = new Server();
    ServerConnector connector = new ServerConnector(server);
    server.addConnector(connector);

    handler = new WebsocketEchoHandler();

    ContextHandler context = new ContextHandler();
    context.setContextPath("/");
    context.setHandler(handler);
    server.setHandler(context);

    server.start();

    String host = connector.getHost();
    if (host == null)
    {
      host = "localhost";
    }
    int port = connector.getLocalPort();
    serverUri = new URI(String.format("ws://%s:%d/",host,port));
  }

  /** Stops the server; a failure to stop is reported on stderr only. */
  @AfterClass
  public static void stopServer()
  {
    try
    {
      server.stop();
    }
    catch (Exception e)
    {
      e.printStackTrace(System.err);
    }
  }

  /**
   * Sends a short text message and expects it back unchanged.
   * No JUnit timeout is set here (it is disabled in the original as well); the
   * wait is bounded by {@link RecordingCallback#awaitEvent()} instead.
   */
  @Test
  public void testClientInstance() throws IOException, DeploymentException, InterruptedException {

    final String textMessage = "Echo";
    final RecordingCallback callback = new RecordingCallback();

    try (Session session = connect(callback)) {
      session.getBasicRemote().sendText(textMessage);
      callback.awaitEvent();
    }

    recievedMessage = callback.getText();
    Assert.assertEquals("The received text message is not the same as the sent", textMessage, recievedMessage);
  }

  /** Sends a short binary message and expects the same bytes back. */
  @Test(timeout = 3000)
  public void testBinarymessage() throws IOException, DeploymentException, InterruptedException {

    final String textMessage = "Echo";
    /* Explicit charset so encode and decode agree regardless of the platform default. */
    final ByteBuffer binarymessage = ByteBuffer.wrap(textMessage.getBytes(StandardCharsets.UTF_8));
    final RecordingCallback callback = new RecordingCallback();

    try (Session session = connect(callback)) {
      session.getBasicRemote().sendBinary(binarymessage);
      callback.awaitEvent();
    }

    recievedBinaryMessage = callback.getBinary();
    Assert.assertNotNull("No binary message was received", recievedBinaryMessage);
    Assert.assertEquals("Binary message does not match", textMessage,
        new String(recievedBinaryMessage, StandardCharsets.UTF_8));
  }

  /** Sends a 100000-character random text message and expects it back unchanged. */
  @Test(timeout = 3000)
  public void testTextMaxBufferLimit() throws IOException, DeploymentException, InterruptedException {

    final String longMessage = RandomStringUtils.random(100000);
    final RecordingCallback callback = new RecordingCallback();

    try (Session session = connect(callback)) {
      session.getBasicRemote().sendText(longMessage);
      callback.awaitEvent();
    }

    recievedMessage = callback.getText();
    Assert.assertEquals(longMessage, recievedMessage);

  }

  /**
   * Builds a {@link ProxyInboundClient} around the callback, checks that it is
   * a {@link javax.websocket.Endpoint} and connects it to the echo server.
   *
   * @param callback receiver of the client's events.
   * @return the open session; the caller is responsible for closing it.
   */
  private static Session connect(final MessageEventCallback callback)
      throws IOException, DeploymentException {
    final ProxyInboundClient client = new ProxyInboundClient(callback);
    Assert.assertThat(client, instanceOf(javax.websocket.Endpoint.class));

    final WebSocketContainer container = ContainerProvider.getWebSocketContainer();
    return container.connectToServer(client, serverUri);
  }

  /**
   * Callback that remembers the last text message, binary message and error,
   * and releases a latch on the first message, close or error event so that a
   * test can block without spinning.
   */
  private static final class RecordingCallback implements MessageEventCallback {

    private final CountDownLatch firstEvent = new CountDownLatch(1);
    private volatile String text;
    private volatile byte[] binary;
    private volatile Throwable error;

    /**
     * Blocks until a message, close or error event arrives.
     *
     * @throws AssertionError if nothing arrives within the wait limit, or if
     *         the wait ended with an error and no message was received.
     * @throws InterruptedException if the waiting thread is interrupted.
     */
    void awaitEvent() throws InterruptedException {
      if (!firstEvent.await(EVENT_WAIT_SECONDS, TimeUnit.SECONDS)) {
        throw new AssertionError(
            "No websocket event within " + EVENT_WAIT_SECONDS + " seconds");
      }
      if (error != null && text == null && binary == null) {
        throw new AssertionError("Websocket connection reported an error", error);
      }
    }

    /** @return the last text message received, or {@code null} if none. */
    String getText() {
      return text;
    }

    /** @return the last binary message received, or {@code null} if none. */
    byte[] getBinary() {
      return binary;
    }

    /**
     * A generic callback, intentionally left un-implemented.
     *
     * @param message ignored.
     */
    @Override
    public void doCallback(String message) {
      /* not needed by these tests */
    }

    /**
     * Callback when connection is established; nothing to record.
     *
     * @param session ignored.
     */
    @Override
    public void onConnectionOpen(Object session) {
      /* not needed by these tests */
    }

    /**
     * Callback when connection is closed; ends the wait.
     *
     * @param reason ignored.
     */
    @Override
    public void onConnectionClose(CloseReason reason) {
      firstEvent.countDown();
    }

    /**
     * Callback when there is an error in connection; records it and ends the wait.
     *
     * @param cause the reported error.
     */
    @Override
    public void onError(Throwable cause) {
      error = cause;
      firstEvent.countDown();
    }

    /**
     * Callback when a text message is received; records it and ends the wait.
     *
     * @param message the received text.
     * @param session ignored.
     */
    @Override
    public void onMessageText(String message, Object session) {
      text = message;
      firstEvent.countDown();
    }

    /**
     * Callback when a binary message is received; records it and ends the wait.
     *
     * @param message the received bytes.
     * @param last ignored.
     * @param session ignored.
     */
    @Override
    public void onMessageBinary(byte[] message, boolean last,
        Object session) {
      binary = message;
      firstEvent.countDown();
    }
  }

}