http://git-wip-us.apache.org/repos/asf/nifi/blob/26a5881d/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/pom.xml
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/pom.xml 
b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/pom.xml
new file mode 100644
index 0000000..bf9951b
--- /dev/null
+++ b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/pom.xml
@@ -0,0 +1,41 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+       <!-- Licensed to the Apache Software Foundation (ASF) under one or more
+               contributor license agreements. See the NOTICE file distributed with this
+               work for additional information regarding copyright ownership. The ASF licenses
+               this file to You under the Apache License, Version 2.0 (the "License"); you
+               may not use this file except in compliance with the License. You may obtain
+               a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless
+               required by applicable law or agreed to in writing, software distributed
+               under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES
+               OR CONDITIONS OF ANY KIND, either express or implied. See the License for
+               the specific language governing permissions and limitations under the License. -->
+       <parent>
+               <groupId>org.apache.nifi</groupId>
+               <artifactId>nifi-websocket-bundle</artifactId>
+               <version>1.1.0-SNAPSHOT</version>
+       </parent>
+       <modelVersion>4.0.0</modelVersion>
+       <artifactId>nifi-websocket-services-api</artifactId>
+       <packaging>jar</packaging>
+       <dependencies>
+               <dependency>
+                       <groupId>org.apache.nifi</groupId>
+                       <artifactId>nifi-ssl-context-service-api</artifactId>
+               </dependency>
+               <dependency>
+                       <groupId>org.apache.nifi</groupId>
+                       <artifactId>nifi-processor-utils</artifactId>
+               </dependency>
+               <dependency>
+                       <groupId>log4j</groupId>
+                       <artifactId>log4j</artifactId>
+                       <version>1.2.17</version>
+                       <scope>test</scope>
+               </dependency>
+               <dependency>
+                       <groupId>org.apache.nifi</groupId>
+                       <artifactId>nifi-mock</artifactId>
+                       <scope>test</scope>
+               </dependency>
+       </dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/nifi/blob/26a5881d/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/AbstractWebSocketService.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/AbstractWebSocketService.java
 
