Github user markap14 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2947#discussion_r220978179
  
    --- Diff: 
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/ConnectionLoadBalanceServer.java
 ---
    @@ -0,0 +1,251 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.controller.queue.clustered.server;
    +
    +import org.apache.nifi.engine.FlowEngine;
    +import org.apache.nifi.events.EventReporter;
    +import org.apache.nifi.reporting.Severity;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import javax.net.ssl.SSLContext;
    +import javax.net.ssl.SSLServerSocket;
    +import java.io.IOException;
    +import java.net.InetAddress;
    +import java.net.ServerSocket;
    +import java.net.Socket;
    +import java.net.SocketTimeoutException;
    +import java.util.Collections;
    +import java.util.HashSet;
    +import java.util.Set;
    +import java.util.concurrent.BlockingQueue;
    +import java.util.concurrent.ExecutorService;
    +import java.util.concurrent.LinkedBlockingQueue;
    +import java.util.concurrent.TimeUnit;
    +
    +
    +public class ConnectionLoadBalanceServer {
    +    private static final Logger logger = 
LoggerFactory.getLogger(ConnectionLoadBalanceServer.class);
    +
    +    private final String hostname;
    +    private final int port;
    +    private final SSLContext sslContext;
    +    private final ExecutorService threadPool;
    +    private final LoadBalanceProtocol loadBalanceProtocol;
    +    private final int connectionTimeoutMillis;
    +    private final int numThreads;
    +    private final EventReporter eventReporter;
    +
    +    private volatile Set<CommunicateAction> communicationActions = 
Collections.emptySet();
    +    private final BlockingQueue<Socket> connectionQueue = new 
LinkedBlockingQueue<>();
    +
    +    private volatile AcceptConnection acceptConnection;
    +    private volatile ServerSocket serverSocket;
    +    private volatile boolean stopped = true;
    +
    +    /**
    +     * Captures the server configuration and creates the worker thread pool.
    +     * No socket is opened here; listening begins in {@link #start()}.
    +     *
    +     * @param hostname host to bind to; {@code null} is tolerated by createServerSocket()
    +     * @param port port to listen on
    +     * @param sslContext SSL context used to create a secure server socket, or {@code null} for a plain socket
    +     * @param numThreads size of the thread pool and number of CommunicateAction tasks submitted on start
    +     * @param loadBalanceProtocol protocol that is handed each accepted socket
    +     * @param eventReporter reporter stored for use by this server
    +     * @param connectionTimeoutMillis connection timeout, in milliseconds, stored for use by this server
    +     */
    +    public ConnectionLoadBalanceServer(final String hostname, final int port, final SSLContext sslContext,
    +                                       final int numThreads, final LoadBalanceProtocol loadBalanceProtocol,
    +                                       final EventReporter eventReporter, final int connectionTimeoutMillis) {
    +        // Network settings
    +        this.hostname = hostname;
    +        this.port = port;
    +        this.sslContext = sslContext;
    +        this.connectionTimeoutMillis = connectionTimeoutMillis;
    +
    +        // Collaborators
    +        this.loadBalanceProtocol = loadBalanceProtocol;
    +        this.eventReporter = eventReporter;
    +
    +        // Worker pool sized to the requested thread count
    +        this.numThreads = numThreads;
    +        this.threadPool = new FlowEngine(numThreads, "Load Balance Server");
    +    }
    +
    +    /**
    +     * Starts the server if it is currently stopped: binds the server socket, submits
    +     * {@code numThreads} CommunicateAction tasks to the thread pool and launches the
    +     * thread that accepts incoming connections.
    +     *
    +     * The server is flagged as started only after the socket has been created, so a
    +     * failed attempt leaves it stopped and start() can be called again.
    +     *
    +     * @throws IOException if the server socket cannot be created
    +     */
    +    public void start() throws IOException {
    +        if (!stopped) {
    +            return;
    +        }
    +
    +        if (serverSocket != null) {
    +            // A socket already exists: only flip the flag, matching the previous behavior.
    +            stopped = false;
    +            return;
    +        }
    +
    +        final ServerSocket newServerSocket;
    +        try {
    +            newServerSocket = createServerSocket();
    +        } catch (final Exception e) {
    +            // 'stopped' is still true here, so the caller may fix the configuration and retry.
    +            throw new IOException("Could not begin listening for incoming connections in order to load balance data across the cluster. Please verify the values of the " +
    +                    "'nifi.cluster.load.balance.port' and 'nifi.cluster.load.balance.host' properties as well as the 'nifi.security.*' properties", e);
    +        }
    +
    +        serverSocket = newServerSocket;
    +        stopped = false;
    +
    +        final Set<CommunicateAction> actions = new HashSet<>(numThreads);
    +        for (int i = 0; i < numThreads; i++) {
    +            final CommunicateAction action = new CommunicateAction(loadBalanceProtocol);
    +            actions.add(action);
    +            threadPool.submit(action);
    +        }
    +
    +        this.communicationActions = actions;
    +
    +        acceptConnection = new AcceptConnection(newServerSocket);
    +        final Thread receiveConnectionThread = new Thread(acceptConnection);
    +        receiveConnectionThread.setName("Receive Queue Load-Balancing Connections");
    +        receiveConnectionThread.start();
    +    }
    +
    +    /**
    +     * Returns the local port reported by the current server socket.
    +     * Throws a NullPointerException if no server socket has been assigned yet.
    +     *
    +     * @return the port the server socket is bound to
    +     */
    +    public int getPort() {
    +        final ServerSocket currentSocket = serverSocket;
    +        return currentSocket.getLocalPort();
    +    }
    +
    +    /**
    +     * Stops the server: marks it as stopped, shuts down the thread pool, stops the
    +     * connection acceptor and every CommunicateAction, then closes any sockets that
    +     * are still waiting in the connection queue.
    +     */
    +    public void stop() {
    +        // Must be true: start() is a no-op unless the server is flagged as stopped.
    +        stopped = true;
    +        threadPool.shutdown();
    +
    +        if (acceptConnection != null) {
    +            acceptConnection.stop();
    +        }
    +
    +        communicationActions.forEach(CommunicateAction::stop);
    +
    +        // Drain connections that were accepted but never picked up by a worker.
    +        Socket socket;
    +        while ((socket = connectionQueue.poll()) != null) {
    +            try {
    +                socket.close();
    +                logger.info("{} Closed connection to {} on Server stop", this, socket.getRemoteSocketAddress());
    +            } catch (final IOException ioe) {
    +                logger.warn("Failed to properly close socket to " + socket.getRemoteSocketAddress(), ioe);
    +            }
    +        }
    +    }
    +
    +    /**
    +     * Creates the listening socket on the configured port with a backlog of 50.
    +     * A plain ServerSocket is used when no SSLContext is configured; otherwise an
    +     * SSL server socket that requires client authentication is created.
    +     *
    +     * Both branches bind to the same address. When hostname is null the bind address
    +     * is null, which ServerSocket treats as "all local addresses".
    +     *
    +     * @return the newly created server socket
    +     * @throws IOException if the hostname cannot be resolved or the socket cannot be opened
    +     */
    +    private ServerSocket createServerSocket() throws IOException {
    +        final InetAddress inetAddress = hostname == null ? null : InetAddress.getByName(hostname);
    +
    +        if (sslContext == null) {
    +            // Use the null-safe address: InetAddress.getByName(null) would resolve to loopback.
    +            return new ServerSocket(port, 50, inetAddress);
    +        } else {
    +            final ServerSocket serverSocket = sslContext.getServerSocketFactory().createServerSocket(port, 50, inetAddress);
    +            ((SSLServerSocket) serverSocket).setNeedClientAuth(true);
    +            return serverSocket;
    +        }
    +    }
    +
    +
    +    private class CommunicateAction implements Runnable {
    +        private final LoadBalanceProtocol loadBalanceProtocol;
    +        private volatile boolean stopped = false;
    +
    +        public CommunicateAction(final LoadBalanceProtocol 
loadBalanceProtocol) {
    +            this.loadBalanceProtocol = loadBalanceProtocol;
    +        }
    +
    +        public void stop() {
    +            this.stopped = true;
    +        }
    +
    +        @Override
    +        public void run() {
    +            String peerDescription = "<Unknown Client>";
    +
    +            while (!stopped) {
    +                Socket socket = null;
    +                try {
    +                    socket = connectionQueue.poll(1, TimeUnit.SECONDS);
    +                    if (socket == null) {
    +                        continue;
    +                    }
    +
    +                    peerDescription = 
socket.getRemoteSocketAddress().toString();
    +
    +                    if (socket.isClosed()) {
    +                        logger.debug("Connection to Peer {} is closed. 
Will not attempt to communicate over this Socket.", peerDescription);
    +                        continue;
    +                    }
    +
    +                    logger.debug("Receiving FlowFiles from Peer {}", 
peerDescription);
    +                    loadBalanceProtocol.receiveFlowFiles(socket);
    --- End diff --
    
    That's a good point, and honestly not something that I had thought about. 
Whether or not we would need to test for this would depend on the selected 
partitioner. For example, if trying to offload all data from the node for 
decommissioning purposes, we want to ignore that, I think, and push data to the 
node anyway. In the case of Partition By Attribute, though, you have a great 
point.
    Personally, I would be inclined to create another JIRA to address this in 
the future, and document this as a limitation for the time being. It will 
take a decent bit of thought to do this efficiently and effectively.


---

Reply via email to