Repository: nifi
Updated Branches:
  refs/heads/master b026f0beb -> 26a5881d2


http://git-wip-us.apache.org/repos/asf/nifi/blob/26a5881d/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/ControllerServiceTestContext.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/ControllerServiceTestContext.java
 
b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/ControllerServiceTestContext.java
new file mode 100644
index 0000000..062d528
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/ControllerServiceTestContext.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.websocket;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.controller.ControllerService;
+import org.apache.nifi.util.MockControllerServiceInitializationContext;
+import org.apache.nifi.util.MockPropertyValue;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+
+public class ControllerServiceTestContext {
+
+    private final ConfigurationContext configurationContext = 
mock(ConfigurationContext.class);
+    private final ValidationContext validationContext = 
mock(ValidationContext.class);
+    private MockControllerServiceInitializationContext initializationContext;
+
+    public ControllerServiceTestContext(ControllerService controllerService, 
String id) {
+        initializationContext = new 
MockControllerServiceInitializationContext(controllerService, id);
+        doAnswer(invocation -> 
configurationContext.getProperty(invocation.getArgumentAt(0, 
PropertyDescriptor.class)))
+                
.when(validationContext).getProperty(any(PropertyDescriptor.class));
+        controllerService.getPropertyDescriptors().forEach(prop -> 
setDefaultValue(prop));
+    }
+
+    public MockControllerServiceInitializationContext 
getInitializationContext() {
+        return initializationContext;
+    }
+
+    public ConfigurationContext getConfigurationContext() {
+        return configurationContext;
+    }
+
+    public MockPropertyValue setDefaultValue(PropertyDescriptor 
propertyDescriptor) {
+        return setCustomValue(propertyDescriptor, 
propertyDescriptor.getDefaultValue());
+    }
+
+    public MockPropertyValue setCustomValue(PropertyDescriptor 
propertyDescriptor, String value) {
+        final MockPropertyValue propertyValue = new MockPropertyValue(value, 
initializationContext);
+        when(configurationContext.getProperty(eq(propertyDescriptor)))
+                .thenReturn(propertyValue);
+        return propertyValue;
+    }
+
+    public ValidationContext getValidationContext() {
+        return validationContext;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/26a5881d/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/TestJettyWebSocketClient.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/TestJettyWebSocketClient.java
 
b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/TestJettyWebSocketClient.java
new file mode 100644
index 0000000..f71f4f2
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/TestJettyWebSocketClient.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.websocket;
+
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.websocket.jetty.JettyWebSocketClient;
+import org.junit.Test;
+
+import java.util.Collection;
+
+import static org.junit.Assert.assertEquals;
+
+
+public class TestJettyWebSocketClient {
+
+    @Test
+    public void testValidationRequiredProperties() throws Exception {
+        final JettyWebSocketClient service = new JettyWebSocketClient();
+        final ControllerServiceTestContext context = new 
ControllerServiceTestContext(service, "service-id");
+        service.initialize(context.getInitializationContext());
+        final Collection<ValidationResult> results = 
service.validate(context.getValidationContext());
+        assertEquals(1, results.size());
+        final ValidationResult result = results.iterator().next();
+        assertEquals(JettyWebSocketClient.WS_URI.getName(), 
result.getSubject());
+    }
+
+    @Test
+    public void testValidationSuccess() throws Exception {
+        final JettyWebSocketClient service = new JettyWebSocketClient();
+        final ControllerServiceTestContext context = new 
ControllerServiceTestContext(service, "service-id");
+        context.setCustomValue(JettyWebSocketClient.WS_URI, 
"ws://localhost:9001/test");
+        service.initialize(context.getInitializationContext());
+        final Collection<ValidationResult> results = 
service.validate(context.getValidationContext());
+        assertEquals(0, results.size());
+    }
+
+    @Test
+    public void testValidationProtocol() throws Exception {
+        final JettyWebSocketClient service = new JettyWebSocketClient();
+        final ControllerServiceTestContext context = new 
ControllerServiceTestContext(service, "service-id");
+        context.setCustomValue(JettyWebSocketClient.WS_URI, 
"http://localhost:9001/test";);
+        service.initialize(context.getInitializationContext());
+        final Collection<ValidationResult> results = 
service.validate(context.getValidationContext());
+        assertEquals(1, results.size());
+        final ValidationResult result = results.iterator().next();
+        assertEquals(JettyWebSocketClient.WS_URI.getName(), 
result.getSubject());
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/26a5881d/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/TestJettyWebSocketCommunication.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/TestJettyWebSocketCommunication.java
 
b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/TestJettyWebSocketCommunication.java
new file mode 100644
index 0000000..9a32255
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/TestJettyWebSocketCommunication.java
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.websocket;
+
+import org.apache.nifi.processor.Processor;
+import org.apache.nifi.websocket.jetty.JettyWebSocketClient;
+import org.apache.nifi.websocket.jetty.JettyWebSocketServer;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+
+import java.net.ServerSocket;
+import java.nio.ByteBuffer;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+
+
+public class TestJettyWebSocketCommunication {
+
+    protected int serverPort;
+    protected String serverPath = "/test";
+    protected WebSocketServerService serverService;
+    protected ControllerServiceTestContext serverServiceContext;
+    protected WebSocketClientService clientService;
+    protected ControllerServiceTestContext clientServiceContext;
+
+    protected boolean isSecure() {
+        return false;
+    }
+
+    @Before
+    public void setup() throws Exception {
+        setupServer();
+
+        setupClient();
+    }
+
+    private void setupServer() throws Exception {
+        // Find an open port.
+        try (final ServerSocket serverSocket = new ServerSocket(0)) {
+            serverPort = serverSocket.getLocalPort();
+        }
+        serverService = new JettyWebSocketServer();
+        serverServiceContext = new ControllerServiceTestContext(serverService, 
"JettyWebSocketServer1");
+        serverServiceContext.setCustomValue(JettyWebSocketServer.LISTEN_PORT, 
String.valueOf(serverPort));
+
+        customizeServer();
+
+        
serverService.initialize(serverServiceContext.getInitializationContext());
+        
serverService.startServer(serverServiceContext.getConfigurationContext());
+    }
+
+    protected void customizeServer() {
+    }
+
+    private void setupClient() throws Exception {
+        clientService = new JettyWebSocketClient();
+        clientServiceContext = new ControllerServiceTestContext(clientService, 
"JettyWebSocketClient1");
+        clientServiceContext.setCustomValue(JettyWebSocketClient.WS_URI, 
(isSecure() ? "wss" : "ws") + "://localhost:" + serverPort + serverPath);
+
+        customizeClient();
+
+        
clientService.initialize(clientServiceContext.getInitializationContext());
+        
clientService.startClient(clientServiceContext.getConfigurationContext());
+    }
+
+    protected void customizeClient() {
+    }
+
+    @After
+    public void teardown() throws Exception {
+        clientService.stopClient();
+        serverService.stopServer();
+    }
+
+    protected interface MockWebSocketProcessor extends Processor, 
ConnectedListener, TextMessageConsumer, BinaryMessageConsumer {
+    }
+
+    @Test
+    public void testClientServerCommunication() throws Exception {
+
+        // Expectations.
+        final CountDownLatch serverIsConnectedByClient = new CountDownLatch(1);
+        final CountDownLatch clientConnectedServer = new CountDownLatch(1);
+        final CountDownLatch serverReceivedTextMessageFromClient = new 
CountDownLatch(1);
+        final CountDownLatch serverReceivedBinaryMessageFromClient = new 
CountDownLatch(1);
+        final CountDownLatch clientReceivedTextMessageFromServer = new 
CountDownLatch(1);
+        final CountDownLatch clientReceivedBinaryMessageFromServer = new 
CountDownLatch(1);
+
+        final String textMessageFromClient = "Message from client.";
+        final String textMessageFromServer = "Message from server.";
+
+        final MockWebSocketProcessor serverProcessor = 
mock(MockWebSocketProcessor.class);
+        doReturn("serverProcessor1").when(serverProcessor).getIdentifier();
+        final AtomicReference<String> serverSessionIdRef = new 
AtomicReference<>();
+
+        doAnswer(invocation -> assertConnectedEvent(serverIsConnectedByClient, 
serverSessionIdRef, invocation))
+                
.when(serverProcessor).connected(any(WebSocketSessionInfo.class));
+
+        doAnswer(invocation -> 
assertConsumeTextMessage(serverReceivedTextMessageFromClient, 
textMessageFromClient, invocation))
+                
.when(serverProcessor).consume(any(WebSocketSessionInfo.class), anyString());
+
+        doAnswer(invocation -> 
assertConsumeBinaryMessage(serverReceivedBinaryMessageFromClient, 
textMessageFromClient, invocation))
+                
.when(serverProcessor).consume(any(WebSocketSessionInfo.class), 
any(byte[].class), anyInt(), anyInt());
+
+        serverService.registerProcessor(serverPath, serverProcessor);
+
+        final String clientId = "client1";
+
+        final MockWebSocketProcessor clientProcessor = 
mock(MockWebSocketProcessor.class);
+        doReturn("clientProcessor1").when(clientProcessor).getIdentifier();
+        final AtomicReference<String> clientSessionIdRef = new 
AtomicReference<>();
+
+
+        doAnswer(invocation -> assertConnectedEvent(clientConnectedServer, 
clientSessionIdRef, invocation))
+                
.when(clientProcessor).connected(any(WebSocketSessionInfo.class));
+
+        doAnswer(invocation -> 
assertConsumeTextMessage(clientReceivedTextMessageFromServer, 
textMessageFromServer, invocation))
+                
.when(clientProcessor).consume(any(WebSocketSessionInfo.class), anyString());
+
+        doAnswer(invocation -> 
assertConsumeBinaryMessage(clientReceivedBinaryMessageFromServer, 
textMessageFromServer, invocation))
+                
.when(clientProcessor).consume(any(WebSocketSessionInfo.class), 
any(byte[].class), anyInt(), anyInt());
+
+        clientService.registerProcessor(clientId, clientProcessor);
+
+        clientService.connect(clientId);
+
+        assertTrue("WebSocket client should be able to fire connected event.", 
clientConnectedServer.await(5, TimeUnit.SECONDS));
+        assertTrue("WebSocket server should be able to fire connected event.", 
serverIsConnectedByClient.await(5, TimeUnit.SECONDS));
+
+        clientService.sendMessage(clientId, clientSessionIdRef.get(), sender 
-> sender.sendString(textMessageFromClient));
+        clientService.sendMessage(clientId, clientSessionIdRef.get(), sender 
-> sender.sendBinary(ByteBuffer.wrap(textMessageFromClient.getBytes())));
+
+
+        assertTrue("WebSocket server should be able to consume text message.", 
serverReceivedTextMessageFromClient.await(5, TimeUnit.SECONDS));
+        assertTrue("WebSocket server should be able to consume binary 
message.", serverReceivedBinaryMessageFromClient.await(5, TimeUnit.SECONDS));
+
+        serverService.sendMessage(serverPath, serverSessionIdRef.get(), sender 
-> sender.sendString(textMessageFromServer));
+        serverService.sendMessage(serverPath, serverSessionIdRef.get(), sender 
-> sender.sendBinary(ByteBuffer.wrap(textMessageFromServer.getBytes())));
+
+        assertTrue("WebSocket client should be able to consume text message.", 
clientReceivedTextMessageFromServer.await(5, TimeUnit.SECONDS));
+        assertTrue("WebSocket client should be able to consume binary 
message.", clientReceivedBinaryMessageFromServer.await(5, TimeUnit.SECONDS));
+
+        clientService.deregisterProcessor(clientId, clientProcessor);
+        serverService.deregisterProcessor(serverPath, serverProcessor);
+    }
+
+    protected Object assertConnectedEvent(CountDownLatch latch, 
AtomicReference<String> sessionIdRef, InvocationOnMock invocation) {
+        final WebSocketSessionInfo sessionInfo = invocation.getArgumentAt(0, 
WebSocketSessionInfo.class);
+        assertNotNull(sessionInfo.getLocalAddress());
+        assertNotNull(sessionInfo.getRemoteAddress());
+        assertNotNull(sessionInfo.getSessionId());
+        assertEquals(isSecure(), sessionInfo.isSecure());
+        sessionIdRef.set(sessionInfo.getSessionId());
+        latch.countDown();
+        return null;
+    }
+
+    protected Object assertConsumeTextMessage(CountDownLatch latch, String 
expectedMessage, InvocationOnMock invocation) {
+        final WebSocketSessionInfo sessionInfo = invocation.getArgumentAt(0, 
WebSocketSessionInfo.class);
+        assertNotNull(sessionInfo.getLocalAddress());
+        assertNotNull(sessionInfo.getRemoteAddress());
+        assertNotNull(sessionInfo.getSessionId());
+        assertEquals(isSecure(), sessionInfo.isSecure());
+
+        final String receivedMessage = invocation.getArgumentAt(1, 
String.class);
+        assertNotNull(receivedMessage);
+        assertEquals(expectedMessage, receivedMessage);
+        latch.countDown();
+        return null;
+    }
+
+    protected Object assertConsumeBinaryMessage(CountDownLatch latch, String 
expectedMessage, InvocationOnMock invocation) {
+        final WebSocketSessionInfo sessionInfo = invocation.getArgumentAt(0, 
WebSocketSessionInfo.class);
+        assertNotNull(sessionInfo.getLocalAddress());
+        assertNotNull(sessionInfo.getRemoteAddress());
+        assertNotNull(sessionInfo.getSessionId());
+        assertEquals(isSecure(), sessionInfo.isSecure());
+
+        final byte[] receivedMessage = invocation.getArgumentAt(1, 
byte[].class);
+        final byte[] expectedBinary = expectedMessage.getBytes();
+        final int offset = invocation.getArgumentAt(2, Integer.class);
+        final int length = invocation.getArgumentAt(3, Integer.class);
+        assertNotNull(receivedMessage);
+        assertEquals(expectedBinary.length, receivedMessage.length);
+        assertEquals(expectedMessage, new String(receivedMessage));
+        assertEquals(0, offset);
+        assertEquals(expectedBinary.length, length);
+        latch.countDown();
+        return null;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/26a5881d/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/TestJettyWebSocketSecureCommunication.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/TestJettyWebSocketSecureCommunication.java
 
b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/TestJettyWebSocketSecureCommunication.java
new file mode 100644
index 0000000..e25189a
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/TestJettyWebSocketSecureCommunication.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.websocket;
+
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.ssl.StandardSSLContextService;
+import org.junit.Test;
+
+
+public class TestJettyWebSocketSecureCommunication extends 
TestJettyWebSocketCommunication{
+
+    private final StandardSSLContextService sslContextService = new 
StandardSSLContextService();
+    private final ControllerServiceTestContext sslTestContext = new 
ControllerServiceTestContext(sslContextService, "SSLContextService");
+
+    public TestJettyWebSocketSecureCommunication() {
+        try {
+            sslTestContext.setCustomValue(StandardSSLContextService.KEYSTORE, 
"src/test/resources/certs/localhost-ks.jks");
+            
sslTestContext.setCustomValue(StandardSSLContextService.KEYSTORE_PASSWORD, 
"localtest");
+            
sslTestContext.setCustomValue(StandardSSLContextService.KEYSTORE_TYPE, "JKS");
+            
sslTestContext.setCustomValue(StandardSSLContextService.TRUSTSTORE, 
"src/test/resources/certs/localhost-ks.jks");
+            
sslTestContext.setCustomValue(StandardSSLContextService.TRUSTSTORE_PASSWORD, 
"localtest");
+            
sslTestContext.setCustomValue(StandardSSLContextService.TRUSTSTORE_TYPE, "JKS");
+
+            
sslContextService.initialize(sslTestContext.getInitializationContext());
+            
sslContextService.onConfigured(sslTestContext.getConfigurationContext());
+        } catch (InitializationException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    protected boolean isSecure() {
+        return true;
+    }
+
+    @Override
+    protected void customizeServer() {
+        
serverServiceContext.getInitializationContext().addControllerService(sslContextService);
+        serverServiceContext.setCustomValue(WebSocketService.SSL_CONTEXT, 
sslContextService.getIdentifier());
+    }
+
+    @Override
+    protected void customizeClient() {
+        
clientServiceContext.getInitializationContext().addControllerService(sslContextService);
+        clientServiceContext.setCustomValue(WebSocketService.SSL_CONTEXT, 
sslContextService.getIdentifier());
+    }
+
+    @Test
+    public void testClientServerCommunication() throws Exception {
+        super.testClientServerCommunication();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/26a5881d/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/TestJettyWebSocketServer.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/TestJettyWebSocketServer.java
 
b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/TestJettyWebSocketServer.java
new file mode 100644
index 0000000..4cd17f6
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/TestJettyWebSocketServer.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.websocket;
+
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.websocket.jetty.JettyWebSocketServer;
+import org.junit.Test;
+
+import java.util.Collection;
+
+import static org.junit.Assert.assertEquals;
+
+
+public class TestJettyWebSocketServer {
+
+    @Test
+    public void testValidationRequiredProperties() throws Exception {
+        final JettyWebSocketServer service = new JettyWebSocketServer();
+        final ControllerServiceTestContext context = new 
ControllerServiceTestContext(service, "service-id");
+        service.initialize(context.getInitializationContext());
+        final Collection<ValidationResult> results = 
service.validate(context.getValidationContext());
+        assertEquals(1, results.size());
+        final ValidationResult result = results.iterator().next();
+        assertEquals(JettyWebSocketServer.LISTEN_PORT.getName(), 
result.getSubject());
+    }
+
+    @Test
+    public void testValidationSuccess() throws Exception {
+        final JettyWebSocketServer service = new JettyWebSocketServer();
+        final ControllerServiceTestContext context = new 
ControllerServiceTestContext(service, "service-id");
+        context.setCustomValue(JettyWebSocketServer.LISTEN_PORT, "9001");
+        service.initialize(context.getInitializationContext());
+        final Collection<ValidationResult> results = 
service.validate(context.getValidationContext());
+        assertEquals(0, results.size());
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/26a5881d/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/example/WebSocketClientExample.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/example/WebSocketClientExample.java
 
b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/example/WebSocketClientExample.java
new file mode 100644
index 0000000..5ef73bf
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/example/WebSocketClientExample.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.websocket.example;
+
+import org.eclipse.jetty.util.ssl.SslContextFactory;
+import org.eclipse.jetty.websocket.api.Session;
+import org.eclipse.jetty.websocket.api.StatusCode;
+import org.eclipse.jetty.websocket.api.WebSocketAdapter;
+import org.eclipse.jetty.websocket.client.ClientUpgradeRequest;
+import org.eclipse.jetty.websocket.client.WebSocketClient;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * This is a WebSocket client example testcase.
+ */
+@Ignore
+public class WebSocketClientExample {
+
+    private static Logger logger = 
LoggerFactory.getLogger(WebSocketClientExample.class);
+
+    @Test
+    public void test() {
+        String destUri = "wss://localhost:50010/test";
+
+        final CountDownLatch replyLatch = new CountDownLatch(1);
+        final SslContextFactory sslContextFactory = new SslContextFactory();
+        
sslContextFactory.setKeyStorePath("src/test/resources/certs/localhost-ks.jks");
+        sslContextFactory.setKeyStorePassword("localtest");
+        sslContextFactory.setKeyStoreType("JKS");
+        
sslContextFactory.setTrustStorePath("src/test/resources/certs/localhost-ks.jks");
+        sslContextFactory.setTrustStorePassword("localtest");
+        sslContextFactory.setTrustStoreType("JKS");
+
+        WebSocketClient client = new WebSocketClient(sslContextFactory);
+        WebSocketAdapter socket = new WebSocketAdapter() {
+            @Override
+            public void onWebSocketConnect(Session session) {
+                super.onWebSocketConnect(session);
+
+                try {
+                    session.getRemote().sendString("Hello, this is Jetty ws 
client.");
+                } catch (IOException e) {
+                    logger.error("Failed to send a message due to " + e, e);
+                }
+            }
+
+            @Override
+            public void onWebSocketText(String message) {
+                logger.info("Received a reply: {}", message);
+                replyLatch.countDown();
+            }
+        };
+        try {
+            client.start();
+
+            URI echoUri = new URI(destUri);
+            ClientUpgradeRequest request = new ClientUpgradeRequest();
+            final Future<Session> connect = client.connect(socket, echoUri, 
request);
+            logger.info("Connecting to : {}", echoUri);
+
+            final Session session = connect.get(3, TimeUnit.SECONDS);
+            logger.info("Connected, session={}", session);
+
+            session.close(StatusCode.NORMAL, "Bye");
+
+        } catch (Throwable t) {
+            t.printStackTrace();
+        } finally {
+            try {
+                client.stop();
+            } catch (Exception e) {
+                e.printStackTrace();
+            }
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/26a5881d/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/example/WebSocketServerExample.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/example/WebSocketServerExample.java
 
b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/example/WebSocketServerExample.java
new file mode 100644
index 0000000..eddecd5
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/example/WebSocketServerExample.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.websocket.example;
+
+import org.eclipse.jetty.server.Connector;
+import org.eclipse.jetty.server.Handler;
+import org.eclipse.jetty.server.HttpConfiguration;
+import org.eclipse.jetty.server.HttpConnectionFactory;
+import org.eclipse.jetty.server.SecureRequestCustomizer;
+import org.eclipse.jetty.server.Server;
+import org.eclipse.jetty.server.ServerConnector;
+import org.eclipse.jetty.server.SslConnectionFactory;
+import org.eclipse.jetty.server.handler.ContextHandlerCollection;
+import org.eclipse.jetty.servlet.ServletContextHandler;
+import org.eclipse.jetty.servlet.ServletHandler;
+import org.eclipse.jetty.servlet.ServletHolder;
+import org.eclipse.jetty.util.ssl.SslContextFactory;
+import org.eclipse.jetty.websocket.api.Session;
+import org.eclipse.jetty.websocket.api.WebSocketAdapter;
+import org.eclipse.jetty.websocket.api.WebSocketListener;
+import org.eclipse.jetty.websocket.servlet.ServletUpgradeRequest;
+import org.eclipse.jetty.websocket.servlet.ServletUpgradeResponse;
+import org.eclipse.jetty.websocket.servlet.WebSocketCreator;
+import org.eclipse.jetty.websocket.servlet.WebSocketServlet;
+import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * This is a WebSocket server example testcase.
+ */
+@Ignore
+public class WebSocketServerExample {
+
+    private static Logger logger = 
LoggerFactory.getLogger(WebSocketServerExample.class);
+    private static Server server;
+    private static ServletHandler servletHandler;
+    private static ServletHolder servletHolder;
+    private static ServerConnector httpConnector;
+    private static ServerConnector sslConnector;
+
+    private static final Map<Integer, WebSocketServerExample> portToController 
= new HashMap<>();
+
+    private Map<String, WebSocketListener> listeners = new HashMap<>();
+
+    public class SocketListener extends WebSocketAdapter {
+
+        public SocketListener() {
+            logger.info("New instance is created: {}", this);
+        }
+
+        @Override
+        public void onWebSocketConnect(Session session) {
+            logger.info("Connected, {}, {}", session.getLocalAddress(), 
session.getRemoteAddress());
+            super.onWebSocketConnect(session);
+
+            session.getUpgradeRequest().getRequestURI();
+        }
+
+        @Override
+        public void onWebSocketText(String message) {
+            logger.info("Received: {}", message);
+
+            final String resultMessage;
+            if (message.startsWith("add-servlet")) {
+                // Is it possible to add servlet mapping??
+                final String path = message.split(":")[1].trim();
+                servletHandler.addServletWithMapping(servletHolder, path);
+
+                resultMessage = "Deployed new servlet under: " + path;
+            } else {
+                resultMessage = "Got message: " + message;
+            }
+
+            try {
+                getSession().getRemote().sendString(resultMessage);
+            } catch (IOException e) {
+                logger.error("Failed to send a message back to remote.", e);
+            }
+        }
+
+    }
+
+    public WebSocketServerExample() {
+        this.listeners.put("/test", new SocketListener());
+        portToController.put(httpConnector.getPort(), this);
+        portToController.put(sslConnector.getPort(), this);
+    }
+
+    public static class WSServlet extends WebSocketServlet implements 
WebSocketCreator {
+        @Override
+        public void configure(WebSocketServletFactory webSocketServletFactory) 
{
+            webSocketServletFactory.setCreator(this);
+        }
+
+        @Override
+        public Object createWebSocket(ServletUpgradeRequest 
servletUpgradeRequest, ServletUpgradeResponse servletUpgradeResponse) {
+            final WebSocketServerExample testWebSocket = 
portToController.get(servletUpgradeRequest.getLocalPort());
+            return 
testWebSocket.listeners.get(servletUpgradeRequest.getRequestURI().getPath());
+        }
+    }
+
+    public static class ConnectionCheckServlet extends HttpServlet {
+        @Override
+        protected void doGet(HttpServletRequest req, HttpServletResponse resp) 
throws ServletException, IOException {
+            resp.setContentType("text/plain");
+            resp.setStatus(HttpServletResponse.SC_OK);
+            resp.getWriter().println("Ok :)");
+        }
+    }
+
+    @BeforeClass
+    public static void setup() throws Exception {
+        server = new Server(0);
+
+        final ContextHandlerCollection handlerCollection = new 
ContextHandlerCollection();
+
+        final ServletContextHandler contextHandler = new 
ServletContextHandler();
+        servletHandler = new ServletHandler();
+        contextHandler.insertHandler(servletHandler);
+
+        handlerCollection.setHandlers(new Handler[]{contextHandler});
+
+        server.setHandler(handlerCollection);
+
+        httpConnector = new ServerConnector(server);
+        httpConnector.setPort(50010);
+
+        final SslContextFactory sslContextFactory = new SslContextFactory();
+        
sslContextFactory.setKeyStorePath("src/test/resources/certs/localhost-ks.jks");
+        sslContextFactory.setKeyStorePassword("localtest");
+        sslContextFactory.setKeyStoreType("JKS");
+
+        final HttpConfiguration https = new HttpConfiguration();
+        https.addCustomizer(new SecureRequestCustomizer());
+        sslConnector = new ServerConnector(server,
+                new SslConnectionFactory(sslContextFactory, "http/1.1"),
+                new HttpConnectionFactory(https));
+        sslConnector.setPort(50011);
+
+
+        server.setConnectors(new Connector[]{httpConnector, sslConnector});
+
+        servletHolder = servletHandler.addServletWithMapping(WSServlet.class, 
"/test");
+        servletHolder = 
servletHandler.addServletWithMapping(ConnectionCheckServlet.class, "/check");
+
+        server.start();
+
+        logger.info("Starting server on port {} for HTTP, and {} for HTTPS", 
httpConnector.getLocalPort(), sslConnector.getLocalPort());
+
+
+    }
+
+    @AfterClass
+    public static void teardown() throws Exception {
+        logger.info("Stopping server.");
+        try {
+            server.stop();
+        } catch (Exception e) {
+            logger.error("Failed to stop Jetty server due to " + e, e);
+        }
+    }
+
+    @Test
+    public void test() throws Exception {
+        logger.info("Waiting for a while...");
+        Thread.sleep(1000_000);
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/26a5881d/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/resources/certs/localhost-ks.jks
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/resources/certs/localhost-ks.jks
 
b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/resources/certs/localhost-ks.jks
new file mode 100755
index 0000000..119b50f
Binary files /dev/null and 
b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/resources/certs/localhost-ks.jks
 differ

http://git-wip-us.apache.org/repos/asf/nifi/blob/26a5881d/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/resources/certs/localhost-ts.jks
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/resources/certs/localhost-ts.jks
 
b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/resources/certs/localhost-ts.jks
new file mode 100755
index 0000000..7824378
Binary files /dev/null and 
b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/resources/certs/localhost-ts.jks
 differ

http://git-wip-us.apache.org/repos/asf/nifi/blob/26a5881d/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/resources/certs/localhost.crt
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/resources/certs/localhost.crt
 
b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/resources/certs/localhost.crt
new file mode 100644
index 0000000..6c50a80
Binary files /dev/null and 
b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/resources/certs/localhost.crt
 differ

http://git-wip-us.apache.org/repos/asf/nifi/blob/26a5881d/nifi-nar-bundles/nifi-websocket-bundle/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-websocket-bundle/pom.xml 
b/nifi-nar-bundles/nifi-websocket-bundle/pom.xml
new file mode 100644
index 0000000..f2ed951
--- /dev/null
+++ b/nifi-nar-bundles/nifi-websocket-bundle/pom.xml
@@ -0,0 +1,39 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-nar-bundles</artifactId>
+        <version>1.1.0-SNAPSHOT</version>
+    </parent>
+
+    <groupId>org.apache.nifi</groupId>
+    <artifactId>nifi-websocket-bundle</artifactId>
+    <version>1.1.0-SNAPSHOT</version>
+    <packaging>pom</packaging>
+
+    <modules>
+        <module>nifi-websocket-processors</module>
+        <module>nifi-websocket-processors-nar</module>
+        <module>nifi-websocket-services-api</module>
+        <module>nifi-websocket-services-api-nar</module>
+        <module>nifi-websocket-services-jetty</module>
+        <module>nifi-websocket-services-jetty-nar</module>
+    </modules>
+
+</project>

http://git-wip-us.apache.org/repos/asf/nifi/blob/26a5881d/nifi-nar-bundles/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/pom.xml b/nifi-nar-bundles/pom.xml
index 476ea23..2ead087 100755
--- a/nifi-nar-bundles/pom.xml
+++ b/nifi-nar-bundles/pom.xml
@@ -71,6 +71,7 @@
         <module>nifi-ignite-bundle</module>
         <module>nifi-email-bundle</module>
        <module>nifi-ranger-bundle</module>
+        <module>nifi-websocket-bundle</module>
   </modules>
     
   <dependencyManagement>

http://git-wip-us.apache.org/repos/asf/nifi/blob/26a5881d/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 957390c..f6eed44 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1253,6 +1253,24 @@ language governing permissions and limitations under the 
License. -->
             </dependency>
             <dependency>
                 <groupId>org.apache.nifi</groupId>
+                <artifactId>nifi-websocket-services-api-nar</artifactId>
+                <version>1.1.0-SNAPSHOT</version>
+                <type>nar</type>
+            </dependency>
+            <dependency>
+                <groupId>org.apache.nifi</groupId>
+                <artifactId>nifi-websocket-services-jetty-nar</artifactId>
+                <version>1.1.0-SNAPSHOT</version>
+                <type>nar</type>
+            </dependency>
+            <dependency>
+                <groupId>org.apache.nifi</groupId>
+                <artifactId>nifi-websocket-processors-nar</artifactId>
+                <version>1.1.0-SNAPSHOT</version>
+                <type>nar</type>
+            </dependency>
+            <dependency>
+                <groupId>org.apache.nifi</groupId>
                 <artifactId>nifi-datadog-nar</artifactId>
                 <version>1.1.0-SNAPSHOT</version>
                 <type>nar</type>
@@ -1320,6 +1338,16 @@ language governing permissions and limitations under the 
License. -->
             </dependency>
             <dependency>
                 <groupId>org.apache.nifi</groupId>
+                <artifactId>nifi-websocket-services-api</artifactId>
+                <version>1.1.0-SNAPSHOT</version>
+            </dependency>
+            <dependency>
+                <groupId>org.apache.nifi</groupId>
+                <artifactId>nifi-websocket-services-jetty</artifactId>
+                <version>1.1.0-SNAPSHOT</version>
+            </dependency>
+            <dependency>
+                <groupId>org.apache.nifi</groupId>
                 <artifactId>nifi-assembly</artifactId>
                 <version>1.1.0-SNAPSHOT</version>
             </dependency>

Reply via email to