Github user ijokarumawak commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/1184#discussion_r87958669
  
    --- Diff: 
nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/main/java/org/apache/nifi/websocket/jetty/JettyWebSocketServer.java
 ---
    @@ -0,0 +1,258 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.websocket.jetty;
    +
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnDisabled;
    +import org.apache.nifi.annotation.lifecycle.OnEnabled;
    +import org.apache.nifi.annotation.lifecycle.OnShutdown;
    +import org.apache.nifi.components.AllowableValue;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.controller.ConfigurationContext;
    +import org.apache.nifi.nar.NarCloseable;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.ssl.SSLContextService;
    +import org.apache.nifi.websocket.WebSocketConfigurationException;
    +import org.apache.nifi.websocket.WebSocketMessageRouter;
    +import org.apache.nifi.websocket.WebSocketServerService;
    +import org.eclipse.jetty.server.Connector;
    +import org.eclipse.jetty.server.Handler;
    +import org.eclipse.jetty.server.HttpConfiguration;
    +import org.eclipse.jetty.server.HttpConnectionFactory;
    +import org.eclipse.jetty.server.SecureRequestCustomizer;
    +import org.eclipse.jetty.server.Server;
    +import org.eclipse.jetty.server.ServerConnector;
    +import org.eclipse.jetty.server.SslConnectionFactory;
    +import org.eclipse.jetty.server.handler.ContextHandlerCollection;
    +import org.eclipse.jetty.servlet.ServletContextHandler;
    +import org.eclipse.jetty.servlet.ServletHandler;
    +import org.eclipse.jetty.util.ssl.SslContextFactory;
    +import org.eclipse.jetty.websocket.api.Session;
    +import org.eclipse.jetty.websocket.api.WebSocketPolicy;
    +import org.eclipse.jetty.websocket.server.WebSocketServerFactory;
    +import org.eclipse.jetty.websocket.servlet.ServletUpgradeRequest;
    +import org.eclipse.jetty.websocket.servlet.ServletUpgradeResponse;
    +import org.eclipse.jetty.websocket.servlet.WebSocketCreator;
    +import org.eclipse.jetty.websocket.servlet.WebSocketServlet;
    +import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory;
    +
    +import java.net.URI;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.concurrent.ConcurrentHashMap;
    +
    +@Tags({"WebSocket", "Jetty", "server"})
    +@CapabilityDescription("Implementation of WebSocketServerService." +
    +        " This service uses Jetty WebSocket server module to provide" +
    +        " WebSocket session management throughout the application.")
    +public class JettyWebSocketServer extends AbstractJettyWebSocketService 
