http://git-wip-us.apache.org/repos/asf/nifi/blob/26a5881d/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/pom.xml b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/pom.xml new file mode 100644 index 0000000..bf9951b --- /dev/null +++ b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/pom.xml @@ -0,0 +1,41 @@ +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <!-- Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with this + work for additional information regarding copyright ownership. The ASF licenses + this file to You under the Apache License, Version 2.0 (the "License"); you + may not use this file except in compliance with the License. You may obtain + a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless + required by applicable law or agreed to in writing, software distributed + under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES + OR CONDITIONS OF ANY KIND, either express or implied. See the License for + the specific language governing permissions and limitations under the License. --> + <parent> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-websocket-bundle</artifactId> + <version>1.1.0-SNAPSHOT</version> + </parent> + <modelVersion>4.0.0</modelVersion> + <artifactId>nifi-websocket-services-api</artifactId> + <packaging>jar</packaging> + <dependencies> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-ssl-context-service-api</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-processor-utils</artifactId> + </dependency> + <dependency> + <groupId>log4j</groupId> + <artifactId>log4j</artifactId> + <version>1.2.17</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-mock</artifactId> + <scope>test</scope> + </dependency> + </dependencies> +</project>
http://git-wip-us.apache.org/repos/asf/nifi/blob/26a5881d/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/AbstractWebSocketService.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/AbstractWebSocketService.java b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/AbstractWebSocketService.java new file mode 100644 index 0000000..fac1e42 --- /dev/null +++ b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/AbstractWebSocketService.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.websocket; + +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.processor.Processor; + +import java.io.IOException; + +public abstract class AbstractWebSocketService extends AbstractControllerService implements WebSocketService { + + final protected WebSocketMessageRouters routers = new WebSocketMessageRouters(); + + @Override + public void registerProcessor(final String endpointId, final Processor processor) throws WebSocketConfigurationException { + routers.registerProcessor(endpointId, processor); + } + + @Override + public boolean isProcessorRegistered(final String endpointId, final Processor processor) { + return routers.isProcessorRegistered(endpointId, processor); + } + + @Override + public void deregisterProcessor(final String endpointId, final Processor processor) throws WebSocketConfigurationException { + routers.deregisterProcessor(endpointId, processor); + } + + @Override + public void sendMessage(final String endpointId, final String sessionId, final SendMessage sendMessage) throws IOException, WebSocketConfigurationException { + routers.sendMessage(endpointId, sessionId, sendMessage); + } + + @Override + public void disconnect(final String endpointId, final String sessionId, final String reason) throws IOException, WebSocketConfigurationException { + routers.disconnect(endpointId, sessionId, reason); + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/26a5881d/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/AbstractWebSocketSession.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/AbstractWebSocketSession.java b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/AbstractWebSocketSession.java new file mode 100644 index 0000000..9b41641 --- /dev/null +++ b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/AbstractWebSocketSession.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.websocket; + +public abstract class AbstractWebSocketSession implements WebSocketSession { + + private String transitUri; + + @Override + public void setTransitUri(final String transitUri) { + this.transitUri = transitUri; + } + + @Override + public String getTransitUri() { + return transitUri; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/26a5881d/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/BinaryMessageConsumer.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/BinaryMessageConsumer.java b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/BinaryMessageConsumer.java new file mode 100644 index 0000000..be2f0ca --- /dev/null +++ b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/BinaryMessageConsumer.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.websocket; + +public interface BinaryMessageConsumer { + void consume(final WebSocketSessionInfo sessionInfo, final byte[] payload, final int offset, final int length); +} http://git-wip-us.apache.org/repos/asf/nifi/blob/26a5881d/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/ConnectedListener.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/ConnectedListener.java b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/ConnectedListener.java new file mode 100644 index 0000000..b682094 --- /dev/null +++ b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/ConnectedListener.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.websocket; + +/** + * To be performed when a WebSocket connection is established. + */ +public interface ConnectedListener { + void connected(final WebSocketSessionInfo sessionInfo); +} http://git-wip-us.apache.org/repos/asf/nifi/blob/26a5881d/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/MessageSender.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/MessageSender.java b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/MessageSender.java new file mode 100644 index 0000000..61d7061 --- /dev/null +++ b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/MessageSender.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.websocket; + +import java.io.IOException; +import java.nio.ByteBuffer; + +public interface MessageSender extends WebSocketSessionInfo { + void sendString(final String message) throws IOException; + void sendBinary(final ByteBuffer data) throws IOException; +} http://git-wip-us.apache.org/repos/asf/nifi/blob/26a5881d/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/SendMessage.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/SendMessage.java b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/SendMessage.java new file mode 100644 index 0000000..208ed6e --- /dev/null +++ b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/SendMessage.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.websocket; + +import java.io.IOException; + +public interface SendMessage { + void send(final MessageSender sender) throws IOException; +} http://git-wip-us.apache.org/repos/asf/nifi/blob/26a5881d/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/TextMessageConsumer.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/TextMessageConsumer.java b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/TextMessageConsumer.java new file mode 100644 index 0000000..c1fed80 --- /dev/null +++ b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/TextMessageConsumer.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.websocket; + +public interface TextMessageConsumer { + void consume(final WebSocketSessionInfo sessionInfo, final String message); +} http://git-wip-us.apache.org/repos/asf/nifi/blob/26a5881d/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/WebSocketClientService.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/WebSocketClientService.java b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/WebSocketClientService.java new file mode 100644 index 0000000..9d4ef16 --- /dev/null +++ b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/WebSocketClientService.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.websocket; + +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.controller.ConfigurationContext; + +import java.io.IOException; + +/** + * Control a WebSocket client instance. + */ +@CapabilityDescription("Control a WebSocket client instance," + + " so that NiFi can connect with external systems via WebSocket protocol.") +public interface WebSocketClientService extends WebSocketService { + + void startClient(final ConfigurationContext context) throws Exception; + + void stopClient() throws Exception; + + void connect(final String clientId) throws IOException; + + String getTargetUri(); + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/26a5881d/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/WebSocketConfigurationException.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/WebSocketConfigurationException.java b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/WebSocketConfigurationException.java new file mode 100644 index 0000000..f78c905 --- /dev/null +++ b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/WebSocketConfigurationException.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.websocket; + +public class WebSocketConfigurationException extends Exception { + + public WebSocketConfigurationException(String message) { + super(message); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/26a5881d/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/WebSocketConnectedMessage.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/WebSocketConnectedMessage.java b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/WebSocketConnectedMessage.java new file mode 100644 index 0000000..cb2e2eb --- /dev/null +++ b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/WebSocketConnectedMessage.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.websocket; + +public class WebSocketConnectedMessage extends WebSocketMessage { + public WebSocketConnectedMessage(final WebSocketSessionInfo sessionInfo) { + super(sessionInfo); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/26a5881d/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/WebSocketMessage.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/WebSocketMessage.java b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/WebSocketMessage.java new file mode 100644 index 0000000..98f8dfb --- /dev/null +++ b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/WebSocketMessage.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.websocket; + +import java.io.UnsupportedEncodingException; + +public class WebSocketMessage { + public static final String CHARSET_NAME = "UTF-8"; + + public enum Type { + TEXT, + BINARY + } + + private final WebSocketSessionInfo sessionInfo; + private byte[] payload; + private int offset; + private int length; + private Type type; + + public WebSocketMessage(final WebSocketSessionInfo sessionInfo) { + this.sessionInfo = sessionInfo; + } + + public WebSocketSessionInfo getSessionInfo() { + return sessionInfo; + } + + public byte[] getPayload() { + return payload; + } + + public void setPayload(final String text) { + if (text == null) { + return; + } + + try { + final byte[] bytes = text.getBytes(CHARSET_NAME); + setPayload(bytes, 0, bytes.length); + type = Type.TEXT; + } catch (UnsupportedEncodingException e) { + throw new RuntimeException("Failed to serialize messageStr, due to " + e, e); + } + } + + public void setPayload(final byte[] payload, final int offset, final int length) { + this.payload = payload; + this.offset = offset; + this.length = length; + type = Type.BINARY; + } + + public int getOffset() { + return offset; + } + + public int getLength() { + return length; + } + + public Type getType() { + return type; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/26a5881d/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/WebSocketMessageRouter.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/WebSocketMessageRouter.java b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/WebSocketMessageRouter.java new file mode 100644 index 0000000..057b33d --- /dev/null +++ b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/WebSocketMessageRouter.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.websocket; + +import org.apache.nifi.processor.Processor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +public class WebSocketMessageRouter { + private static final Logger logger = LoggerFactory.getLogger(WebSocketMessageRouter.class); + private final String endpointId; + private final Map<String, WebSocketSession> sessions = new ConcurrentHashMap<>(); + private volatile Processor processor; + + public WebSocketMessageRouter(final String endpointId) { + this.endpointId = endpointId; + } + + public synchronized void registerProcessor(final Processor processor) throws WebSocketConfigurationException { + if (this.processor != null) { + throw new WebSocketConfigurationException("Processor " + this.processor + " is already assigned to this router."); + } + this.processor = processor; + } + + public boolean isProcessorRegistered(final Processor processor) { + return this.processor != null && this.processor.getIdentifier().equals(processor.getIdentifier()); + } + + public synchronized void deregisterProcessor(final Processor processor) { + if (!isProcessorRegistered(processor)) { + if (this.processor == null) { + logger.info("Deregister processor {}, do nothing because this router doesn't have registered processor", + new Object[]{processor}); + } else { + logger.info("Deregister processor {}, do nothing because this router is assigned to different processor {}", + new Object[]{processor, this.processor}); + } + return; + } + + this.processor = null; + sessions.keySet().forEach(sessionId -> { + try { + disconnect(sessionId, "Processing has stopped."); + } catch (IOException e) { + logger.warn("Failed to disconnect session {} due to {}", sessionId, e, e); + } + }); + } + + public void captureSession(final WebSocketSession session) { + final String sessionId = session.getSessionId(); + sessions.put(sessionId, session); + + if (processor != null && processor instanceof ConnectedListener) { + ((ConnectedListener)processor).connected(session); + } + } + + public void onWebSocketClose(final String sessionId, final int statusCode, final String reason) { + sessions.remove(sessionId); + } + + public void onWebSocketText(final String sessionId, final String message) { + if (processor != null && processor instanceof TextMessageConsumer) { + ((TextMessageConsumer)processor).consume(getSessionOrFail(sessionId), message); + } + } + + public void onWebSocketBinary(final String sessionId, final byte[] payload, final int offset, final int length) { + if (processor != null && processor instanceof BinaryMessageConsumer) { + ((BinaryMessageConsumer)processor).consume(getSessionOrFail(sessionId), payload, offset, length); + } + } + + private WebSocketSession getSessionOrFail(final String sessionId) { + final WebSocketSession session = sessions.get(sessionId); + if (session == null) { + throw new IllegalStateException("Session was not found for the sessionId: " + sessionId); + } + return session; + } + + public void sendMessage(final String sessionId, final SendMessage sendMessage) throws IOException { + final WebSocketSession session = getSessionOrFail(sessionId); + sendMessage.send(session); + } + + public void disconnect(final String sessionId, final String reason) throws IOException { + final WebSocketSession session = getSessionOrFail(sessionId); + session.close(reason); + sessions.remove(sessionId); + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/26a5881d/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/WebSocketMessageRouters.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/WebSocketMessageRouters.java b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/WebSocketMessageRouters.java new file mode 100644 index 0000000..2551eb4 --- /dev/null +++ b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/WebSocketMessageRouters.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.websocket; + +import org.apache.nifi.processor.Processor; + +import java.io.IOException; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +public class WebSocketMessageRouters { + private Map<String, WebSocketMessageRouter> routers = new ConcurrentHashMap<>(); + + private synchronized WebSocketMessageRouter getRouterOrCreate(final String endpointId) { + WebSocketMessageRouter router = routers.get(endpointId); + if (router == null) { + router = new WebSocketMessageRouter(endpointId); + routers.put(endpointId, router); + } + return router; + } + + public WebSocketMessageRouter getRouterOrFail(final String endpointId) throws WebSocketConfigurationException { + final WebSocketMessageRouter router = routers.get(endpointId); + + if (router == null) { + throw new WebSocketConfigurationException("No WebSocket router is bound with endpointId: " + endpointId); + } + return router; + } + + public synchronized void registerProcessor(final String endpointId, final Processor processor) throws WebSocketConfigurationException { + final WebSocketMessageRouter router = getRouterOrCreate(endpointId); + router.registerProcessor(processor); + } + + public boolean isProcessorRegistered(final String endpointId, final Processor processor) { + try { + final WebSocketMessageRouter router = getRouterOrFail(endpointId); + return router.isProcessorRegistered(processor); + } catch (WebSocketConfigurationException e) { + return false; + } + } + + public synchronized void deregisterProcessor(final String endpointId, final Processor processor) throws WebSocketConfigurationException { + final WebSocketMessageRouter router = getRouterOrFail(endpointId); + router.deregisterProcessor(processor); + } + + public void sendMessage(final String endpointId, final String sessionId, final SendMessage sendMessage) throws IOException, WebSocketConfigurationException { + final WebSocketMessageRouter router = getRouterOrFail(endpointId); + router.sendMessage(sessionId, sendMessage); + } + + public void disconnect(final String endpointId, final String sessionId, final String reason) throws IOException, WebSocketConfigurationException { + final WebSocketMessageRouter router = getRouterOrFail(endpointId); + router.disconnect(sessionId, reason); + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/26a5881d/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/WebSocketServerService.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/WebSocketServerService.java b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/WebSocketServerService.java new file mode 100644 index 0000000..ed5ad05 --- /dev/null +++ b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/WebSocketServerService.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.websocket; + +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.lifecycle.OnDisabled; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.annotation.lifecycle.OnShutdown; +import org.apache.nifi.controller.ConfigurationContext; + +/** + * Control an embedded WebSocket server instance. + */ +@CapabilityDescription("Control an embedded WebSocket server instance," + + " so that external system can connect this NiFi via WebSocket protocol.") +public interface WebSocketServerService extends WebSocketService { + + @OnEnabled + void startServer(final ConfigurationContext context) throws Exception; + + @OnDisabled + @OnShutdown + void stopServer() throws Exception; + + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/26a5881d/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/WebSocketService.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/WebSocketService.java b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/WebSocketService.java new file mode 100644 index 0000000..0de80bc --- /dev/null +++ b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/WebSocketService.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.websocket; + +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.controller.ControllerService; +import org.apache.nifi.processor.Processor; +import org.apache.nifi.ssl.SSLContextService; + +import java.io.IOException; + +/** + * Control an embedded WebSocket service instance. + */ +public interface WebSocketService extends ControllerService { + + PropertyDescriptor SSL_CONTEXT = new PropertyDescriptor.Builder() + .name("ssl-context-service") + .displayName("SSL Context Service") + .description("The SSL Context Service to use in order to secure the server. If specified, the server will accept only WSS requests; " + + "otherwise, the server will accept only WS requests") + .required(false) + .identifiesControllerService(SSLContextService.class) + .build(); + + void registerProcessor(final String endpointId, final Processor processor) throws WebSocketConfigurationException; + + boolean isProcessorRegistered(final String endpointId, final Processor processor); + + void deregisterProcessor(final String endpointId, final Processor processor) throws WebSocketConfigurationException; + + void sendMessage(final String endpointId, final String sessionId, final SendMessage sendMessage) throws IOException, WebSocketConfigurationException; + + void disconnect(final String endpointId, final String sessionId, final String reason) throws Exception; + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/26a5881d/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/WebSocketSession.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/WebSocketSession.java b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/WebSocketSession.java new file mode 100644 index 0000000..2e1c937 --- /dev/null +++ b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/WebSocketSession.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.websocket; + +import java.io.IOException; + +/** + * This is the concrete WebSocket session interface, which provides session information and operations. + */ +public interface WebSocketSession extends MessageSender { + void close(final String reason) throws IOException; +} http://git-wip-us.apache.org/repos/asf/nifi/blob/26a5881d/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/WebSocketSessionInfo.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/WebSocketSessionInfo.java b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/WebSocketSessionInfo.java new file mode 100644 index 0000000..91d4a93 --- /dev/null +++ b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/WebSocketSessionInfo.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.websocket; + +import java.net.InetSocketAddress; + +/** + * This interface only expose session information. + */ +public interface WebSocketSessionInfo { + String getSessionId(); + InetSocketAddress getLocalAddress(); + InetSocketAddress getRemoteAddress(); + boolean isSecure(); + void setTransitUri(final String transitUri); + String getTransitUri(); +} http://git-wip-us.apache.org/repos/asf/nifi/blob/26a5881d/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/test/java/org/apache/nifi/websocket/TestWebSocketMessageRouter.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/test/java/org/apache/nifi/websocket/TestWebSocketMessageRouter.java b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/test/java/org/apache/nifi/websocket/TestWebSocketMessageRouter.java new file mode 100644 index 0000000..208fe5b --- /dev/null +++ b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/test/java/org/apache/nifi/websocket/TestWebSocketMessageRouter.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.websocket; + +import org.apache.nifi.processor.Processor; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + + +public class TestWebSocketMessageRouter { + + @Test + public void testRegisterProcessor() throws Exception { + final WebSocketMessageRouter router = new WebSocketMessageRouter("endpoint-id"); + + final Processor processor1 = mock(Processor.class); + when(processor1.getIdentifier()).thenReturn("processor-1"); + + final Processor processor2 = mock(Processor.class); + when(processor1.getIdentifier()).thenReturn("processor-2"); + + router.registerProcessor(processor1); + try { + router.registerProcessor(processor2); + fail("Should fail since a processor is already registered."); + } catch (WebSocketConfigurationException e) { + } + + assertTrue(router.isProcessorRegistered(processor1)); + assertFalse(router.isProcessorRegistered(processor2)); + + // It's safe to call deregister even if it's not registered. + router.deregisterProcessor(processor2); + router.deregisterProcessor(processor1); + // It's safe to call deregister even if it's not registered. + router.deregisterProcessor(processor2); + + } + + @Test + public void testSendMessage() throws Exception { + final WebSocketMessageRouter router = new WebSocketMessageRouter("endpoint-id"); + + final Processor processor1 = mock(Processor.class); + when(processor1.getIdentifier()).thenReturn("processor-1"); + + final AbstractWebSocketSession session = mock(AbstractWebSocketSession.class); + when(session.getSessionId()).thenReturn("session-1"); + doAnswer(invocation -> { + assertEquals("message", invocation.getArgumentAt(0, String.class)); + return null; + }).when(session).sendString(anyString()); + + router.registerProcessor(processor1); + router.captureSession(session); + + router.sendMessage("session-1", sender -> sender.sendString("message")); + try { + router.sendMessage("session-2", sender -> sender.sendString("message")); + fail("Should fail because there's no session with id session-2."); + } catch (IllegalStateException e) { + } + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/26a5881d/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/test/java/org/apache/nifi/websocket/TestWebSocketMessageRouters.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/test/java/org/apache/nifi/websocket/TestWebSocketMessageRouters.java b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/test/java/org/apache/nifi/websocket/TestWebSocketMessageRouters.java new file mode 100644 index 0000000..eb911c7 --- /dev/null +++ b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/test/java/org/apache/nifi/websocket/TestWebSocketMessageRouters.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.websocket; + +import org.apache.nifi.processor.Processor; +import org.junit.Test; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + + +public class TestWebSocketMessageRouters { + + @Test + public void testRegisterProcessor() throws Exception { + final String endpointId = "endpoint-id"; + final WebSocketMessageRouters routers = new WebSocketMessageRouters(); + try { + routers.getRouterOrFail(endpointId); + fail("Should fail because no route exists with the endpointId."); + } catch (WebSocketConfigurationException e) { + } + + final Processor processor1 = mock(Processor.class); + when(processor1.getIdentifier()).thenReturn("processor-1"); + + assertFalse(routers.isProcessorRegistered(endpointId, processor1)); + + routers.registerProcessor(endpointId, processor1); + assertNotNull(routers.getRouterOrFail(endpointId)); + + assertTrue(routers.isProcessorRegistered(endpointId, processor1)); + + routers.deregisterProcessor(endpointId, processor1); + + assertFalse(routers.isProcessorRegistered(endpointId, processor1)); + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/26a5881d/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty-nar/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty-nar/pom.xml b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty-nar/pom.xml new file mode 100644 index 0000000..f4cab92 --- /dev/null +++ b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty-nar/pom.xml @@ -0,0 +1,45 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-websocket-bundle</artifactId> + <version>1.1.0-SNAPSHOT</version> + </parent> + + <artifactId>nifi-websocket-services-jetty-nar</artifactId> + <version>1.1.0-SNAPSHOT</version> + <packaging>nar</packaging> + <properties> + <maven.javadoc.skip>true</maven.javadoc.skip> + <source.skip>true</source.skip> + </properties> + + <dependencies> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-websocket-services-api-nar</artifactId> + <type>nar</type> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-websocket-services-jetty</artifactId> + </dependency> + </dependencies> + +</project> http://git-wip-us.apache.org/repos/asf/nifi/blob/26a5881d/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/pom.xml b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/pom.xml new file mode 100644 index 0000000..ae2054e --- /dev/null +++ b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/pom.xml @@ -0,0 +1,55 @@ +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <!-- Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with this + work for additional information regarding copyright ownership. The ASF licenses + this file to You under the Apache License, Version 2.0 (the "License"); you + may not use this file except in compliance with the License. You may obtain + a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless + required by applicable law or agreed to in writing, software distributed + under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES + OR CONDITIONS OF ANY KIND, either express or implied. See the License for + the specific language governing permissions and limitations under the License. --> + <parent> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-websocket-bundle</artifactId> + <version>1.1.0-SNAPSHOT</version> + </parent> + <modelVersion>4.0.0</modelVersion> + <artifactId>nifi-websocket-services-jetty</artifactId> + <packaging>jar</packaging> + <dependencies> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-nar-utils</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-ssl-context-service-api</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-websocket-services-api</artifactId> + </dependency> + <dependency> + <groupId>org.eclipse.jetty.websocket</groupId> + <artifactId>websocket-server</artifactId> + <version>9.3.13.v20161014</version> + </dependency> + <dependency> + <groupId>log4j</groupId> + <artifactId>log4j</artifactId> + <version>1.2.17</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-mock</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-ssl-context-service</artifactId> + <scope>test</scope> + </dependency> + </dependencies> +</project> http://git-wip-us.apache.org/repos/asf/nifi/blob/26a5881d/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/main/java/org/apache/nifi/websocket/jetty/AbstractJettyWebSocketService.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/main/java/org/apache/nifi/websocket/jetty/AbstractJettyWebSocketService.java b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/main/java/org/apache/nifi/websocket/jetty/AbstractJettyWebSocketService.java new file mode 100644 index 0000000..39d67d5 --- /dev/null +++ b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/main/java/org/apache/nifi/websocket/jetty/AbstractJettyWebSocketService.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.websocket.jetty; + +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.processor.DataUnit; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.ssl.SSLContextService; +import org.apache.nifi.websocket.AbstractWebSocketService; +import org.eclipse.jetty.util.ssl.SslContextFactory; +import org.eclipse.jetty.websocket.api.WebSocketPolicy; + +import java.util.ArrayList; +import java.util.List; + +public abstract class AbstractJettyWebSocketService extends AbstractWebSocketService { + + public static final PropertyDescriptor INPUT_BUFFER_SIZE = new PropertyDescriptor.Builder() + .name("input-buffer-size") + .displayName("Input Buffer Size") + .description("The size of the input (read from network layer) buffer size.") + .required(true) + .defaultValue("4 kb") + .addValidator(StandardValidators.DATA_SIZE_VALIDATOR) + .build(); + + public static final PropertyDescriptor MAX_TEXT_MESSAGE_SIZE = new PropertyDescriptor.Builder() + .name("max-text-message-size") + .displayName("Max Text Message Size") + .description("The maximum size of a text message during parsing/generating.") + .required(true) + .defaultValue("64 kb") + .addValidator(StandardValidators.DATA_SIZE_VALIDATOR) + .build(); + + public static final PropertyDescriptor MAX_BINARY_MESSAGE_SIZE = new PropertyDescriptor.Builder() + .name("max-binary-message-size") + .displayName("Max Binary Message Size") + .description("The maximum size of a binary message during parsing/generating.") + .required(true) + .defaultValue("64 kb") + .addValidator(StandardValidators.DATA_SIZE_VALIDATOR) + .build(); + + static List<PropertyDescriptor> getAbstractPropertyDescriptors() { + final List<PropertyDescriptor> descriptors = new ArrayList<>(); + descriptors.add(INPUT_BUFFER_SIZE); + descriptors.add(MAX_TEXT_MESSAGE_SIZE); + descriptors.add(MAX_BINARY_MESSAGE_SIZE); + return descriptors; + } + + + protected SslContextFactory createSslFactory(final SSLContextService sslService, final boolean needClientAuth, final boolean wantClientAuth) { + final SslContextFactory sslFactory = new SslContextFactory(); + + sslFactory.setNeedClientAuth(needClientAuth); + sslFactory.setWantClientAuth(wantClientAuth); + + if (sslService.isKeyStoreConfigured()) { + sslFactory.setKeyStorePath(sslService.getKeyStoreFile()); + sslFactory.setKeyStorePassword(sslService.getKeyStorePassword()); + sslFactory.setKeyStoreType(sslService.getKeyStoreType()); + } + + if (sslService.isTrustStoreConfigured()) { + sslFactory.setTrustStorePath(sslService.getTrustStoreFile()); + sslFactory.setTrustStorePassword(sslService.getTrustStorePassword()); + sslFactory.setTrustStoreType(sslService.getTrustStoreType()); + } + + return sslFactory; + } + + protected void configurePolicy(final ConfigurationContext context, final WebSocketPolicy policy) { + final int inputBufferSize = context.getProperty(INPUT_BUFFER_SIZE).asDataSize(DataUnit.B).intValue(); + final int maxTextMessageSize = context.getProperty(MAX_TEXT_MESSAGE_SIZE).asDataSize(DataUnit.B).intValue(); + final int maxBinaryMessageSize = context.getProperty(MAX_BINARY_MESSAGE_SIZE).asDataSize(DataUnit.B).intValue(); + policy.setInputBufferSize(inputBufferSize); + policy.setMaxTextMessageSize(maxTextMessageSize); + policy.setMaxBinaryMessageSize(maxBinaryMessageSize); + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/26a5881d/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/main/java/org/apache/nifi/websocket/jetty/JettyWebSocketClient.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/main/java/org/apache/nifi/websocket/jetty/JettyWebSocketClient.java b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/main/java/org/apache/nifi/websocket/jetty/JettyWebSocketClient.java new file mode 100644 index 0000000..af8dd8e --- /dev/null +++ b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/main/java/org/apache/nifi/websocket/jetty/JettyWebSocketClient.java @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.websocket.jetty; + +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnDisabled; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.annotation.lifecycle.OnShutdown; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.ssl.SSLContextService; +import org.apache.nifi.websocket.WebSocketClientService; +import org.apache.nifi.websocket.WebSocketConfigurationException; +import org.apache.nifi.websocket.WebSocketMessageRouter; +import org.eclipse.jetty.util.ssl.SslContextFactory; +import org.eclipse.jetty.websocket.api.Session; +import org.eclipse.jetty.websocket.client.ClientUpgradeRequest; +import org.eclipse.jetty.websocket.client.WebSocketClient; + +import java.io.IOException; +import java.net.URI; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +@Tags({"WebSocket", "Jetty", "client"}) +@CapabilityDescription("Implementation of WebSocketClientService." + + " This service uses Jetty WebSocket client module to provide" + + " WebSocket session management throughout the application.") +public class JettyWebSocketClient extends AbstractJettyWebSocketService implements WebSocketClientService { + + public static final PropertyDescriptor WS_URI = new PropertyDescriptor.Builder() + .name("websocket-uri") + .displayName("WebSocket URI") + .description("The WebSocket URI this client connects to.") + .required(true) + .expressionLanguageSupported(true) + .addValidator(StandardValidators.URI_VALIDATOR) + .addValidator((subject, input, context) -> { + final ValidationResult.Builder result = new ValidationResult.Builder() + .valid(input.startsWith("/")) + .subject(subject); + + if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(input)) { + result.explanation("Expression Language Present").valid(true); + } else { + result.explanation("Protocol should be either 'ws' or 'wss'.") + .valid(input.startsWith("ws://") || input.startsWith("wss://")); + } + + return result.build(); + }) + .build(); + + public static final PropertyDescriptor CONNECTION_TIMEOUT = new PropertyDescriptor.Builder() + .name("connection-timeout") + .displayName("Connection Timeout") + .description("The timeout to connect the WebSocket URI.") + .required(true) + .expressionLanguageSupported(true) + .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) + .defaultValue("3 sec") + .build(); + + private static final List<PropertyDescriptor> properties; + + static { + final List<PropertyDescriptor> props = new ArrayList<>(); + props.addAll(getAbstractPropertyDescriptors()); + props.add(WS_URI); + props.add(SSL_CONTEXT); + props.add(CONNECTION_TIMEOUT); + + properties = Collections.unmodifiableList(props); + } + + private WebSocketClient client; + private URI webSocketUri; + private long connectionTimeoutMillis; + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return properties; + } + + @OnEnabled + @Override + public void startClient(final ConfigurationContext context) throws Exception{ + + final SSLContextService sslService = context.getProperty(SSL_CONTEXT).asControllerService(SSLContextService.class); + SslContextFactory sslContextFactory = null; + if (sslService != null) { + sslContextFactory = createSslFactory(sslService, false, false); + } + client = new WebSocketClient(sslContextFactory); + + configurePolicy(context, client.getPolicy()); + + client.start(); + + webSocketUri = new URI(context.getProperty(WS_URI).getValue()); + connectionTimeoutMillis = context.getProperty(CONNECTION_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS); + } + + @OnDisabled + @OnShutdown + @Override + public void stopClient() throws Exception { + if (client == null) { + return; + } + + client.stop(); + client = null; + } + + @Override + public void connect(final String clientId) throws IOException { + + final WebSocketMessageRouter router; + try { + router = routers.getRouterOrFail(clientId); + } catch (WebSocketConfigurationException e) { + throw new IllegalStateException("Failed to get router due to: " + e, e); + } + final RoutingWebSocketListener listener = new RoutingWebSocketListener(router); + + final ClientUpgradeRequest request = new ClientUpgradeRequest(); + final Future<Session> connect = client.connect(listener, webSocketUri, request); + getLogger().info("Connecting to : {}", new Object[]{webSocketUri}); + + final Session session; + try { + session = connect.get(connectionTimeoutMillis, TimeUnit.MILLISECONDS); + } catch (Exception e) { + throw new IOException("Failed to connect " + webSocketUri + " due to: " + e, e); + } + getLogger().info("Connected, session={}", new Object[]{session}); + + } + + @Override + public String getTargetUri() { + return webSocketUri.toString(); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/26a5881d/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/main/java/org/apache/nifi/websocket/jetty/JettyWebSocketServer.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/main/java/org/apache/nifi/websocket/jetty/JettyWebSocketServer.java b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/main/java/org/apache/nifi/websocket/jetty/JettyWebSocketServer.java new file mode 100644 index 0000000..267e7d1 --- /dev/null +++ b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/main/java/org/apache/nifi/websocket/jetty/JettyWebSocketServer.java @@ -0,0 +1,253 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.websocket.jetty; + +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnDisabled; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.annotation.lifecycle.OnShutdown; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.ssl.SSLContextService; +import org.apache.nifi.websocket.WebSocketConfigurationException; +import org.apache.nifi.websocket.WebSocketMessageRouter; +import org.apache.nifi.websocket.WebSocketServerService; +import org.eclipse.jetty.server.Connector; +import org.eclipse.jetty.server.Handler; +import org.eclipse.jetty.server.HttpConfiguration; +import org.eclipse.jetty.server.HttpConnectionFactory; +import org.eclipse.jetty.server.SecureRequestCustomizer; +import org.eclipse.jetty.server.Server; +import org.eclipse.jetty.server.ServerConnector; +import org.eclipse.jetty.server.SslConnectionFactory; +import org.eclipse.jetty.server.handler.ContextHandlerCollection; +import org.eclipse.jetty.servlet.ServletContextHandler; +import org.eclipse.jetty.servlet.ServletHandler; +import org.eclipse.jetty.util.ssl.SslContextFactory; +import org.eclipse.jetty.websocket.api.Session; +import org.eclipse.jetty.websocket.api.WebSocketPolicy; +import org.eclipse.jetty.websocket.servlet.ServletUpgradeRequest; +import org.eclipse.jetty.websocket.servlet.ServletUpgradeResponse; +import org.eclipse.jetty.websocket.servlet.WebSocketCreator; +import org.eclipse.jetty.websocket.servlet.WebSocketServlet; +import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory; + +import java.net.URI; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +@Tags({"WebSocket", "Jetty", "server"}) +@CapabilityDescription("Implementation of WebSocketServerService." + + " This service uses Jetty WebSocket server module to provide" + + " WebSocket session management throughout the application.") +public class JettyWebSocketServer extends AbstractJettyWebSocketService implements WebSocketServerService { + + /** + * A global map to refer a controller service instance by requested port number. + */ + private static final Map<Integer, JettyWebSocketServer> portToControllerService = new ConcurrentHashMap<>(); + + // Allowable values for client auth + public static final AllowableValue CLIENT_NONE = new AllowableValue("no", "No Authentication", + "Processor will not authenticate clients. Anyone can communicate with this Processor anonymously"); + public static final AllowableValue CLIENT_WANT = new AllowableValue("want", "Want Authentication", + "Processor will try to verify the client but if unable to verify will allow the client to communicate anonymously"); + public static final AllowableValue CLIENT_NEED = new AllowableValue("need", "Need Authentication", + "Processor will reject communications from any client unless the client provides a certificate that is trusted by the TrustStore" + + "specified in the SSL Context Service"); + + public static final PropertyDescriptor CLIENT_AUTH = new PropertyDescriptor.Builder() + .name("client-authentication") + .displayName("Client Authentication") + .description("Specifies whether or not the Processor should authenticate clients. This value is ignored if the <SSL Context Service> " + + "Property is not specified or the SSL Context provided uses only a KeyStore and not a TrustStore.") + .required(true) + .allowableValues(CLIENT_NONE, CLIENT_WANT, CLIENT_NEED) + .defaultValue(CLIENT_NONE.getValue()) + .build(); + + public static final PropertyDescriptor LISTEN_PORT = new PropertyDescriptor.Builder() + .name("listen-port") + .displayName("Listen Port") + .description("The port number on which this WebSocketServer listens to.") + .required(true) + .expressionLanguageSupported(true) + .addValidator(StandardValidators.PORT_VALIDATOR) + .build(); + + private static final List<PropertyDescriptor> properties; + + static { + final List<PropertyDescriptor> props = new ArrayList<>(); + props.addAll(getAbstractPropertyDescriptors()); + props.add(LISTEN_PORT); + props.add(SSL_CONTEXT); + props.add(CLIENT_AUTH); + + properties = Collections.unmodifiableList(props); + } + + private WebSocketPolicy configuredPolicy; + private Server server; + private Integer listenPort; + private ServletHandler servletHandler; + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return properties; + } + + + public static class JettyWebSocketServlet extends WebSocketServlet implements WebSocketCreator { + @Override + public void configure(WebSocketServletFactory webSocketServletFactory) { + webSocketServletFactory.setCreator(this); + } + + @Override + public Object createWebSocket(ServletUpgradeRequest servletUpgradeRequest, ServletUpgradeResponse servletUpgradeResponse) { + final URI requestURI = servletUpgradeRequest.getRequestURI(); + final int port = requestURI.getPort(); + final JettyWebSocketServer service = portToControllerService.get(port); + + if (service == null) { + throw new RuntimeException("No controller service is bound with port: " + port); + } + + final String path = requestURI.getPath(); + final WebSocketMessageRouter router; + try { + router = service.routers.getRouterOrFail(path); + } catch (WebSocketConfigurationException e) { + throw new IllegalStateException("Failed to get router due to: " + e, e); + } + + final RoutingWebSocketListener listener = new RoutingWebSocketListener(router) { + @Override + public void onWebSocketConnect(Session session) { + final WebSocketPolicy currentPolicy = session.getPolicy(); + currentPolicy.setInputBufferSize(service.configuredPolicy.getInputBufferSize()); + currentPolicy.setMaxTextMessageSize(service.configuredPolicy.getMaxTextMessageSize()); + currentPolicy.setMaxBinaryMessageSize(service.configuredPolicy.getMaxBinaryMessageSize()); + super.onWebSocketConnect(session); + } + }; + + return listener; + } + } + + @OnEnabled + @Override + public void startServer(final ConfigurationContext context) throws Exception { + + if (server != null && server.isRunning()) { + getLogger().info("A WebSocket server is already running. {}", new Object[]{server}); + return; + } + + configuredPolicy = WebSocketPolicy.newServerPolicy(); + configurePolicy(context, configuredPolicy); + + server = new Server(); + + final ContextHandlerCollection handlerCollection = new ContextHandlerCollection(); + + final ServletContextHandler contextHandler = new ServletContextHandler(); + servletHandler = new ServletHandler(); + contextHandler.insertHandler(servletHandler); + + handlerCollection.setHandlers(new Handler[]{contextHandler}); + + server.setHandler(handlerCollection); + + listenPort = context.getProperty(LISTEN_PORT).asInteger(); + final SslContextFactory sslContextFactory = createSslFactory(context); + + final ServerConnector serverConnector = createConnector(sslContextFactory, listenPort); + + server.setConnectors(new Connector[] {serverConnector}); + + servletHandler.addServletWithMapping(JettyWebSocketServlet.class, "/*"); + + getLogger().info("Starting JettyWebSocketServer on port {}.", new Object[]{listenPort}); + server.start(); + + portToControllerService.put(listenPort, this); + } + + private ServerConnector createConnector(final SslContextFactory sslContextFactory, final Integer listenPort) { + + final ServerConnector serverConnector; + if (sslContextFactory == null) { + serverConnector = new ServerConnector(server); + } else { + final HttpConfiguration httpsConfiguration = new HttpConfiguration(); + httpsConfiguration.setSecureScheme("https"); + httpsConfiguration.addCustomizer(new SecureRequestCustomizer()); + serverConnector = new ServerConnector(server, + new SslConnectionFactory(sslContextFactory, "http/1.1"), + new HttpConnectionFactory(httpsConfiguration)); + } + serverConnector.setPort(listenPort); + return serverConnector; + } + + private SslContextFactory createSslFactory(final ConfigurationContext context) { + final SSLContextService sslService = context.getProperty(SSL_CONTEXT).asControllerService(SSLContextService.class); + + final String clientAuthValue = context.getProperty(CLIENT_AUTH).getValue(); + final boolean need; + final boolean want; + if (CLIENT_NEED.equals(clientAuthValue)) { + need = true; + want = false; + } else if (CLIENT_WANT.equals(clientAuthValue)) { + need = false; + want = true; + } else { + need = false; + want = false; + } + + final SslContextFactory sslFactory = (sslService == null) ? null : createSslFactory(sslService, need, want); + return sslFactory; + } + + @OnDisabled + @OnShutdown + @Override + public void stopServer() throws Exception { + if (server == null) { + return; + } + + getLogger().info("Stopping JettyWebSocketServer."); + server.stop(); + if (portToControllerService.containsKey(listenPort) + && this.getIdentifier().equals(portToControllerService.get(listenPort).getIdentifier())) { + portToControllerService.remove(listenPort); + } + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/26a5881d/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/main/java/org/apache/nifi/websocket/jetty/JettyWebSocketSession.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/main/java/org/apache/nifi/websocket/jetty/JettyWebSocketSession.java b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/main/java/org/apache/nifi/websocket/jetty/JettyWebSocketSession.java new file mode 100644 index 0000000..d1da779 --- /dev/null +++ b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/main/java/org/apache/nifi/websocket/jetty/JettyWebSocketSession.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.websocket.jetty; + +import org.apache.nifi.websocket.AbstractWebSocketSession; +import org.eclipse.jetty.websocket.api.Session; +import org.eclipse.jetty.websocket.api.StatusCode; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; + +public class JettyWebSocketSession extends AbstractWebSocketSession { + + private final String sessionId; + private final Session session; + + public JettyWebSocketSession(final String sessionId, final Session session) { + this.sessionId = sessionId; + this.session = session; + } + + @Override + public String getSessionId() { + return sessionId; + } + + @Override + public void sendString(final String message) throws IOException { + session.getRemote().sendString(message); + } + + @Override + public void sendBinary(final ByteBuffer data) throws IOException { + session.getRemote().sendBytes(data); + } + + @Override + public void close(final String reason) throws IOException { + session.close(StatusCode.NORMAL, reason); + } + + @Override + public InetSocketAddress getRemoteAddress() { + return session.getRemoteAddress(); + } + + @Override + public InetSocketAddress getLocalAddress() { + return session.getLocalAddress(); + } + + @Override + public boolean isSecure() { + return session.isSecure(); + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/26a5881d/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/main/java/org/apache/nifi/websocket/jetty/RoutingWebSocketListener.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/main/java/org/apache/nifi/websocket/jetty/RoutingWebSocketListener.java b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/main/java/org/apache/nifi/websocket/jetty/RoutingWebSocketListener.java new file mode 100644 index 0000000..81376a4 --- /dev/null +++ b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/main/java/org/apache/nifi/websocket/jetty/RoutingWebSocketListener.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.websocket.jetty; + +import org.apache.nifi.websocket.WebSocketMessageRouter; +import org.eclipse.jetty.websocket.api.Session; +import org.eclipse.jetty.websocket.api.WebSocketAdapter; + +import java.util.UUID; + +public class RoutingWebSocketListener extends WebSocketAdapter { + private final WebSocketMessageRouter router; + private String sessionId; + + public RoutingWebSocketListener(final WebSocketMessageRouter router) { + this.router = router; + } + + @Override + public void onWebSocketConnect(final Session session) { + super.onWebSocketConnect(session); + sessionId = UUID.randomUUID().toString(); + final JettyWebSocketSession webSocketSession = new JettyWebSocketSession(sessionId, session); + router.captureSession(webSocketSession); + } + + @Override + public void onWebSocketClose(final int statusCode, final String reason) { + super.onWebSocketClose(statusCode, reason); + router.onWebSocketClose(sessionId, statusCode, reason); + } + + @Override + public void onWebSocketText(final String message) { + router.onWebSocketText(sessionId, message); + } + + @Override + public void onWebSocketBinary(final byte[] payload, final int offset, final int len) { + router.onWebSocketBinary(sessionId, payload, offset, len); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/26a5881d/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService new file mode 100644 index 0000000..6f3afe4 --- /dev/null +++ b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +org.apache.nifi.websocket.jetty.JettyWebSocketServer +org.apache.nifi.websocket.jetty.JettyWebSocketClient \ No newline at end of file