b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/AbstractWebSocketService.java
new file mode 100644
index 0000000..fac1e42
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/AbstractWebSocketService.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.websocket;
+
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.processor.Processor;
+
+import java.io.IOException;
+
+/**
+ * Skeleton {@link WebSocketService} controller service.
+ *
+ * <p>Each service operation is forwarded, with its arguments untouched, to the
+ * {@link WebSocketMessageRouters} instance owned by this service.</p>
+ */
+public abstract class AbstractWebSocketService extends AbstractControllerService implements WebSocketService {
+
+    /** Router collection that every method of this class delegates to. */
+    protected final WebSocketMessageRouters routers = new WebSocketMessageRouters();
+
+    /** Forwards the registration request to {@link #routers}. */
+    @Override
+    public void registerProcessor(final String endpointId, final Processor processor) throws WebSocketConfigurationException {
+        this.routers.registerProcessor(endpointId, processor);
+    }
+
+    /** Asks {@link #routers} whether {@code processor} is registered for {@code endpointId}. */
+    @Override
+    public boolean isProcessorRegistered(final String endpointId, final Processor processor) {
+        return this.routers.isProcessorRegistered(endpointId, processor);
+    }
+
+    /** Forwards the deregistration request to {@link #routers}. */
+    @Override
+    public void deregisterProcessor(final String endpointId, final Processor processor) throws WebSocketConfigurationException {
+        this.routers.deregisterProcessor(endpointId, processor);
+    }
+
+    /** Hands {@code sendMessage} to {@link #routers} for the given endpoint and session. */
+    @Override
+    public void sendMessage(final String endpointId, final String sessionId, final SendMessage sendMessage) throws IOException, WebSocketConfigurationException {
+        this.routers.sendMessage(endpointId, sessionId, sendMessage);
+    }
+
+    /** Asks {@link #routers} to disconnect the given session, passing {@code reason} along. */
+    @Override
+    public void disconnect(final String endpointId, final String sessionId, final String reason) throws IOException, WebSocketConfigurationException {
+        this.routers.disconnect(endpointId, sessionId, reason);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/26a5881d/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/AbstractWebSocketSession.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/AbstractWebSocketSession.java
 
b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/AbstractWebSocketSession.java
new file mode 100644
index 0000000..9b41641
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/AbstractWebSocketSession.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.websocket;
+
+/**
+ * Partial {@link WebSocketSession} implementation that keeps the transit URI in a plain field.
+ */
+public abstract class AbstractWebSocketSession implements WebSocketSession {
+
+    /** Most recent value given to {@link #setTransitUri(String)}; {@code null} until that is called. */
+    private String transitUri;
+
+    /** @return the transit URI exactly as it was last stored */
+    @Override
+    public String getTransitUri() {
+        return this.transitUri;
+    }
+
+    /** Stores {@code transitUri} without validation. */
+    @Override
+    public void setTransitUri(final String transitUri) {
+        this.transitUri = transitUri;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/26a5881d/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/BinaryMessageConsumer.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/BinaryMessageConsumer.java
 
b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/BinaryMessageConsumer.java
new file mode 100644
index 0000000..be2f0ca
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/BinaryMessageConsumer.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.websocket;
+
+/**
+ * Callback for binary WebSocket messages.
+ *
+ * <p>{@link WebSocketMessageRouter#onWebSocketBinary} calls this on the registered processor
+ * when that processor implements this interface.</p>
+ */
+public interface BinaryMessageConsumer {
+
+    /**
+     * @param sessionInfo session the message arrived on
+     * @param payload     buffer holding the message bytes
+     * @param offset      presumably the index of the first message byte in {@code payload} (confirm with implementations)
+     * @param length      presumably the number of message bytes starting at {@code offset} (confirm with implementations)
+     */
+    void consume(WebSocketSessionInfo sessionInfo, byte[] payload, int offset, int length);
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/26a5881d/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/ConnectedListener.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/ConnectedListener.java
 
b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/ConnectedListener.java
new file mode 100644
index 0000000..b682094
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/ConnectedListener.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.websocket;
+
+/**
+ * Callback fired once a WebSocket connection has been established.
+ *
+ * <p>{@link WebSocketMessageRouter#captureSession} calls this on the registered processor,
+ * when that processor implements this interface, right after storing the session.</p>
+ */
+public interface ConnectedListener {
+
+    /**
+     * @param sessionInfo the session that has just been connected
+     */
+    void connected(WebSocketSessionInfo sessionInfo);
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/26a5881d/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/MessageSender.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/MessageSender.java
 
b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/MessageSender.java
new file mode 100644
index 0000000..61d7061
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/MessageSender.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.websocket;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+/**
+ * A {@link WebSocketSessionInfo} that can also push messages to its peer,
+ * either as text or as binary data.
+ */
+public interface MessageSender extends WebSocketSessionInfo {
+
+    /**
+     * @param message text to send
+     * @throws IOException declared for implementations that fail while sending
+     */
+    void sendString(String message) throws IOException;
+
+    /**
+     * @param data bytes to send
+     * @throws IOException declared for implementations that fail while sending
+     */
+    void sendBinary(ByteBuffer data) throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/26a5881d/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/SendMessage.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/SendMessage.java
 
b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/SendMessage.java
new file mode 100644
index 0000000..208ed6e
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/SendMessage.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.websocket;
+
+import java.io.IOException;
+
+/**
+ * A send action to be run against a {@link MessageSender}.
+ *
+ * <p>{@link WebSocketMessageRouter#sendMessage} looks up the target session and passes it
+ * to {@link #send(MessageSender)}.</p>
+ */
+public interface SendMessage {
+
+    /**
+     * @param sender the session-backed sender to write to
+     * @throws IOException declared for implementations that fail while sending
+     */
+    void send(MessageSender sender) throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/26a5881d/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/TextMessageConsumer.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/TextMessageConsumer.java
 
b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/TextMessageConsumer.java
new file mode 100644
index 0000000..c1fed80
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/TextMessageConsumer.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.websocket;
+
+/**
+ * Callback for text WebSocket messages.
+ *
+ * <p>{@link WebSocketMessageRouter#onWebSocketText} calls this on the registered processor
+ * when that processor implements this interface.</p>
+ */
+public interface TextMessageConsumer {
+
+    /**
+     * @param sessionInfo session the message arrived on
+     * @param message     the received text
+     */
+    void consume(WebSocketSessionInfo sessionInfo, String message);
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/26a5881d/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/WebSocketClientService.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/WebSocketClientService.java
 
b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/WebSocketClientService.java
new file mode 100644
index 0000000..9d4ef16
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/WebSocketClientService.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.websocket;
+
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.controller.ConfigurationContext;
+
+import java.io.IOException;
+
+/**
+ * {@link WebSocketService} flavour that controls a WebSocket client instance.
+ *
+ * <p>NOTE(review): the exact lifecycle contract of the methods below is defined by the
+ * implementations, which are not part of this file.</p>
+ */
+@CapabilityDescription("Control a WebSocket client instance, so that NiFi can connect with external systems via WebSocket protocol.")
+public interface WebSocketClientService extends WebSocketService {
+
+    /**
+     * Starts the client using the supplied configuration.
+     *
+     * @param context configuration to start the client with
+     * @throws Exception declared for implementations that fail to start
+     */
+    void startClient(ConfigurationContext context) throws Exception;
+
+    /**
+     * Stops the client.
+     *
+     * @throws Exception declared for implementations that fail to stop
+     */
+    void stopClient() throws Exception;
+
+    /**
+     * Opens a connection on behalf of {@code clientId}.
+     *
+     * @param clientId identifier of the client the connection is opened for
+     * @throws IOException declared for implementations that fail to connect
+     */
+    void connect(String clientId) throws IOException;
+
+    /**
+     * @return the URI this client targets
+     */
+    String getTargetUri();
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/26a5881d/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/WebSocketConfigurationException.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/WebSocketConfigurationException.java
 
b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/WebSocketConfigurationException.java
new file mode 100644
index 0000000..f78c905
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/WebSocketConfigurationException.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.websocket;
+
+/**
+ * Checked exception used by the WebSocket services in this package; for example
+ * {@link WebSocketMessageRouter#registerProcessor} throws it when a processor is already assigned.
+ */
+public class WebSocketConfigurationException extends Exception {
+
+    /**
+     * @param message detail message, passed straight to {@link Exception#Exception(String)}
+     */
+    public WebSocketConfigurationException(final String message) {
+        super(message);
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/26a5881d/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/WebSocketConnectedMessage.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/WebSocketConnectedMessage.java
 
b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/WebSocketConnectedMessage.java
new file mode 100644
index 0000000..cb2e2eb
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/WebSocketConnectedMessage.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.websocket;
+
+/**
+ * {@link WebSocketMessage} subtype that is built from session information alone.
+ * It adds no state or behaviour of its own.
+ */
+public class WebSocketConnectedMessage extends WebSocketMessage {
+
+    /**
+     * @param sessionInfo session this message is associated with
+     */
+    public WebSocketConnectedMessage(final WebSocketSessionInfo sessionInfo) {
+        super(sessionInfo);
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/26a5881d/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/WebSocketMessage.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/WebSocketMessage.java
 
b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/WebSocketMessage.java
new file mode 100644
index 0000000..98f8dfb
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/WebSocketMessage.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.websocket;
+
+import java.io.UnsupportedEncodingException;
+
+/**
+ * A WebSocket message: the session it belongs to plus an optional payload.
+ *
+ * <p>The payload is kept as a byte array with an offset and a length. {@link #getType()} tells
+ * which of the two {@code setPayload} overloads stored it, and is {@code null} until one is called.</p>
+ */
+public class WebSocketMessage {
+    /** Charset name used to encode text payloads. */
+    public static final String CHARSET_NAME = "UTF-8";
+
+    /** Kind of payload currently held by a message. */
+    public enum Type {
+        TEXT,
+        BINARY
+    }
+
+    private final WebSocketSessionInfo sessionInfo;
+    private byte[] payload;
+    private int offset;
+    private int length;
+    private Type type;
+
+    /**
+     * @param sessionInfo session this message is associated with
+     */
+    public WebSocketMessage(final WebSocketSessionInfo sessionInfo) {
+        this.sessionInfo = sessionInfo;
+    }
+
+    /** @return the session information given at construction */
+    public WebSocketSessionInfo getSessionInfo() {
+        return this.sessionInfo;
+    }
+
+    /** @return the stored payload array itself (no copy is made) */
+    public byte[] getPayload() {
+        return this.payload;
+    }
+
+    /** @return the offset stored with the payload */
+    public int getOffset() {
+        return this.offset;
+    }
+
+    /** @return the length stored with the payload */
+    public int getLength() {
+        return this.length;
+    }
+
+    /** @return {@link Type#TEXT} or {@link Type#BINARY} depending on the last {@code setPayload} call */
+    public Type getType() {
+        return this.type;
+    }
+
+    /**
+     * Stores {@code text} encoded with {@link #CHARSET_NAME} and marks the message as {@link Type#TEXT}.
+     * A {@code null} argument leaves the message untouched.
+     *
+     * @param text text payload, may be {@code null}
+     * @throws RuntimeException if the charset is reported as unsupported
+     */
+    public void setPayload(final String text) {
+        if (text != null) {
+            final byte[] encoded;
+            try {
+                encoded = text.getBytes(CHARSET_NAME);
+            } catch (UnsupportedEncodingException e) {
+                throw new RuntimeException("Failed to serialize messageStr, due to " + e, e);
+            }
+            // The byte[] overload flags the message as BINARY, so TEXT is set afterwards.
+            setPayload(encoded, 0, encoded.length);
+            this.type = Type.TEXT;
+        }
+    }
+
+    /**
+     * Stores the given array reference, offset and length as-is and marks the message as {@link Type#BINARY}.
+     *
+     * @param payload buffer holding the message bytes
+     * @param offset  offset to store alongside the buffer
+     * @param length  length to store alongside the buffer
+     */
+    public void setPayload(final byte[] payload, final int offset, final int length) {
+        this.type = Type.BINARY;
+        this.payload = payload;
+        this.offset = offset;
+        this.length = length;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/26a5881d/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/WebSocketMessageRouter.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/WebSocketMessageRouter.java
 
b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/WebSocketMessageRouter.java
new file mode 100644
index 0000000..057b33d
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/WebSocketMessageRouter.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.websocket;
+
+import org.apache.nifi.processor.Processor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Routes WebSocket events of one endpoint to the single {@link Processor} registered with it
+ * and keeps the sessions captured for that endpoint, keyed by session id.
+ *
+ * <p>The processor reference is changed only by the synchronized register/deregister methods,
+ * but it is read by unsynchronized callbacks. Those callbacks therefore copy the volatile field
+ * into a local variable once, so a concurrent deregistration cannot turn a passed
+ * {@code instanceof} or null check into a {@link NullPointerException}.</p>
+ */
+public class WebSocketMessageRouter {
+    private static final Logger logger = LoggerFactory.getLogger(WebSocketMessageRouter.class);
+    private final String endpointId;
+    private final Map<String, WebSocketSession> sessions = new ConcurrentHashMap<>();
+    private volatile Processor processor;
+
+    /**
+     * @param endpointId identifier of the endpoint this router serves
+     */
+    public WebSocketMessageRouter(final String endpointId) {
+        this.endpointId = endpointId;
+    }
+
+    /**
+     * Assigns {@code processor} to this router.
+     *
+     * @param processor the processor to register
+     * @throws WebSocketConfigurationException if a processor is already assigned
+     */
+    public synchronized void registerProcessor(final Processor processor) throws WebSocketConfigurationException {
+        if (this.processor != null) {
+            throw new WebSocketConfigurationException("Processor " + this.processor + " is already assigned to this router.");
+        }
+        this.processor = processor;
+    }
+
+    /**
+     * @param processor the processor to look for
+     * @return {@code true} when a processor is assigned and its identifier equals that of {@code processor}
+     */
+    public boolean isProcessorRegistered(final Processor processor) {
+        // Single volatile read: this method is not synchronized.
+        final Processor registered = this.processor;
+        return registered != null && registered.getIdentifier().equals(processor.getIdentifier());
+    }
+
+    /**
+     * Removes {@code processor} from this router and closes every session currently held.
+     * Nothing happens, apart from an info log, when {@code processor} is not the registered one.
+     *
+     * @param processor the processor to deregister
+     */
+    public synchronized void deregisterProcessor(final Processor processor) {
+        if (!isProcessorRegistered(processor)) {
+            if (this.processor == null) {
+                logger.info("Deregister processor {}, do nothing because this router doesn't have registered processor",
+                        processor);
+            } else {
+                logger.info("Deregister processor {}, do nothing because this router is assigned to different processor {}",
+                        processor, this.processor);
+            }
+            return;
+        }
+
+        this.processor = null;
+        // Iterate over entries rather than ids: a session closed concurrently by onWebSocketClose
+        // simply no longer shows up, instead of failing the lookup and aborting the whole loop.
+        sessions.forEach((sessionId, session) -> {
+            try {
+                session.close("Processing has stopped.");
+                sessions.remove(sessionId, session);
+            } catch (IOException e) {
+                logger.warn("Failed to disconnect session {} due to {}", sessionId, e, e);
+            }
+        });
+    }
+
+    /**
+     * Stores {@code session} under its session id and, when the registered processor is a
+     * {@link ConnectedListener}, notifies it.
+     *
+     * @param session the newly connected session
+     */
+    public void captureSession(final WebSocketSession session) {
+        final String sessionId = session.getSessionId();
+        sessions.put(sessionId, session);
+
+        final Processor registered = this.processor;
+        if (registered instanceof ConnectedListener) {
+            ((ConnectedListener) registered).connected(session);
+        }
+    }
+
+    /**
+     * Forgets the session with the given id. The status code and reason are not used here.
+     *
+     * @param sessionId  id of the closed session
+     * @param statusCode close status code
+     * @param reason     close reason
+     */
+    public void onWebSocketClose(final String sessionId, final int statusCode, final String reason) {
+        sessions.remove(sessionId);
+    }
+
+    /**
+     * Passes a text message to the registered processor when it is a {@link TextMessageConsumer};
+     * otherwise the message is dropped.
+     *
+     * @param sessionId id of the session the message arrived on
+     * @param message   the received text
+     * @throws IllegalStateException if a consumer is registered but the session id is unknown
+     */
+    public void onWebSocketText(final String sessionId, final String message) {
+        final Processor registered = this.processor;
+        if (registered instanceof TextMessageConsumer) {
+            ((TextMessageConsumer) registered).consume(getSessionOrFail(sessionId), message);
+        }
+    }
+
+    /**
+     * Passes a binary message to the registered processor when it is a {@link BinaryMessageConsumer};
+     * otherwise the message is dropped.
+     *
+     * @param sessionId id of the session the message arrived on
+     * @param payload   buffer holding the message bytes
+     * @param offset    offset forwarded to the consumer
+     * @param length    length forwarded to the consumer
+     * @throws IllegalStateException if a consumer is registered but the session id is unknown
+     */
+    public void onWebSocketBinary(final String sessionId, final byte[] payload, final int offset, final int length) {
+        final Processor registered = this.processor;
+        if (registered instanceof BinaryMessageConsumer) {
+            ((BinaryMessageConsumer) registered).consume(getSessionOrFail(sessionId), payload, offset, length);
+        }
+    }
+
+    /**
+     * @param sessionId id of the session to look up
+     * @return the stored session
+     * @throws IllegalStateException if no session is stored under {@code sessionId}
+     */
+    private WebSocketSession getSessionOrFail(final String sessionId) {
+        final WebSocketSession session = sessions.get(sessionId);
+        if (session == null) {
+            throw new IllegalStateException("Session was not found for the sessionId: " + sessionId);
+        }
+        return session;
+    }
+
+    /**
+     * Runs {@code sendMessage} against the session with the given id.
+     *
+     * @param sessionId   id of the target session
+     * @param sendMessage the send action to run
+     * @throws IOException           if the send action fails
+     * @throws IllegalStateException if the session id is unknown
+     */
+    public void sendMessage(final String sessionId, final SendMessage sendMessage) throws IOException {
+        final WebSocketSession session = getSessionOrFail(sessionId);
+        sendMessage.send(session);
+    }
+
+    /**
+     * Closes the session with the given id and then forgets it. When closing throws,
+     * the session stays in the map.
+     *
+     * @param sessionId id of the session to close
+     * @param reason    reason passed to the session's close call
+     * @throws IOException           if closing the session fails
+     * @throws IllegalStateException if the session id is unknown
+     */
+    public void disconnect(final String sessionId, final String reason) throws IOException {
+        final WebSocketSession session = getSessionOrFail(sessionId);
+        session.close(reason);
+        sessions.remove(sessionId);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/26a5881d/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/WebSocketMessageRouters.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/WebSocketMessageRouters.java
 
b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/WebSocketMessageRouters.java
new file mode 100644
index 0000000..2551eb4
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/WebSocketMessageRouters.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.websocket;
+
+import org.apache.nifi.processor.Processor;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Registry of {@link WebSocketMessageRouter}s keyed by endpoint id.
+ * <p>
+ * A router is created lazily the first time a processor is registered for an endpoint id.
+ * Every other operation looks the router up and fails (or reports "not registered")
+ * when no router exists for the given id.
+ */
+public class WebSocketMessageRouters {
+    // Never reassigned. Backed by a ConcurrentHashMap because getRouterOrFail reads it without locking.
+    private final Map<String, WebSocketMessageRouter> routers = new ConcurrentHashMap<>();
+
+    /**
+     * Returns the router bound to {@code endpointId}, creating and storing one if none exists yet.
+     */
+    private WebSocketMessageRouter getRouterOrCreate(final String endpointId) {
+        // ConcurrentHashMap.computeIfAbsent performs the check-then-create atomically,
+        // so no additional locking is needed here.
+        return routers.computeIfAbsent(endpointId, WebSocketMessageRouter::new);
+    }
+
+    /**
+     * Returns the router bound to {@code endpointId}.
+     *
+     * @param endpointId id the router was created for
+     * @return the existing router; never {@code null}
+     * @throws WebSocketConfigurationException if no router is bound to {@code endpointId}
+     */
+    public WebSocketMessageRouter getRouterOrFail(final String endpointId) throws WebSocketConfigurationException {
+        final WebSocketMessageRouter router = routers.get(endpointId);
+
+        if (router == null) {
+            throw new WebSocketConfigurationException("No WebSocket router is bound with endpointId: " + endpointId);
+        }
+        return router;
+    }
+
+    /**
+     * Registers {@code processor} with the router for {@code endpointId}, creating the router on first use.
+     *
+     * @throws WebSocketConfigurationException propagated from {@link WebSocketMessageRouter#registerProcessor}
+     */
+    public synchronized void registerProcessor(final String endpointId, final Processor processor) throws WebSocketConfigurationException {
+        final WebSocketMessageRouter router = getRouterOrCreate(endpointId);
+        router.registerProcessor(processor);
+    }
+
+    /**
+     * Tells whether {@code processor} is registered with the router for {@code endpointId}.
+     *
+     * @return {@code false} when no router exists for {@code endpointId}; otherwise whatever the router reports
+     */
+    public boolean isProcessorRegistered(final String endpointId, final Processor processor) {
+        try {
+            final WebSocketMessageRouter router = getRouterOrFail(endpointId);
+            return router.isProcessorRegistered(processor);
+        } catch (WebSocketConfigurationException e) {
+            // A missing router simply means nothing has been registered for this endpoint.
+            return false;
+        }
+    }
+
+    /**
+     * Deregisters {@code processor} from the router for {@code endpointId}. The router itself stays in the registry.
+     *
+     * @throws WebSocketConfigurationException if no router is bound to {@code endpointId}
+     */
+    public synchronized void deregisterProcessor(final String endpointId, final Processor processor) throws WebSocketConfigurationException {
+        final WebSocketMessageRouter router = getRouterOrFail(endpointId);
+        router.deregisterProcessor(processor);
+    }
+
+    /**
+     * Delegates to {@link WebSocketMessageRouter#sendMessage} on the router for {@code endpointId}.
+     *
+     * @throws IOException                     propagated from the router
+     * @throws WebSocketConfigurationException if no router is bound to {@code endpointId}
+     */
+    public void sendMessage(final String endpointId, final String sessionId, final SendMessage sendMessage) throws IOException, WebSocketConfigurationException {
+        final WebSocketMessageRouter router = getRouterOrFail(endpointId);
+        router.sendMessage(sessionId, sendMessage);
+    }
+
+    /**
+     * Delegates to {@link WebSocketMessageRouter#disconnect} on the router for {@code endpointId}.
+     *
+     * @throws IOException                     propagated from the router
+     * @throws WebSocketConfigurationException if no router is bound to {@code endpointId}
+     */
+    public void disconnect(final String endpointId, final String sessionId, final String reason) throws IOException, WebSocketConfigurationException {
+        final WebSocketMessageRouter router = getRouterOrFail(endpointId);
+        router.disconnect(sessionId, reason);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/26a5881d/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/WebSocketServerService.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/WebSocketServerService.java
 
b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/WebSocketServerService.java
new file mode 100644
index 0000000..ed5ad05
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/WebSocketServerService.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.websocket;
+
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.lifecycle.OnDisabled;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.annotation.lifecycle.OnShutdown;
+import org.apache.nifi.controller.ConfigurationContext;
+
+/**
+ * Control an embedded WebSocket server instance.
+ */
+@CapabilityDescription("Control an embedded WebSocket server instance," +
+        " so that external system can connect this NiFi via WebSocket 
protocol.")
+public interface WebSocketServerService extends WebSocketService {
+
+    /**
+     * Starts the server. Annotated with {@code @OnEnabled}, so it is tied to the
+     * controller service being enabled.
+     *
+     * @param context configuration to start the server with
+     * @throws Exception if the server cannot be started
+     */
+    @OnEnabled
+    void startServer(final ConfigurationContext context) throws Exception;
+
+    /**
+     * Stops the server. Annotated with both {@code @OnDisabled} and {@code @OnShutdown},
+     * so it is tied to the service being disabled as well as to shutdown.
+     *
+     * @throws Exception if the server cannot be stopped
+     */
+    @OnDisabled
+    @OnShutdown
+    void stopServer() throws Exception;
+
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/26a5881d/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/WebSocketService.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/WebSocketService.java
 
b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/WebSocketService.java
new file mode 100644
index 0000000..0de80bc
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/WebSocketService.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.websocket;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.ControllerService;
+import org.apache.nifi.processor.Processor;
+import org.apache.nifi.ssl.SSLContextService;
+
+import java.io.IOException;
+
+/**
+ * Control an embedded WebSocket service instance.
+ * <p>
+ * Processors are registered per endpoint id, and messages are sent to a session
+ * identified by the pair (endpoint id, session id).
+ */
+public interface WebSocketService extends ControllerService {
+
+    // Optional reference to an SSLContextService; see the description text for the WS/WSS behaviour.
+    PropertyDescriptor SSL_CONTEXT = new PropertyDescriptor.Builder()
+            .name("ssl-context-service")
+            .displayName("SSL Context Service")
+            .description("The SSL Context Service to use in order to secure 
the server. If specified, the server will accept only WSS requests; "
+                    + "otherwise, the server will accept only WS requests")
+            .required(false)
+            .identifiesControllerService(SSLContextService.class)
+            .build();
+
+    /**
+     * Registers a processor for the given endpoint.
+     *
+     * @param endpointId id of the endpoint the processor is bound to
+     * @param processor  processor to register
+     * @throws WebSocketConfigurationException if the registration is rejected
+     */
+    void registerProcessor(final String endpointId, final Processor processor) 
throws WebSocketConfigurationException;
+
+    /**
+     * @param endpointId id of the endpoint to check
+     * @param processor  processor to look for
+     * @return whether {@code processor} is registered for {@code endpointId}
+     */
+    boolean isProcessorRegistered(final String endpointId, final Processor 
processor);
+
+    /**
+     * Removes a processor registration for the given endpoint.
+     *
+     * @param endpointId id of the endpoint the processor was bound to
+     * @param processor  processor to deregister
+     * @throws WebSocketConfigurationException if the deregistration cannot be carried out
+     */
+    void deregisterProcessor(final String endpointId, final Processor 
processor) throws WebSocketConfigurationException;
+
+    /**
+     * Sends a message through the session identified by {@code endpointId} and {@code sessionId}.
+     *
+     * @param endpointId  id of the endpoint owning the session
+     * @param sessionId   id of the session to send through
+     * @param sendMessage callback that performs the send
+     * @throws IOException                     if sending fails
+     * @throws WebSocketConfigurationException if the endpoint cannot be resolved
+     */
+    void sendMessage(final String endpointId, final String sessionId, final 
SendMessage sendMessage) throws IOException, WebSocketConfigurationException;
+
+    /**
+     * Disconnects the session identified by {@code endpointId} and {@code sessionId}.
+     * <p>
+     * NOTE(review): this declares the broad {@code Exception}, unlike {@link #sendMessage},
+     * which declares {@code IOException} and {@code WebSocketConfigurationException} - confirm
+     * whether implementors need the wider type before narrowing it.
+     *
+     * @param endpointId id of the endpoint owning the session
+     * @param sessionId  id of the session to disconnect
+     * @param reason     reason for the disconnect
+     * @throws Exception if the disconnect fails
+     */
+    void disconnect(final String endpointId, final String sessionId, final 
String reason) throws Exception;
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/26a5881d/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/WebSocketSession.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/WebSocketSession.java
 
b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/WebSocketSession.java
new file mode 100644
index 0000000..2e1c937
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/WebSocketSession.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.websocket;
+
+import java.io.IOException;
+
+/**
+ * This is the concrete WebSocket session interface, which provides session information and operations.
+ * <p>
+ * It inherits the sending operations of {@link MessageSender} and adds the ability to close the session.
+ */
+public interface WebSocketSession extends MessageSender {
+    /**
+     * Closes this session.
+     *
+     * @param reason reason for closing
+     * @throws IOException if closing fails
+     */
+    void close(final String reason) throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/26a5881d/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/WebSocketSessionInfo.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/WebSocketSessionInfo.java
 
b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/WebSocketSessionInfo.java
new file mode 100644
index 0000000..91d4a93
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/WebSocketSessionInfo.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.websocket;
+
+import java.net.InetSocketAddress;
+
+/**
+ * This interface only exposes session information.
+ * <p>
+ * Besides the read-only accessors it also lets callers set the transit URI.
+ */
+public interface WebSocketSessionInfo {
+    /** @return the id of this session */
+    String getSessionId();
+    /** @return the local socket address of this session */
+    InetSocketAddress getLocalAddress();
+    /** @return the remote socket address of this session */
+    InetSocketAddress getRemoteAddress();
+    /** @return whether this session is secure (presumably WSS rather than WS - confirm against implementations) */
+    boolean isSecure();
+    /** @param transitUri transit URI to associate with this session */
+    void setTransitUri(final String transitUri);
+    /** @return the transit URI associated with this session */
+    String getTransitUri();
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/26a5881d/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/test/java/org/apache/nifi/websocket/TestWebSocketMessageRouter.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/test/java/org/apache/nifi/websocket/TestWebSocketMessageRouter.java
 
b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/test/java/org/apache/nifi/websocket/TestWebSocketMessageRouter.java
new file mode 100644
index 0000000..208fe5b
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/test/java/org/apache/nifi/websocket/TestWebSocketMessageRouter.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.websocket;
+
+import org.apache.nifi.processor.Processor;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+
+/**
+ * Unit tests for {@link WebSocketMessageRouter}.
+ */
+public class TestWebSocketMessageRouter {
+
+    /**
+     * A router accepts only one processor at a time: registering a second one must fail,
+     * and deregistering a processor that is not registered must be a harmless no-op.
+     */
+    @Test
+    public void testRegisterProcessor() throws Exception {
+        final WebSocketMessageRouter router = new WebSocketMessageRouter("endpoint-id");
+
+        final Processor processor1 = mock(Processor.class);
+        when(processor1.getIdentifier()).thenReturn("processor-1");
+
+        // Stub processor2 itself. Stubbing processor1 again would overwrite its id with
+        // "processor-2" and leave processor2 without any identifier.
+        final Processor processor2 = mock(Processor.class);
+        when(processor2.getIdentifier()).thenReturn("processor-2");
+
+        router.registerProcessor(processor1);
+        try {
+            router.registerProcessor(processor2);
+            fail("Should fail since a processor is already registered.");
+        } catch (WebSocketConfigurationException expected) {
+            // Expected: the router already has processor1 registered.
+        }
+
+        assertTrue(router.isProcessorRegistered(processor1));
+        assertFalse(router.isProcessorRegistered(processor2));
+
+        // It's safe to call deregister even if it's not registered.
+        router.deregisterProcessor(processor2);
+        router.deregisterProcessor(processor1);
+        // It's safe to call deregister even if it's not registered.
+        router.deregisterProcessor(processor2);
+
+    }
+
+    /**
+     * Sending through a captured session reaches that session; sending to an unknown
+     * session id fails with an {@link IllegalStateException}.
+     */
+    @Test
+    public void testSendMessage() throws Exception {
+        final WebSocketMessageRouter router = new WebSocketMessageRouter("endpoint-id");
+
+        final Processor processor1 = mock(Processor.class);
+        when(processor1.getIdentifier()).thenReturn("processor-1");
+
+        final AbstractWebSocketSession session = mock(AbstractWebSocketSession.class);
+        when(session.getSessionId()).thenReturn("session-1");
+        // Verify the payload that actually reaches the session.
+        doAnswer(invocation -> {
+            assertEquals("message", invocation.getArgumentAt(0, String.class));
+            return null;
+        }).when(session).sendString(anyString());
+
+        router.registerProcessor(processor1);
+        router.captureSession(session);
+
+        router.sendMessage("session-1", sender -> sender.sendString("message"));
+        try {
+            router.sendMessage("session-2", sender -> sender.sendString("message"));
+            fail("Should fail because there's no session with id session-2.");
+        } catch (IllegalStateException expected) {
+            // Expected: only session-1 has been captured.
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/26a5881d/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/test/java/org/apache/nifi/websocket/TestWebSocketMessageRouters.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/test/java/org/apache/nifi/websocket/TestWebSocketMessageRouters.java
 
b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/test/java/org/apache/nifi/websocket/TestWebSocketMessageRouters.java
new file mode 100644
index 0000000..eb911c7
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/test/java/org/apache/nifi/websocket/TestWebSocketMessageRouters.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.websocket;
+
+import org.apache.nifi.processor.Processor;
+import org.junit.Test;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+
+/**
+ * Unit tests for {@link WebSocketMessageRouters}.
+ */
+public class TestWebSocketMessageRouters {
+
+    /**
+     * Walks through the lifecycle of one endpoint: no router at first, a router after the
+     * first registration, and "not registered" again once the processor is deregistered.
+     */
+    @Test
+    public void testRegisterProcessor() throws Exception {
+        final WebSocketMessageRouters registry = new WebSocketMessageRouters();
+        final String endpointId = "endpoint-id";
+
+        // Nothing has been registered yet, so looking the router up must fail.
+        try {
+            registry.getRouterOrFail(endpointId);
+            fail("Should fail because no route exists with the endpointId.");
+        } catch (WebSocketConfigurationException expected) {
+            // Expected: no router is bound to the endpoint yet.
+        }
+
+        final Processor processor = mock(Processor.class);
+        when(processor.getIdentifier()).thenReturn("processor-1");
+        assertFalse(registry.isProcessorRegistered(endpointId, processor));
+
+        // The first registration creates the router for the endpoint.
+        registry.registerProcessor(endpointId, processor);
+        assertNotNull(registry.getRouterOrFail(endpointId));
+        assertTrue(registry.isProcessorRegistered(endpointId, processor));
+
+        registry.deregisterProcessor(endpointId, processor);
+        assertFalse(registry.isProcessorRegistered(endpointId, processor));
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/26a5881d/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty-nar/pom.xml
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty-nar/pom.xml
 
b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty-nar/pom.xml
new file mode 100644
index 0000000..f4cab92
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty-nar/pom.xml
@@ -0,0 +1,45 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-websocket-bundle</artifactId>
+        <version>1.1.0-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>nifi-websocket-services-jetty-nar</artifactId>
+    <version>1.1.0-SNAPSHOT</version>
+    <packaging>nar</packaging>
+    <properties>
+        <maven.javadoc.skip>true</maven.javadoc.skip>
+        <source.skip>true</source.skip>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-websocket-services-api-nar</artifactId>
+            <type>nar</type>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-websocket-services-jetty</artifactId>
+        </dependency>
+    </dependencies>
+
+</project>

http://git-wip-us.apache.org/repos/asf/nifi/blob/26a5881d/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/pom.xml
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/pom.xml 
b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/pom.xml
new file mode 100644
index 0000000..ae2054e
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/pom.xml
@@ -0,0 +1,55 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+       <!-- Licensed to the Apache Software Foundation (ASF) under one or more 
+               contributor license agreements. See the NOTICE file distributed 
with this 
+               work for additional information regarding copyright ownership. 
The ASF licenses 
+               this file to You under the Apache License, Version 2.0 (the 
"License"); you 
+               may not use this file except in compliance with the License. 
You may obtain 
+               a copy of the License at 
http://www.apache.org/licenses/LICENSE-2.0 Unless 
+               required by applicable law or agreed to in writing, software 
distributed 
+               under the License is distributed on an "AS IS" BASIS, WITHOUT 
WARRANTIES 
+               OR CONDITIONS OF ANY KIND, either express or implied. See the 
License for 
+               the specific language governing permissions and limitations 
under the License. -->
+       <parent>
+               <groupId>org.apache.nifi</groupId>
+               <artifactId>nifi-websocket-bundle</artifactId>
+               <version>1.1.0-SNAPSHOT</version>
+       </parent>
+       <modelVersion>4.0.0</modelVersion>
+       <artifactId>nifi-websocket-services-jetty</artifactId>
+       <packaging>jar</packaging>
+       <dependencies>
+               <dependency>
+                       <groupId>org.apache.nifi</groupId>
+                       <artifactId>nifi-nar-utils</artifactId>
+               </dependency>
+               <dependency>
+                       <groupId>org.apache.nifi</groupId>
+                       <artifactId>nifi-ssl-context-service-api</artifactId>
+               </dependency>
+               <dependency>
+                       <groupId>org.apache.nifi</groupId>
+                       <artifactId>nifi-websocket-services-api</artifactId>
+               </dependency>
+               <dependency>
+                       <groupId>org.eclipse.jetty.websocket</groupId>
+                       <artifactId>websocket-server</artifactId>
+                       <version>9.3.13.v20161014</version>
+               </dependency>
+               <dependency>
+                       <groupId>log4j</groupId>
+                       <artifactId>log4j</artifactId>
+                       <version>1.2.17</version>
+                       <scope>test</scope>
+               </dependency>
+               <dependency>
+                       <groupId>org.apache.nifi</groupId>
+                       <artifactId>nifi-mock</artifactId>
+                       <scope>test</scope>
+               </dependency>
+               <dependency>
+                       <groupId>org.apache.nifi</groupId>
+                       <artifactId>nifi-ssl-context-service</artifactId>
+                       <scope>test</scope>
+               </dependency>
+       </dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/nifi/blob/26a5881d/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/main/java/org/apache/nifi/websocket/jetty/AbstractJettyWebSocketService.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/main/java/org/apache/nifi/websocket/jetty/AbstractJettyWebSocketService.java
 
b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/main/java/org/apache/nifi/websocket/jetty/AbstractJettyWebSocketService.java
new file mode 100644
index 0000000..39d67d5
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/main/java/org/apache/nifi/websocket/jetty/AbstractJettyWebSocketService.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.websocket.jetty;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.processor.DataUnit;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.ssl.SSLContextService;
+import org.apache.nifi.websocket.AbstractWebSocketService;
+import org.eclipse.jetty.util.ssl.SslContextFactory;
+import org.eclipse.jetty.websocket.api.WebSocketPolicy;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Shared base for the Jetty-backed WebSocket services: declares the buffer and message-size
+ * properties and offers helpers to build an {@link SslContextFactory} and to apply the
+ * configured sizes to a {@link WebSocketPolicy}.
+ */
+public abstract class AbstractJettyWebSocketService extends AbstractWebSocketService {
+
+    public static final PropertyDescriptor INPUT_BUFFER_SIZE = new PropertyDescriptor.Builder()
+            .name("input-buffer-size")
+            .displayName("Input Buffer Size")
+            .description("The size of the input (read from network layer) buffer size.")
+            .required(true)
+            .defaultValue("4 kb")
+            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor MAX_TEXT_MESSAGE_SIZE = new PropertyDescriptor.Builder()
+            .name("max-text-message-size")
+            .displayName("Max Text Message Size")
+            .description("The maximum size of a text message during parsing/generating.")
+            .required(true)
+            .defaultValue("64 kb")
+            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor MAX_BINARY_MESSAGE_SIZE = new PropertyDescriptor.Builder()
+            .name("max-binary-message-size")
+            .displayName("Max Binary Message Size")
+            .description("The maximum size of a binary message during parsing/generating.")
+            .required(true)
+            .defaultValue("64 kb")
+            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+            .build();
+
+    /**
+     * Returns the descriptors declared by this class, in declaration order.
+     *
+     * @return a fresh, mutable list on every call
+     */
+    static List<PropertyDescriptor> getAbstractPropertyDescriptors() {
+        final List<PropertyDescriptor> shared = new ArrayList<>();
+        shared.add(INPUT_BUFFER_SIZE);
+        shared.add(MAX_TEXT_MESSAGE_SIZE);
+        shared.add(MAX_BINARY_MESSAGE_SIZE);
+        return shared;
+    }
+
+
+    /**
+     * Builds a Jetty {@link SslContextFactory} from the given SSL context service.
+     * Key store and trust store settings are copied only when the service reports them as configured.
+     *
+     * @param sslService     source of the key store / trust store settings
+     * @param needClientAuth value for {@link SslContextFactory#setNeedClientAuth(boolean)}
+     * @param wantClientAuth value for {@link SslContextFactory#setWantClientAuth(boolean)}
+     * @return the configured factory
+     */
+    protected SslContextFactory createSslFactory(final SSLContextService sslService, final boolean needClientAuth, final boolean wantClientAuth) {
+        final SslContextFactory factory = new SslContextFactory();
+
+        // Client authentication flags are applied regardless of which stores are configured.
+        factory.setNeedClientAuth(needClientAuth);
+        factory.setWantClientAuth(wantClientAuth);
+
+        final boolean hasKeyStore = sslService.isKeyStoreConfigured();
+        if (hasKeyStore) {
+            factory.setKeyStorePath(sslService.getKeyStoreFile());
+            factory.setKeyStorePassword(sslService.getKeyStorePassword());
+            factory.setKeyStoreType(sslService.getKeyStoreType());
+        }
+
+        final boolean hasTrustStore = sslService.isTrustStoreConfigured();
+        if (hasTrustStore) {
+            factory.setTrustStorePath(sslService.getTrustStoreFile());
+            factory.setTrustStorePassword(sslService.getTrustStorePassword());
+            factory.setTrustStoreType(sslService.getTrustStoreType());
+        }
+
+        return factory;
+    }
+
+    /**
+     * Copies the three size properties from {@code context} onto {@code policy}, converted to bytes.
+     *
+     * @param context configuration holding the property values
+     * @param policy  policy to update
+     */
+    protected void configurePolicy(final ConfigurationContext context, final WebSocketPolicy policy) {
+        // Resolve every value before touching the policy, so a failure while reading leaves it unmodified.
+        final int inputBufferBytes = toBytes(context, INPUT_BUFFER_SIZE);
+        final int maxTextBytes = toBytes(context, MAX_TEXT_MESSAGE_SIZE);
+        final int maxBinaryBytes = toBytes(context, MAX_BINARY_MESSAGE_SIZE);
+
+        policy.setInputBufferSize(inputBufferBytes);
+        policy.setMaxTextMessageSize(maxTextBytes);
+        policy.setMaxBinaryMessageSize(maxBinaryBytes);
+    }
+
+    /** Reads a data-size property and returns its value in bytes as an {@code int}. */
+    private static int toBytes(final ConfigurationContext context, final PropertyDescriptor descriptor) {
+        return context.getProperty(descriptor).asDataSize(DataUnit.B).intValue();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/26a5881d/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/main/java/org/apache/nifi/websocket/jetty/JettyWebSocketClient.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/main/java/org/apache/nifi/websocket/jetty/JettyWebSocketClient.java
 
b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/main/java/org/apache/nifi/websocket/jetty/JettyWebSocketClient.java
new file mode 100644
index 0000000..af8dd8e
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/main/java/org/apache/nifi/websocket/jetty/JettyWebSocketClient.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.websocket.jetty;
+
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnDisabled;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.annotation.lifecycle.OnShutdown;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.ssl.SSLContextService;
+import org.apache.nifi.websocket.WebSocketClientService;
+import org.apache.nifi.websocket.WebSocketConfigurationException;
+import org.apache.nifi.websocket.WebSocketMessageRouter;
+import org.eclipse.jetty.util.ssl.SslContextFactory;
+import org.eclipse.jetty.websocket.api.Session;
+import org.eclipse.jetty.websocket.client.ClientUpgradeRequest;
+import org.eclipse.jetty.websocket.client.WebSocketClient;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+@Tags({"WebSocket", "Jetty", "client"})
+@CapabilityDescription("Implementation of WebSocketClientService." +
+        " This service uses Jetty WebSocket client module to provide" +
+        " WebSocket session management throughout the application.")
+public class JettyWebSocketClient extends AbstractJettyWebSocketService 
implements WebSocketClientService {
+
+    public static final PropertyDescriptor WS_URI = new 
PropertyDescriptor.Builder()
+            .name("websocket-uri")
+            .displayName("WebSocket URI")
+            .description("The WebSocket URI this client connects to.")
+            .required(true)
+            .expressionLanguageSupported(true)
+            .addValidator(StandardValidators.URI_VALIDATOR)
+            .addValidator((subject, input, context) -> {
+                final ValidationResult.Builder result = new 
ValidationResult.Builder()
+                        .valid(input.startsWith("/"))
+                        .subject(subject);
+
+                if (context.isExpressionLanguageSupported(subject) && 
context.isExpressionLanguagePresent(input)) {
+                    result.explanation("Expression Language 
Present").valid(true);
+                } else {
+                    result.explanation("Protocol should be either 'ws' or 
'wss'.")
+                        .valid(input.startsWith("ws://") || 
input.startsWith("wss://"));
+                }
+
+                return result.build();
+            })
+            .build();
+
+    public static final PropertyDescriptor CONNECTION_TIMEOUT = new 
PropertyDescriptor.Builder()
+            .name("connection-timeout")
+            .displayName("Connection Timeout")
+            .description("The timeout to connect the WebSocket URI.")
+            .required(true)
+            .expressionLanguageSupported(true)
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .defaultValue("3 sec")
+            .build();
+
+    private static final List<PropertyDescriptor> properties;
+
+    static {
+        final List<PropertyDescriptor> props = new ArrayList<>();
+        props.addAll(getAbstractPropertyDescriptors());
+        props.add(WS_URI);
+        props.add(SSL_CONTEXT);
+        props.add(CONNECTION_TIMEOUT);
+
+        properties = Collections.unmodifiableList(props);
+    }
+
+    private WebSocketClient client;
+    private URI webSocketUri;
+    private long connectionTimeoutMillis;
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return properties;
+    }
+
+    @OnEnabled
+    @Override
+    public void startClient(final ConfigurationContext context) throws 
Exception{
+
+        final SSLContextService sslService = 
context.getProperty(SSL_CONTEXT).asControllerService(SSLContextService.class);
+        SslContextFactory sslContextFactory = null;
+        if (sslService != null) {
+            sslContextFactory = createSslFactory(sslService, false, false);
+        }
+        client = new WebSocketClient(sslContextFactory);
+
+        configurePolicy(context, client.getPolicy());
+
+        client.start();
+
+        webSocketUri = new URI(context.getProperty(WS_URI).getValue());
+        connectionTimeoutMillis = 
context.getProperty(CONNECTION_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS);
+    }
+
+    @OnDisabled
+    @OnShutdown
+    @Override
+    public void stopClient() throws Exception {
+        if (client == null) {
+            return;
+        }
+
+        client.stop();
+        client = null;
+    }
+
+    @Override
+    public void connect(final String clientId) throws IOException {
+
+        final WebSocketMessageRouter router;
+        try {
+            router = routers.getRouterOrFail(clientId);
+        } catch (WebSocketConfigurationException e) {
+            throw new IllegalStateException("Failed to get router due to: "  + 
e, e);
+        }
+        final RoutingWebSocketListener listener = new 
RoutingWebSocketListener(router);
+
+        final ClientUpgradeRequest request = new ClientUpgradeRequest();
+        final Future<Session> connect = client.connect(listener, webSocketUri, 
request);
+        getLogger().info("Connecting to : {}", new Object[]{webSocketUri});
+
+        final Session session;
+        try {
+            session = connect.get(connectionTimeoutMillis, 
TimeUnit.MILLISECONDS);
+        } catch (Exception e) {
+            throw new IOException("Failed to connect " + webSocketUri + " due 
to: " + e, e);
+        }
+        getLogger().info("Connected, session={}", new Object[]{session});
+
+    }
+
+    @Override
+    public String getTargetUri() {
+        return webSocketUri.toString();
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/26a5881d/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/main/java/org/apache/nifi/websocket/jetty/JettyWebSocketServer.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/main/java/org/apache/nifi/websocket/jetty/JettyWebSocketServer.java
 
b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/main/java/org/apache/nifi/websocket/jetty/JettyWebSocketServer.java
new file mode 100644
index 0000000..267e7d1
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/main/java/org/apache/nifi/websocket/jetty/JettyWebSocketServer.java
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.websocket.jetty;
+
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnDisabled;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.annotation.lifecycle.OnShutdown;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.ssl.SSLContextService;
+import org.apache.nifi.websocket.WebSocketConfigurationException;
+import org.apache.nifi.websocket.WebSocketMessageRouter;
+import org.apache.nifi.websocket.WebSocketServerService;
+import org.eclipse.jetty.server.Connector;
+import org.eclipse.jetty.server.Handler;
+import org.eclipse.jetty.server.HttpConfiguration;
+import org.eclipse.jetty.server.HttpConnectionFactory;
+import org.eclipse.jetty.server.SecureRequestCustomizer;
+import org.eclipse.jetty.server.Server;
+import org.eclipse.jetty.server.ServerConnector;
+import org.eclipse.jetty.server.SslConnectionFactory;
+import org.eclipse.jetty.server.handler.ContextHandlerCollection;
+import org.eclipse.jetty.servlet.ServletContextHandler;
+import org.eclipse.jetty.servlet.ServletHandler;
+import org.eclipse.jetty.util.ssl.SslContextFactory;
+import org.eclipse.jetty.websocket.api.Session;
+import org.eclipse.jetty.websocket.api.WebSocketPolicy;
+import org.eclipse.jetty.websocket.servlet.ServletUpgradeRequest;
+import org.eclipse.jetty.websocket.servlet.ServletUpgradeResponse;
+import org.eclipse.jetty.websocket.servlet.WebSocketCreator;
+import org.eclipse.jetty.websocket.servlet.WebSocketServlet;
+import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory;
+
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+@Tags({"WebSocket", "Jetty", "server"})
+@CapabilityDescription("Implementation of WebSocketServerService." +
+        " This service uses Jetty WebSocket server module to provide" +
+        " WebSocket session management throughout the application.")
+public class JettyWebSocketServer extends AbstractJettyWebSocketService 
implements WebSocketServerService {
+
+    /**
+     * A global map to refer a controller service instance by requested port 
number.
+     */
+    private static final Map<Integer, JettyWebSocketServer> 
portToControllerService = new ConcurrentHashMap<>();
+
+    // Allowable values for client auth
+    public static final AllowableValue CLIENT_NONE = new AllowableValue("no", 
"No Authentication",
+            "Processor will not authenticate clients. Anyone can communicate 
with this Processor anonymously");
+    public static final AllowableValue CLIENT_WANT = new 
AllowableValue("want", "Want Authentication",
+            "Processor will try to verify the client but if unable to verify 
will allow the client to communicate anonymously");
+    public static final AllowableValue CLIENT_NEED = new 
AllowableValue("need", "Need Authentication",
+            "Processor will reject communications from any client unless the 
client provides a certificate that is trusted by the TrustStore"
+                    + "specified in the SSL Context Service");
+
+    public static final PropertyDescriptor CLIENT_AUTH = new 
PropertyDescriptor.Builder()
+            .name("client-authentication")
+            .displayName("Client Authentication")
+            .description("Specifies whether or not the Processor should 
authenticate clients. This value is ignored if the <SSL Context Service> "
+                    + "Property is not specified or the SSL Context provided 
uses only a KeyStore and not a TrustStore.")
+            .required(true)
+            .allowableValues(CLIENT_NONE, CLIENT_WANT, CLIENT_NEED)
+            .defaultValue(CLIENT_NONE.getValue())
+            .build();
+
+    public static final PropertyDescriptor LISTEN_PORT = new 
PropertyDescriptor.Builder()
+            .name("listen-port")
+            .displayName("Listen Port")
+            .description("The port number on which this WebSocketServer 
listens to.")
+            .required(true)
+            .expressionLanguageSupported(true)
+            .addValidator(StandardValidators.PORT_VALIDATOR)
+            .build();
+
+    private static final List<PropertyDescriptor> properties;
+
+    static {
+        final List<PropertyDescriptor> props = new ArrayList<>();
+        props.addAll(getAbstractPropertyDescriptors());
+        props.add(LISTEN_PORT);
+        props.add(SSL_CONTEXT);
+        props.add(CLIENT_AUTH);
+
+        properties = Collections.unmodifiableList(props);
+    }
+
+    private WebSocketPolicy configuredPolicy;
+    private Server server;
+    private Integer listenPort;
+    private ServletHandler servletHandler;
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return properties;
+    }
+
+
+    public static class JettyWebSocketServlet extends WebSocketServlet 
implements WebSocketCreator {
+        @Override
+        public void configure(WebSocketServletFactory webSocketServletFactory) 
{
+            webSocketServletFactory.setCreator(this);
+        }
+
+        @Override
+        public Object createWebSocket(ServletUpgradeRequest 
servletUpgradeRequest, ServletUpgradeResponse servletUpgradeResponse) {
+            final URI requestURI = servletUpgradeRequest.getRequestURI();
+            final int port = requestURI.getPort();
+            final JettyWebSocketServer service = 
portToControllerService.get(port);
+
+            if (service == null) {
+                throw new RuntimeException("No controller service is bound 
with port: " + port);
+            }
+
+            final String path = requestURI.getPath();
+            final WebSocketMessageRouter router;
+            try {
+                router = service.routers.getRouterOrFail(path);
+            } catch (WebSocketConfigurationException e) {
+                throw new IllegalStateException("Failed to get router due to: 
"  + e, e);
+            }
+
+            final RoutingWebSocketListener listener = new 
RoutingWebSocketListener(router) {
+                @Override
+                public void onWebSocketConnect(Session session) {
+                    final WebSocketPolicy currentPolicy = session.getPolicy();
+                    
currentPolicy.setInputBufferSize(service.configuredPolicy.getInputBufferSize());
+                    
currentPolicy.setMaxTextMessageSize(service.configuredPolicy.getMaxTextMessageSize());
+                    
currentPolicy.setMaxBinaryMessageSize(service.configuredPolicy.getMaxBinaryMessageSize());
+                    super.onWebSocketConnect(session);
+                }
+            };
+
+            return listener;
+        }
+    }
+
+    @OnEnabled
+    @Override
+    public void startServer(final ConfigurationContext context) throws 
Exception {
+
+        if (server != null && server.isRunning()) {
+            getLogger().info("A WebSocket server is already running. {}", new 
Object[]{server});
+            return;
+        }
+
+        configuredPolicy = WebSocketPolicy.newServerPolicy();
+        configurePolicy(context, configuredPolicy);
+
+        server = new Server();
+
+        final ContextHandlerCollection handlerCollection = new 
ContextHandlerCollection();
+
+        final ServletContextHandler contextHandler = new 
ServletContextHandler();
+        servletHandler = new ServletHandler();
+        contextHandler.insertHandler(servletHandler);
+
+        handlerCollection.setHandlers(new Handler[]{contextHandler});
+
+        server.setHandler(handlerCollection);
+
+        listenPort = context.getProperty(LISTEN_PORT).asInteger();
+        final SslContextFactory sslContextFactory = createSslFactory(context);
+
+        final ServerConnector serverConnector = 
createConnector(sslContextFactory, listenPort);
+
+        server.setConnectors(new Connector[] {serverConnector});
+
+        servletHandler.addServletWithMapping(JettyWebSocketServlet.class, 
"/*");
+
+        getLogger().info("Starting JettyWebSocketServer on port {}.", new 
Object[]{listenPort});
+        server.start();
+
+        portToControllerService.put(listenPort, this);
+    }
+
+    private ServerConnector createConnector(final SslContextFactory 
sslContextFactory, final Integer listenPort) {
+
+        final ServerConnector serverConnector;
+        if (sslContextFactory == null) {
+            serverConnector = new ServerConnector(server);
+        } else {
+            final HttpConfiguration httpsConfiguration = new 
HttpConfiguration();
+            httpsConfiguration.setSecureScheme("https");
+            httpsConfiguration.addCustomizer(new SecureRequestCustomizer());
+            serverConnector = new ServerConnector(server,
+                    new SslConnectionFactory(sslContextFactory, "http/1.1"),
+                    new HttpConnectionFactory(httpsConfiguration));
+        }
+        serverConnector.setPort(listenPort);
+        return serverConnector;
+    }
+
+    private SslContextFactory createSslFactory(final ConfigurationContext 
context) {
+        final SSLContextService sslService = 
context.getProperty(SSL_CONTEXT).asControllerService(SSLContextService.class);
+
+        final String clientAuthValue = 
context.getProperty(CLIENT_AUTH).getValue();
+        final boolean need;
+        final boolean want;
+        if (CLIENT_NEED.equals(clientAuthValue)) {
+            need = true;
+            want = false;
+        } else if (CLIENT_WANT.equals(clientAuthValue)) {
+            need = false;
+            want = true;
+        } else {
+            need = false;
+            want = false;
+        }
+
+        final SslContextFactory sslFactory = (sslService == null) ? null : 
createSslFactory(sslService, need, want);
+        return sslFactory;
+    }
+
+    @OnDisabled
+    @OnShutdown
+    @Override
+    public void stopServer() throws Exception {
+        if (server == null) {
+            return;
+        }
+
+        getLogger().info("Stopping JettyWebSocketServer.");
+        server.stop();
+        if (portToControllerService.containsKey(listenPort)
+                && 
this.getIdentifier().equals(portToControllerService.get(listenPort).getIdentifier()))
 {
+            portToControllerService.remove(listenPort);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/26a5881d/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/main/java/org/apache/nifi/websocket/jetty/JettyWebSocketSession.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/main/java/org/apache/nifi/websocket/jetty/JettyWebSocketSession.java
 
b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/main/java/org/apache/nifi/websocket/jetty/JettyWebSocketSession.java
new file mode 100644
index 0000000..d1da779
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/main/java/org/apache/nifi/websocket/jetty/JettyWebSocketSession.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.websocket.jetty;
+
+import org.apache.nifi.websocket.AbstractWebSocketSession;
+import org.eclipse.jetty.websocket.api.Session;
+import org.eclipse.jetty.websocket.api.StatusCode;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+
+/**
+ * Pairs a Jetty {@link Session} with the session id it is known by, and forwards every
+ * operation to that Jetty session.
+ */
+public class JettyWebSocketSession extends AbstractWebSocketSession {
+
+    private final String id;
+    private final Session jettySession;
+
+    /**
+     * @param sessionId the id returned by {@link #getSessionId()}
+     * @param session the Jetty session all other calls are delegated to
+     */
+    public JettyWebSocketSession(final String sessionId, final Session session) {
+        this.id = sessionId;
+        this.jettySession = session;
+    }
+
+    @Override
+    public String getSessionId() {
+        return this.id;
+    }
+
+    /** Sends a text message through the remote endpoint of the Jetty session. */
+    @Override
+    public void sendString(final String message) throws IOException {
+        this.jettySession.getRemote().sendString(message);
+    }
+
+    /** Sends a binary message through the remote endpoint of the Jetty session. */
+    @Override
+    public void sendBinary(final ByteBuffer data) throws IOException {
+        this.jettySession.getRemote().sendBytes(data);
+    }
+
+    /** Closes the Jetty session with the NORMAL status code and the given reason. */
+    @Override
+    public void close(final String reason) throws IOException {
+        this.jettySession.close(StatusCode.NORMAL, reason);
+    }
+
+    @Override
+    public InetSocketAddress getRemoteAddress() {
+        return this.jettySession.getRemoteAddress();
+    }
+
+    @Override
+    public InetSocketAddress getLocalAddress() {
+        return this.jettySession.getLocalAddress();
+    }
+
+    @Override
+    public boolean isSecure() {
+        return this.jettySession.isSecure();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/26a5881d/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/main/java/org/apache/nifi/websocket/jetty/RoutingWebSocketListener.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/main/java/org/apache/nifi/websocket/jetty/RoutingWebSocketListener.java
 
b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/main/java/org/apache/nifi/websocket/jetty/RoutingWebSocketListener.java
new file mode 100644
index 0000000..81376a4
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/main/java/org/apache/nifi/websocket/jetty/RoutingWebSocketListener.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.websocket.jetty;
+
+import org.apache.nifi.websocket.WebSocketMessageRouter;
+import org.eclipse.jetty.websocket.api.Session;
+import org.eclipse.jetty.websocket.api.WebSocketAdapter;
+
+import java.util.UUID;
+
+/**
+ * Jetty listener that hands the events of one WebSocket connection to a {@link WebSocketMessageRouter}.
+ * A random UUID is generated when the connection opens and is passed to the router with every later event.
+ */
+public class RoutingWebSocketListener extends WebSocketAdapter {
+    private final WebSocketMessageRouter messageRouter;
+    // Assigned in onWebSocketConnect; holds its default (null) value until then.
+    private String routedSessionId;
+
+    /**
+     * @param router the router that receives the session and its events
+     */
+    public RoutingWebSocketListener(final WebSocketMessageRouter router) {
+        this.messageRouter = router;
+    }
+
+    /** Generates the session id and gives the router a wrapper around the new Jetty session. */
+    @Override
+    public void onWebSocketConnect(final Session session) {
+        super.onWebSocketConnect(session);
+        this.routedSessionId = UUID.randomUUID().toString();
+        this.messageRouter.captureSession(new JettyWebSocketSession(this.routedSessionId, session));
+    }
+
+    /** Lets the adapter process the close first, then reports it to the router. */
+    @Override
+    public void onWebSocketClose(final int statusCode, final String reason) {
+        super.onWebSocketClose(statusCode, reason);
+        this.messageRouter.onWebSocketClose(this.routedSessionId, statusCode, reason);
+    }
+
+    /** Forwards a text message to the router. */
+    @Override
+    public void onWebSocketText(final String message) {
+        this.messageRouter.onWebSocketText(this.routedSessionId, message);
+    }
+
+    /** Forwards a binary message, with its offset and length, to the router. */
+    @Override
+    public void onWebSocketBinary(final byte[] payload, final int offset, final int len) {
+        this.messageRouter.onWebSocketBinary(this.routedSessionId, payload, offset, len);
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/26a5881d/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
 
b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
new file mode 100644
index 0000000..6f3afe4
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+org.apache.nifi.websocket.jetty.JettyWebSocketServer
+org.apache.nifi.websocket.jetty.JettyWebSocketClient
\ No newline at end of file

Reply via email to