implements WebSocketServerService {
    +
    +    /**
    +     * A global map to refer a controller service instance by requested 
port number.
    +     */
    +    private static final Map<Integer, JettyWebSocketServer> 
portToControllerService = new ConcurrentHashMap<>();
    +
    +    // Allowable values for client auth
    +    public static final AllowableValue CLIENT_NONE = new 
AllowableValue("no", "No Authentication",
    +            "Processor will not authenticate clients. Anyone can 
communicate with this Processor anonymously");
    +    public static final AllowableValue CLIENT_WANT = new 
AllowableValue("want", "Want Authentication",
    +            "Processor will try to verify the client but if unable to 
verify will allow the client to communicate anonymously");
    +    public static final AllowableValue CLIENT_NEED = new 
AllowableValue("need", "Need Authentication",
    +            "Processor will reject communications from any client unless 
the client provides a certificate that is trusted by the TrustStore"
    +                    + "specified in the SSL Context Service");
    +
    +    public static final PropertyDescriptor CLIENT_AUTH = new 
PropertyDescriptor.Builder()
    +            .name("client-authentication")
    +            .displayName("Client Authentication")
    +            .description("Specifies whether or not the Processor should 
authenticate clients. This value is ignored if the <SSL Context Service> "
    +                    + "Property is not specified or the SSL Context 
provided uses only a KeyStore and not a TrustStore.")
    +            .required(true)
    +            .allowableValues(CLIENT_NONE, CLIENT_WANT, CLIENT_NEED)
    +            .defaultValue(CLIENT_NONE.getValue())
    +            .build();
    +
    +    public static final PropertyDescriptor LISTEN_PORT = new 
PropertyDescriptor.Builder()
    +            .name("listen-port")
    +            .displayName("Listen Port")
    +            .description("The port number on which this WebSocketServer 
listens to.")
    +            .required(true)
    +            .expressionLanguageSupported(true)
    +            .addValidator(StandardValidators.PORT_VALIDATOR)
    +            .build();
    +
    +    private static final List<PropertyDescriptor> properties;
    +
    +    static {
    +        final List<PropertyDescriptor> props = new ArrayList<>();
    +        props.addAll(getAbstractPropertyDescriptors());
    +        props.add(LISTEN_PORT);
    +        props.add(SSL_CONTEXT);
    +        props.add(CLIENT_AUTH);
    +
    +        properties = Collections.unmodifiableList(props);
    +    }
    +
    +    private WebSocketPolicy configuredPolicy;
    +    private Server server;
    +    private Integer listenPort;
    +    private ServletHandler servletHandler;
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return properties;
    +    }
    +
    +
    +    public static class JettyWebSocketServlet extends WebSocketServlet 
implements WebSocketCreator {
    +        @Override
    +        public void configure(WebSocketServletFactory 
webSocketServletFactory) {
    +            webSocketServletFactory.setCreator(this);
    +        }
    +
    +        @Override
    +        public Object createWebSocket(ServletUpgradeRequest 
servletUpgradeRequest, ServletUpgradeResponse servletUpgradeResponse) {
    +            final URI requestURI = servletUpgradeRequest.getRequestURI();
    +            final int port = requestURI.getPort();
    +            final JettyWebSocketServer service = 
portToControllerService.get(port);
    +
    +            if (service == null) {
    +                throw new RuntimeException("No controller service is bound 
with port: " + port);
    +            }
    +
    +            final String path = requestURI.getPath();
    +            final WebSocketMessageRouter router;
    +            try {
    +                router = service.routers.getRouterOrFail(path);
    +            } catch (WebSocketConfigurationException e) {
    +                throw new IllegalStateException("Failed to get router due 
to: "  + e, e);
    +            }
    +
    +            final RoutingWebSocketListener listener = new 
RoutingWebSocketListener(router) {
    +                @Override
    +                public void onWebSocketConnect(Session session) {
    +                    final WebSocketPolicy currentPolicy = 
session.getPolicy();
    +                    
currentPolicy.setInputBufferSize(service.configuredPolicy.getInputBufferSize());
    +                    
currentPolicy.setMaxTextMessageSize(service.configuredPolicy.getMaxTextMessageSize());
    +                    
currentPolicy.setMaxBinaryMessageSize(service.configuredPolicy.getMaxBinaryMessageSize());
    +                    super.onWebSocketConnect(session);
    +                }
    +            };
    +
    +            return listener;
    +        }
    +    }
    +
    +    @OnEnabled
    +    @Override
    +    public void startServer(final ConfigurationContext context) throws 
Exception {
    +
    +        if (server != null && server.isRunning()) {
    +            getLogger().info("A WebSocket server is already running. {}", 
new Object[]{server});
    +            return;
    +        }
    +
    +        configuredPolicy = WebSocketPolicy.newServerPolicy();
    +        configurePolicy(context, configuredPolicy);
    +
    +        server = new Server();
    +
    +        final ContextHandlerCollection handlerCollection = new 
ContextHandlerCollection();
    +
    +        final ServletContextHandler contextHandler = new 
ServletContextHandler();
    +        servletHandler = new ServletHandler();
    +        contextHandler.insertHandler(servletHandler);
    +
    +        handlerCollection.setHandlers(new Handler[]{contextHandler});
    +
    +        server.setHandler(handlerCollection);
    +
    +        listenPort = context.getProperty(LISTEN_PORT).asInteger();
    +        final SslContextFactory sslContextFactory = 
createSslFactory(context);
    +
    +        final ServerConnector serverConnector = 
createConnector(sslContextFactory, listenPort);
    +
    +        server.setConnectors(new Connector[] {serverConnector});
    +
    +        servletHandler.addServletWithMapping(JettyWebSocketServlet.class, 
"/*");
    +
    +        // Need to specify classloader, otherwise since the callstack 
doesn't have any nifi specific class, so it can't use nar.
    +        try (NarCloseable closeable = 
NarCloseable.withComponentNarLoader(WebSocketServerFactory.class)) {
    --- End diff --
    
    Thanks Oleg, I removed the use of NarCloseable and confirmed that 
JettyWebSocketServer works without this now.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to