Repository: nifi Updated Branches: refs/heads/master b026f0beb -> 26a5881d2
http://git-wip-us.apache.org/repos/asf/nifi/blob/26a5881d/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/ControllerServiceTestContext.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/ControllerServiceTestContext.java b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/ControllerServiceTestContext.java new file mode 100644 index 0000000..062d528 --- /dev/null +++ b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/ControllerServiceTestContext.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.websocket; + +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.controller.ControllerService; +import org.apache.nifi.util.MockControllerServiceInitializationContext; +import org.apache.nifi.util.MockPropertyValue; + +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + + +public class ControllerServiceTestContext { + + private final ConfigurationContext configurationContext = mock(ConfigurationContext.class); + private final ValidationContext validationContext = mock(ValidationContext.class); + private MockControllerServiceInitializationContext initializationContext; + + public ControllerServiceTestContext(ControllerService controllerService, String id) { + initializationContext = new MockControllerServiceInitializationContext(controllerService, id); + doAnswer(invocation -> configurationContext.getProperty(invocation.getArgumentAt(0, PropertyDescriptor.class))) + .when(validationContext).getProperty(any(PropertyDescriptor.class)); + controllerService.getPropertyDescriptors().forEach(prop -> setDefaultValue(prop)); + } + + public MockControllerServiceInitializationContext getInitializationContext() { + return initializationContext; + } + + public ConfigurationContext getConfigurationContext() { + return configurationContext; + } + + public MockPropertyValue setDefaultValue(PropertyDescriptor propertyDescriptor) { + return setCustomValue(propertyDescriptor, propertyDescriptor.getDefaultValue()); + } + + public MockPropertyValue setCustomValue(PropertyDescriptor propertyDescriptor, String value) { + final MockPropertyValue propertyValue = new MockPropertyValue(value, initializationContext); + when(configurationContext.getProperty(eq(propertyDescriptor))) + .thenReturn(propertyValue); + return propertyValue; + } + + public ValidationContext getValidationContext() { + return validationContext; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/26a5881d/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/TestJettyWebSocketClient.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/TestJettyWebSocketClient.java b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/TestJettyWebSocketClient.java new file mode 100644 index 0000000..f71f4f2 --- /dev/null +++ b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/TestJettyWebSocketClient.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.websocket; + +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.websocket.jetty.JettyWebSocketClient; +import org.junit.Test; + +import java.util.Collection; + +import static org.junit.Assert.assertEquals; + + +public class TestJettyWebSocketClient { + + @Test + public void testValidationRequiredProperties() throws Exception { + final JettyWebSocketClient service = new JettyWebSocketClient(); + final ControllerServiceTestContext context = new ControllerServiceTestContext(service, "service-id"); + service.initialize(context.getInitializationContext()); + final Collection<ValidationResult> results = service.validate(context.getValidationContext()); + assertEquals(1, results.size()); + final ValidationResult result = results.iterator().next(); + assertEquals(JettyWebSocketClient.WS_URI.getName(), result.getSubject()); + } + + @Test + public void testValidationSuccess() throws Exception { + final JettyWebSocketClient service = new JettyWebSocketClient(); + final ControllerServiceTestContext context = new ControllerServiceTestContext(service, "service-id"); + context.setCustomValue(JettyWebSocketClient.WS_URI, "ws://localhost:9001/test"); + service.initialize(context.getInitializationContext()); + final Collection<ValidationResult> results = service.validate(context.getValidationContext()); + assertEquals(0, results.size()); + } + + @Test + public void testValidationProtocol() throws Exception { + final JettyWebSocketClient service = new JettyWebSocketClient(); + final ControllerServiceTestContext context = new ControllerServiceTestContext(service, "service-id"); + context.setCustomValue(JettyWebSocketClient.WS_URI, "http://localhost:9001/test"); + service.initialize(context.getInitializationContext()); + final Collection<ValidationResult> results = service.validate(context.getValidationContext()); + assertEquals(1, results.size()); + final ValidationResult result = results.iterator().next(); + assertEquals(JettyWebSocketClient.WS_URI.getName(), result.getSubject()); + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/26a5881d/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/TestJettyWebSocketCommunication.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/TestJettyWebSocketCommunication.java b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/TestJettyWebSocketCommunication.java new file mode 100644 index 0000000..9a32255 --- /dev/null +++ b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/TestJettyWebSocketCommunication.java @@ -0,0 +1,219 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.websocket; + +import org.apache.nifi.processor.Processor; +import org.apache.nifi.websocket.jetty.JettyWebSocketClient; +import org.apache.nifi.websocket.jetty.JettyWebSocketServer; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.mockito.invocation.InvocationOnMock; + +import java.net.ServerSocket; +import java.nio.ByteBuffer; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyInt; +import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; + + +public class TestJettyWebSocketCommunication { + + protected int serverPort; + protected String serverPath = "/test"; + protected WebSocketServerService serverService; + protected ControllerServiceTestContext serverServiceContext; + protected WebSocketClientService clientService; + protected ControllerServiceTestContext clientServiceContext; + + protected boolean isSecure() { + return false; + } + + @Before + public void setup() throws Exception { + setupServer(); + + setupClient(); + } + + private void setupServer() throws Exception { + // Find an open port. + try (final ServerSocket serverSocket = new ServerSocket(0)) { + serverPort = serverSocket.getLocalPort(); + } + serverService = new JettyWebSocketServer(); + serverServiceContext = new ControllerServiceTestContext(serverService, "JettyWebSocketServer1"); + serverServiceContext.setCustomValue(JettyWebSocketServer.LISTEN_PORT, String.valueOf(serverPort)); + + customizeServer(); + + serverService.initialize(serverServiceContext.getInitializationContext()); + serverService.startServer(serverServiceContext.getConfigurationContext()); + } + + protected void customizeServer() { + } + + private void setupClient() throws Exception { + clientService = new JettyWebSocketClient(); + clientServiceContext = new ControllerServiceTestContext(clientService, "JettyWebSocketClient1"); + clientServiceContext.setCustomValue(JettyWebSocketClient.WS_URI, (isSecure() ? "wss" : "ws") + "://localhost:" + serverPort + serverPath); + + customizeClient(); + + clientService.initialize(clientServiceContext.getInitializationContext()); + clientService.startClient(clientServiceContext.getConfigurationContext()); + } + + protected void customizeClient() { + } + + @After + public void teardown() throws Exception { + clientService.stopClient(); + serverService.stopServer(); + } + + protected interface MockWebSocketProcessor extends Processor, ConnectedListener, TextMessageConsumer, BinaryMessageConsumer { + } + + @Test + public void testClientServerCommunication() throws Exception { + + // Expectations. + final CountDownLatch serverIsConnectedByClient = new CountDownLatch(1); + final CountDownLatch clientConnectedServer = new CountDownLatch(1); + final CountDownLatch serverReceivedTextMessageFromClient = new CountDownLatch(1); + final CountDownLatch serverReceivedBinaryMessageFromClient = new CountDownLatch(1); + final CountDownLatch clientReceivedTextMessageFromServer = new CountDownLatch(1); + final CountDownLatch clientReceivedBinaryMessageFromServer = new CountDownLatch(1); + + final String textMessageFromClient = "Message from client."; + final String textMessageFromServer = "Message from server."; + + final MockWebSocketProcessor serverProcessor = mock(MockWebSocketProcessor.class); + doReturn("serverProcessor1").when(serverProcessor).getIdentifier(); + final AtomicReference<String> serverSessionIdRef = new AtomicReference<>(); + + doAnswer(invocation -> assertConnectedEvent(serverIsConnectedByClient, serverSessionIdRef, invocation)) + .when(serverProcessor).connected(any(WebSocketSessionInfo.class)); + + doAnswer(invocation -> assertConsumeTextMessage(serverReceivedTextMessageFromClient, textMessageFromClient, invocation)) + .when(serverProcessor).consume(any(WebSocketSessionInfo.class), anyString()); + + doAnswer(invocation -> assertConsumeBinaryMessage(serverReceivedBinaryMessageFromClient, textMessageFromClient, invocation)) + .when(serverProcessor).consume(any(WebSocketSessionInfo.class), any(byte[].class), anyInt(), anyInt()); + + serverService.registerProcessor(serverPath, serverProcessor); + + final String clientId = "client1"; + + final MockWebSocketProcessor clientProcessor = mock(MockWebSocketProcessor.class); + doReturn("clientProcessor1").when(clientProcessor).getIdentifier(); + final AtomicReference<String> clientSessionIdRef = new AtomicReference<>(); + + + doAnswer(invocation -> assertConnectedEvent(clientConnectedServer, clientSessionIdRef, invocation)) + .when(clientProcessor).connected(any(WebSocketSessionInfo.class)); + + doAnswer(invocation -> assertConsumeTextMessage(clientReceivedTextMessageFromServer, textMessageFromServer, invocation)) + .when(clientProcessor).consume(any(WebSocketSessionInfo.class), anyString()); + + doAnswer(invocation -> assertConsumeBinaryMessage(clientReceivedBinaryMessageFromServer, textMessageFromServer, invocation)) + .when(clientProcessor).consume(any(WebSocketSessionInfo.class), any(byte[].class), anyInt(), anyInt()); + + clientService.registerProcessor(clientId, clientProcessor); + + clientService.connect(clientId); + + assertTrue("WebSocket client should be able to fire connected event.", clientConnectedServer.await(5, TimeUnit.SECONDS)); + assertTrue("WebSocket server should be able to fire connected event.", serverIsConnectedByClient.await(5, TimeUnit.SECONDS)); + + clientService.sendMessage(clientId, clientSessionIdRef.get(), sender -> sender.sendString(textMessageFromClient)); + clientService.sendMessage(clientId, clientSessionIdRef.get(), sender -> sender.sendBinary(ByteBuffer.wrap(textMessageFromClient.getBytes()))); + + + assertTrue("WebSocket server should be able to consume text message.", serverReceivedTextMessageFromClient.await(5, TimeUnit.SECONDS)); + assertTrue("WebSocket server should be able to consume binary message.", serverReceivedBinaryMessageFromClient.await(5, TimeUnit.SECONDS)); + + serverService.sendMessage(serverPath, serverSessionIdRef.get(), sender -> sender.sendString(textMessageFromServer)); + serverService.sendMessage(serverPath, serverSessionIdRef.get(), sender -> sender.sendBinary(ByteBuffer.wrap(textMessageFromServer.getBytes()))); + + assertTrue("WebSocket client should be able to consume text message.", clientReceivedTextMessageFromServer.await(5, TimeUnit.SECONDS)); + assertTrue("WebSocket client should be able to consume binary message.", clientReceivedBinaryMessageFromServer.await(5, TimeUnit.SECONDS)); + + clientService.deregisterProcessor(clientId, clientProcessor); + serverService.deregisterProcessor(serverPath, serverProcessor); + } + + protected Object assertConnectedEvent(CountDownLatch latch, AtomicReference<String> sessionIdRef, InvocationOnMock invocation) { + final WebSocketSessionInfo sessionInfo = invocation.getArgumentAt(0, WebSocketSessionInfo.class); + assertNotNull(sessionInfo.getLocalAddress()); + assertNotNull(sessionInfo.getRemoteAddress()); + assertNotNull(sessionInfo.getSessionId()); + assertEquals(isSecure(), sessionInfo.isSecure()); + sessionIdRef.set(sessionInfo.getSessionId()); + latch.countDown(); + return null; + } + + protected Object assertConsumeTextMessage(CountDownLatch latch, String expectedMessage, InvocationOnMock invocation) { + final WebSocketSessionInfo sessionInfo = invocation.getArgumentAt(0, WebSocketSessionInfo.class); + assertNotNull(sessionInfo.getLocalAddress()); + assertNotNull(sessionInfo.getRemoteAddress()); + assertNotNull(sessionInfo.getSessionId()); + assertEquals(isSecure(), sessionInfo.isSecure()); + + final String receivedMessage = invocation.getArgumentAt(1, String.class); + assertNotNull(receivedMessage); + assertEquals(expectedMessage, receivedMessage); + latch.countDown(); + return null; + } + + protected Object assertConsumeBinaryMessage(CountDownLatch latch, String expectedMessage, InvocationOnMock invocation) { + final WebSocketSessionInfo sessionInfo = invocation.getArgumentAt(0, WebSocketSessionInfo.class); + assertNotNull(sessionInfo.getLocalAddress()); + assertNotNull(sessionInfo.getRemoteAddress()); + assertNotNull(sessionInfo.getSessionId()); + assertEquals(isSecure(), sessionInfo.isSecure()); + + final byte[] receivedMessage = invocation.getArgumentAt(1, byte[].class); + final byte[] expectedBinary = expectedMessage.getBytes(); + final int offset = invocation.getArgumentAt(2, Integer.class); + final int length = invocation.getArgumentAt(3, Integer.class); + assertNotNull(receivedMessage); + assertEquals(expectedBinary.length, receivedMessage.length); + assertEquals(expectedMessage, new String(receivedMessage)); + assertEquals(0, offset); + assertEquals(expectedBinary.length, length); + latch.countDown(); + return null; + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/26a5881d/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/TestJettyWebSocketSecureCommunication.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/TestJettyWebSocketSecureCommunication.java b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/TestJettyWebSocketSecureCommunication.java new file mode 100644 index 0000000..e25189a --- /dev/null +++ b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/TestJettyWebSocketSecureCommunication.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.websocket; + +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.ssl.StandardSSLContextService; +import org.junit.Test; + + +public class TestJettyWebSocketSecureCommunication extends TestJettyWebSocketCommunication{ + + private final StandardSSLContextService sslContextService = new StandardSSLContextService(); + private final ControllerServiceTestContext sslTestContext = new ControllerServiceTestContext(sslContextService, "SSLContextService"); + + public TestJettyWebSocketSecureCommunication() { + try { + sslTestContext.setCustomValue(StandardSSLContextService.KEYSTORE, "src/test/resources/certs/localhost-ks.jks"); + sslTestContext.setCustomValue(StandardSSLContextService.KEYSTORE_PASSWORD, "localtest"); + sslTestContext.setCustomValue(StandardSSLContextService.KEYSTORE_TYPE, "JKS"); + sslTestContext.setCustomValue(StandardSSLContextService.TRUSTSTORE, "src/test/resources/certs/localhost-ks.jks"); + sslTestContext.setCustomValue(StandardSSLContextService.TRUSTSTORE_PASSWORD, "localtest"); + sslTestContext.setCustomValue(StandardSSLContextService.TRUSTSTORE_TYPE, "JKS"); + + sslContextService.initialize(sslTestContext.getInitializationContext()); + sslContextService.onConfigured(sslTestContext.getConfigurationContext()); + } catch (InitializationException e) { + throw new RuntimeException(e); + } + } + + @Override + protected boolean isSecure() { + return true; + } + + @Override + protected void customizeServer() { + serverServiceContext.getInitializationContext().addControllerService(sslContextService); + serverServiceContext.setCustomValue(WebSocketService.SSL_CONTEXT, sslContextService.getIdentifier()); + } + + @Override + protected void customizeClient() { + clientServiceContext.getInitializationContext().addControllerService(sslContextService); + clientServiceContext.setCustomValue(WebSocketService.SSL_CONTEXT, sslContextService.getIdentifier()); + } + + @Test + public void testClientServerCommunication() throws Exception { + super.testClientServerCommunication(); + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/26a5881d/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/TestJettyWebSocketServer.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/TestJettyWebSocketServer.java b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/TestJettyWebSocketServer.java new file mode 100644 index 0000000..4cd17f6 --- /dev/null +++ b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/TestJettyWebSocketServer.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.websocket; + +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.websocket.jetty.JettyWebSocketServer; +import org.junit.Test; + +import java.util.Collection; + +import static org.junit.Assert.assertEquals; + + +public class TestJettyWebSocketServer { + + @Test + public void testValidationRequiredProperties() throws Exception { + final JettyWebSocketServer service = new JettyWebSocketServer(); + final ControllerServiceTestContext context = new ControllerServiceTestContext(service, "service-id"); + service.initialize(context.getInitializationContext()); + final Collection<ValidationResult> results = service.validate(context.getValidationContext()); + assertEquals(1, results.size()); + final ValidationResult result = results.iterator().next(); + assertEquals(JettyWebSocketServer.LISTEN_PORT.getName(), result.getSubject()); + } + + @Test + public void testValidationSuccess() throws Exception { + final JettyWebSocketServer service = new JettyWebSocketServer(); + final ControllerServiceTestContext context = new ControllerServiceTestContext(service, "service-id"); + context.setCustomValue(JettyWebSocketServer.LISTEN_PORT, "9001"); + service.initialize(context.getInitializationContext()); + final Collection<ValidationResult> results = service.validate(context.getValidationContext()); + assertEquals(0, results.size()); + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/26a5881d/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/example/WebSocketClientExample.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/example/WebSocketClientExample.java b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/example/WebSocketClientExample.java new file mode 100644 index 0000000..5ef73bf --- /dev/null +++ b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/example/WebSocketClientExample.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.websocket.example; + +import org.eclipse.jetty.util.ssl.SslContextFactory; +import org.eclipse.jetty.websocket.api.Session; +import org.eclipse.jetty.websocket.api.StatusCode; +import org.eclipse.jetty.websocket.api.WebSocketAdapter; +import org.eclipse.jetty.websocket.client.ClientUpgradeRequest; +import org.eclipse.jetty.websocket.client.WebSocketClient; +import org.junit.Ignore; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.URI; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +/** + * This is a WebSocket client example testcase. + */ +@Ignore +public class WebSocketClientExample { + + private static Logger logger = LoggerFactory.getLogger(WebSocketClientExample.class); + + @Test + public void test() { + String destUri = "wss://localhost:50010/test"; + + final CountDownLatch replyLatch = new CountDownLatch(1); + final SslContextFactory sslContextFactory = new SslContextFactory(); + sslContextFactory.setKeyStorePath("src/test/resources/certs/localhost-ks.jks"); + sslContextFactory.setKeyStorePassword("localtest"); + sslContextFactory.setKeyStoreType("JKS"); + sslContextFactory.setTrustStorePath("src/test/resources/certs/localhost-ks.jks"); + sslContextFactory.setTrustStorePassword("localtest"); + sslContextFactory.setTrustStoreType("JKS"); + + WebSocketClient client = new WebSocketClient(sslContextFactory); + WebSocketAdapter socket = new WebSocketAdapter() { + @Override + public void onWebSocketConnect(Session session) { + super.onWebSocketConnect(session); + + try { + session.getRemote().sendString("Hello, this is Jetty ws client."); + } catch (IOException e) { + logger.error("Failed to send a message due to " + e, e); + } + } + + @Override + public void onWebSocketText(String message) { + logger.info("Received a reply: {}", message); + replyLatch.countDown(); + } + }; + try { + client.start(); + + URI echoUri = new URI(destUri); + ClientUpgradeRequest request = new ClientUpgradeRequest(); + final Future<Session> connect = client.connect(socket, echoUri, request); + logger.info("Connecting to : {}", echoUri); + + final Session session = connect.get(3, TimeUnit.SECONDS); + logger.info("Connected, session={}", session); + + session.close(StatusCode.NORMAL, "Bye"); + + } catch (Throwable t) { + t.printStackTrace(); + } finally { + try { + client.stop(); + } catch (Exception e) { + e.printStackTrace(); + } + } + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/26a5881d/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/example/WebSocketServerExample.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/example/WebSocketServerExample.java b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/example/WebSocketServerExample.java new file mode 100644 index 0000000..eddecd5 --- /dev/null +++ b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/example/WebSocketServerExample.java @@ -0,0 +1,195 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.websocket.example; + +import org.eclipse.jetty.server.Connector; +import org.eclipse.jetty.server.Handler; +import org.eclipse.jetty.server.HttpConfiguration; +import org.eclipse.jetty.server.HttpConnectionFactory; +import org.eclipse.jetty.server.SecureRequestCustomizer; +import org.eclipse.jetty.server.Server; +import org.eclipse.jetty.server.ServerConnector; +import org.eclipse.jetty.server.SslConnectionFactory; +import org.eclipse.jetty.server.handler.ContextHandlerCollection; +import org.eclipse.jetty.servlet.ServletContextHandler; +import org.eclipse.jetty.servlet.ServletHandler; +import org.eclipse.jetty.servlet.ServletHolder; +import org.eclipse.jetty.util.ssl.SslContextFactory; +import org.eclipse.jetty.websocket.api.Session; +import org.eclipse.jetty.websocket.api.WebSocketAdapter; +import org.eclipse.jetty.websocket.api.WebSocketListener; +import org.eclipse.jetty.websocket.servlet.ServletUpgradeRequest; +import org.eclipse.jetty.websocket.servlet.ServletUpgradeResponse; +import org.eclipse.jetty.websocket.servlet.WebSocketCreator; +import org.eclipse.jetty.websocket.servlet.WebSocketServlet; +import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Ignore; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.servlet.ServletException; +import javax.servlet.http.HttpServlet; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +/** + * This is a WebSocket server example testcase. + */ +@Ignore +public class WebSocketServerExample { + + private static Logger logger = LoggerFactory.getLogger(WebSocketServerExample.class); + private static Server server; + private static ServletHandler servletHandler; + private static ServletHolder servletHolder; + private static ServerConnector httpConnector; + private static ServerConnector sslConnector; + + private static final Map<Integer, WebSocketServerExample> portToController = new HashMap<>(); + + private Map<String, WebSocketListener> listeners = new HashMap<>(); + + public class SocketListener extends WebSocketAdapter { + + public SocketListener() { + logger.info("New instance is created: {}", this); + } + + @Override + public void onWebSocketConnect(Session session) { + logger.info("Connected, {}, {}", session.getLocalAddress(), session.getRemoteAddress()); + super.onWebSocketConnect(session); + + session.getUpgradeRequest().getRequestURI(); + } + + @Override + public void onWebSocketText(String message) { + logger.info("Received: {}", message); + + final String resultMessage; + if (message.startsWith("add-servlet")) { + // Is it possible to add servlet mapping?? + final String path = message.split(":")[1].trim(); + servletHandler.addServletWithMapping(servletHolder, path); + + resultMessage = "Deployed new servlet under: " + path; + } else { + resultMessage = "Got message: " + message; + } + + try { + getSession().getRemote().sendString(resultMessage); + } catch (IOException e) { + logger.error("Failed to send a message back to remote.", e); + } + } + + } + + public WebSocketServerExample() { + this.listeners.put("/test", new SocketListener()); + portToController.put(httpConnector.getPort(), this); + portToController.put(sslConnector.getPort(), this); + } + + public static class WSServlet extends WebSocketServlet implements WebSocketCreator { + @Override + public void configure(WebSocketServletFactory webSocketServletFactory) { + webSocketServletFactory.setCreator(this); + } + + @Override + public Object createWebSocket(ServletUpgradeRequest servletUpgradeRequest, ServletUpgradeResponse servletUpgradeResponse) { + final WebSocketServerExample testWebSocket = portToController.get(servletUpgradeRequest.getLocalPort()); + return testWebSocket.listeners.get(servletUpgradeRequest.getRequestURI().getPath()); + } + } + + public static class ConnectionCheckServlet extends HttpServlet { + @Override + protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException { + resp.setContentType("text/plain"); + resp.setStatus(HttpServletResponse.SC_OK); + resp.getWriter().println("Ok :)"); + } + } + + @BeforeClass + public static void setup() throws Exception { + server = new Server(0); + + final ContextHandlerCollection handlerCollection = new ContextHandlerCollection(); + + final ServletContextHandler contextHandler = new ServletContextHandler(); + servletHandler = new ServletHandler(); + contextHandler.insertHandler(servletHandler); + + handlerCollection.setHandlers(new Handler[]{contextHandler}); + + server.setHandler(handlerCollection); + + httpConnector = new ServerConnector(server); + httpConnector.setPort(50010); + + final SslContextFactory sslContextFactory = new SslContextFactory(); + sslContextFactory.setKeyStorePath("src/test/resources/certs/localhost-ks.jks"); + sslContextFactory.setKeyStorePassword("localtest"); + sslContextFactory.setKeyStoreType("JKS"); + + final HttpConfiguration https = new HttpConfiguration(); + https.addCustomizer(new SecureRequestCustomizer()); + sslConnector = new ServerConnector(server, + new SslConnectionFactory(sslContextFactory, "http/1.1"), + new HttpConnectionFactory(https)); + sslConnector.setPort(50011); + + + server.setConnectors(new Connector[]{httpConnector, sslConnector}); + + servletHolder = servletHandler.addServletWithMapping(WSServlet.class, "/test"); + servletHolder = servletHandler.addServletWithMapping(ConnectionCheckServlet.class, "/check"); + + server.start(); + + logger.info("Starting server on port {} for HTTP, and {} for HTTPS", httpConnector.getLocalPort(), sslConnector.getLocalPort()); + + + } + + @AfterClass + public static void teardown() throws Exception { + logger.info("Stopping server."); + try { + server.stop(); + } catch (Exception e) { + logger.error("Failed to stop Jetty server due to " + e, e); + } + } + + @Test + public void test() throws Exception { + logger.info("Waiting for a while..."); + Thread.sleep(1000_000); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/26a5881d/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/resources/certs/localhost-ks.jks ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/resources/certs/localhost-ks.jks b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/resources/certs/localhost-ks.jks new file mode 100755 index 0000000..119b50f Binary files /dev/null and b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/resources/certs/localhost-ks.jks differ http://git-wip-us.apache.org/repos/asf/nifi/blob/26a5881d/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/resources/certs/localhost-ts.jks ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/resources/certs/localhost-ts.jks b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/resources/certs/localhost-ts.jks new file mode 100755 index 0000000..7824378 Binary files /dev/null and b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/resources/certs/localhost-ts.jks differ http://git-wip-us.apache.org/repos/asf/nifi/blob/26a5881d/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/resources/certs/localhost.crt ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/resources/certs/localhost.crt b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/resources/certs/localhost.crt new file mode 100644 index 0000000..6c50a80 Binary files /dev/null and b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/resources/certs/localhost.crt differ http://git-wip-us.apache.org/repos/asf/nifi/blob/26a5881d/nifi-nar-bundles/nifi-websocket-bundle/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-websocket-bundle/pom.xml b/nifi-nar-bundles/nifi-websocket-bundle/pom.xml new file mode 100644 index 0000000..f2ed951 --- /dev/null +++ b/nifi-nar-bundles/nifi-websocket-bundle/pom.xml @@ -0,0 +1,39 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-nar-bundles</artifactId> + <version>1.1.0-SNAPSHOT</version> + </parent> + + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-websocket-bundle</artifactId> + <version>1.1.0-SNAPSHOT</version> + <packaging>pom</packaging> + + <modules> + <module>nifi-websocket-processors</module> + <module>nifi-websocket-processors-nar</module> + <module>nifi-websocket-services-api</module> + <module>nifi-websocket-services-api-nar</module> + <module>nifi-websocket-services-jetty</module> + <module>nifi-websocket-services-jetty-nar</module> + </modules> + +</project> http://git-wip-us.apache.org/repos/asf/nifi/blob/26a5881d/nifi-nar-bundles/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/pom.xml b/nifi-nar-bundles/pom.xml index 476ea23..2ead087 100755 --- a/nifi-nar-bundles/pom.xml +++ b/nifi-nar-bundles/pom.xml @@ -71,6 +71,7 @@ <module>nifi-ignite-bundle</module> <module>nifi-email-bundle</module> <module>nifi-ranger-bundle</module> + <module>nifi-websocket-bundle</module> </modules> <dependencyManagement> http://git-wip-us.apache.org/repos/asf/nifi/blob/26a5881d/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 957390c..f6eed44 100644 --- a/pom.xml +++ b/pom.xml @@ -1253,6 +1253,24 @@ language governing permissions and limitations under the License. --> </dependency> <dependency> <groupId>org.apache.nifi</groupId> + <artifactId>nifi-websocket-services-api-nar</artifactId> + <version>1.1.0-SNAPSHOT</version> + <type>nar</type> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-websocket-services-jetty-nar</artifactId> + <version>1.1.0-SNAPSHOT</version> + <type>nar</type> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-websocket-processors-nar</artifactId> + <version>1.1.0-SNAPSHOT</version> + <type>nar</type> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> <artifactId>nifi-datadog-nar</artifactId> <version>1.1.0-SNAPSHOT</version> <type>nar</type> @@ -1320,6 +1338,16 @@ language governing permissions and limitations under the License. --> </dependency> <dependency> <groupId>org.apache.nifi</groupId> + <artifactId>nifi-websocket-services-api</artifactId> + <version>1.1.0-SNAPSHOT</version> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-websocket-services-jetty</artifactId> + <version>1.1.0-SNAPSHOT</version> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> <artifactId>nifi-assembly</artifactId> <version>1.1.0-SNAPSHOT</version> </dependency>