[ https://issues.apache.org/jira/browse/NIFI-1002?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15666337#comment-15666337 ]
ASF GitHub Bot commented on NIFI-1002: -------------------------------------- Github user ijokarumawak commented on a diff in the pull request: https://github.com/apache/nifi/pull/1184#discussion_r87958669 --- Diff: nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/main/java/org/apache/nifi/websocket/jetty/JettyWebSocketServer.java --- @@ -0,0 +1,258 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.websocket.jetty; + +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnDisabled; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.annotation.lifecycle.OnShutdown; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.nar.NarCloseable; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.ssl.SSLContextService; +import org.apache.nifi.websocket.WebSocketConfigurationException; +import org.apache.nifi.websocket.WebSocketMessageRouter; +import org.apache.nifi.websocket.WebSocketServerService; +import org.eclipse.jetty.server.Connector; +import org.eclipse.jetty.server.Handler; +import org.eclipse.jetty.server.HttpConfiguration; +import org.eclipse.jetty.server.HttpConnectionFactory; +import org.eclipse.jetty.server.SecureRequestCustomizer; +import org.eclipse.jetty.server.Server; +import org.eclipse.jetty.server.ServerConnector; +import org.eclipse.jetty.server.SslConnectionFactory; +import org.eclipse.jetty.server.handler.ContextHandlerCollection; +import org.eclipse.jetty.servlet.ServletContextHandler; +import org.eclipse.jetty.servlet.ServletHandler; +import org.eclipse.jetty.util.ssl.SslContextFactory; +import org.eclipse.jetty.websocket.api.Session; +import org.eclipse.jetty.websocket.api.WebSocketPolicy; +import org.eclipse.jetty.websocket.server.WebSocketServerFactory; +import org.eclipse.jetty.websocket.servlet.ServletUpgradeRequest; +import org.eclipse.jetty.websocket.servlet.ServletUpgradeResponse; +import org.eclipse.jetty.websocket.servlet.WebSocketCreator; +import org.eclipse.jetty.websocket.servlet.WebSocketServlet; +import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory; + +import java.net.URI; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +@Tags({"WebSocket", "Jetty", "server"}) +@CapabilityDescription("Implementation of WebSocketServerService." + + " This service uses Jetty WebSocket server module to provide" + + " WebSocket session management throughout the application.") +public class JettyWebSocketServer extends AbstractJettyWebSocketService implements WebSocketServerService { + + /** + * A global map to refer a controller service instance by requested port number. + */ + private static final Map<Integer, JettyWebSocketServer> portToControllerService = new ConcurrentHashMap<>(); + + // Allowable values for client auth + public static final AllowableValue CLIENT_NONE = new AllowableValue("no", "No Authentication", + "Processor will not authenticate clients. Anyone can communicate with this Processor anonymously"); + public static final AllowableValue CLIENT_WANT = new AllowableValue("want", "Want Authentication", + "Processor will try to verify the client but if unable to verify will allow the client to communicate anonymously"); + public static final AllowableValue CLIENT_NEED = new AllowableValue("need", "Need Authentication", + "Processor will reject communications from any client unless the client provides a certificate that is trusted by the TrustStore" + + "specified in the SSL Context Service"); + + public static final PropertyDescriptor CLIENT_AUTH = new PropertyDescriptor.Builder() + .name("client-authentication") + .displayName("Client Authentication") + .description("Specifies whether or not the Processor should authenticate clients. This value is ignored if the <SSL Context Service> " + + "Property is not specified or the SSL Context provided uses only a KeyStore and not a TrustStore.") + .required(true) + .allowableValues(CLIENT_NONE, CLIENT_WANT, CLIENT_NEED) + .defaultValue(CLIENT_NONE.getValue()) + .build(); + + public static final PropertyDescriptor LISTEN_PORT = new PropertyDescriptor.Builder() + .name("listen-port") + .displayName("Listen Port") + .description("The port number on which this WebSocketServer listens to.") + .required(true) + .expressionLanguageSupported(true) + .addValidator(StandardValidators.PORT_VALIDATOR) + .build(); + + private static final List<PropertyDescriptor> properties; + + static { + final List<PropertyDescriptor> props = new ArrayList<>(); + props.addAll(getAbstractPropertyDescriptors()); + props.add(LISTEN_PORT); + props.add(SSL_CONTEXT); + props.add(CLIENT_AUTH); + + properties = Collections.unmodifiableList(props); + } + + private WebSocketPolicy configuredPolicy; + private Server server; + private Integer listenPort; + private ServletHandler servletHandler; + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return properties; + } + + + public static class JettyWebSocketServlet extends WebSocketServlet implements WebSocketCreator { + @Override + public void configure(WebSocketServletFactory webSocketServletFactory) { + webSocketServletFactory.setCreator(this); + } + + @Override + public Object createWebSocket(ServletUpgradeRequest servletUpgradeRequest, ServletUpgradeResponse servletUpgradeResponse) { + final URI requestURI = servletUpgradeRequest.getRequestURI(); + final int port = requestURI.getPort(); + final JettyWebSocketServer service = portToControllerService.get(port); + + if (service == null) { + throw new RuntimeException("No controller service is bound with port: " + port); + } + + final String path = requestURI.getPath(); + final WebSocketMessageRouter router; + try { + router = service.routers.getRouterOrFail(path); + } catch (WebSocketConfigurationException e) { + throw new IllegalStateException("Failed to get router due to: " + e, e); + } + + final RoutingWebSocketListener listener = new RoutingWebSocketListener(router) { + @Override + public void onWebSocketConnect(Session session) { + final WebSocketPolicy currentPolicy = session.getPolicy(); + currentPolicy.setInputBufferSize(service.configuredPolicy.getInputBufferSize()); + currentPolicy.setMaxTextMessageSize(service.configuredPolicy.getMaxTextMessageSize()); + currentPolicy.setMaxBinaryMessageSize(service.configuredPolicy.getMaxBinaryMessageSize()); + super.onWebSocketConnect(session); + } + }; + + return listener; + } + } + + @OnEnabled + @Override + public void startServer(final ConfigurationContext context) throws Exception { + + if (server != null && server.isRunning()) { + getLogger().info("A WebSocket server is already running. {}", new Object[]{server}); + return; + } + + configuredPolicy = WebSocketPolicy.newServerPolicy(); + configurePolicy(context, configuredPolicy); + + server = new Server(); + + final ContextHandlerCollection handlerCollection = new ContextHandlerCollection(); + + final ServletContextHandler contextHandler = new ServletContextHandler(); + servletHandler = new ServletHandler(); + contextHandler.insertHandler(servletHandler); + + handlerCollection.setHandlers(new Handler[]{contextHandler}); + + server.setHandler(handlerCollection); + + listenPort = context.getProperty(LISTEN_PORT).asInteger(); + final SslContextFactory sslContextFactory = createSslFactory(context); + + final ServerConnector serverConnector = createConnector(sslContextFactory, listenPort); + + server.setConnectors(new Connector[] {serverConnector}); + + servletHandler.addServletWithMapping(JettyWebSocketServlet.class, "/*"); + + // Need to specify classloader, otherwise since the callstack doesn't have any nifi specific class, so it can't use nar. + try (NarCloseable closeable = NarCloseable.withComponentNarLoader(WebSocketServerFactory.class)) { --- End diff -- Thanks Oleg, I removed the use of NarCloseable and confirmed that JettyWebSocketServer works without this now. > support for Listen WebSocket processor > --------------------------------------- > > Key: NIFI-1002 > URL: https://issues.apache.org/jira/browse/NIFI-1002 > Project: Apache NiFi > Issue Type: Improvement > Components: Extensions > Affects Versions: 0.4.0 > Reporter: sumanth chinthagunta > Priority: Minor > Labels: newbie > > A WebSocket listen processor will be helpful for IoT data ingestion. > I am playing with embedded Vert.X for WebSocket and also ability to put > FlowFiles back to WebSocket client via Vert.X EventBus. > https://github.com/xmlking/nifi-websocket > I am new to NiFi. any advise can be helpful. > PS: I feel forcing Interfaces for Controller Services is unnecessary as in > many cases Controller Services are only used by a set of Processors and > developers usually bundle them together. > -- This message was sent by Atlassian JIRA (v6.3.4#6332)