[ https://issues.apache.org/jira/browse/NIFI-5516?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16629931#comment-16629931 ]
ASF GitHub Bot commented on NIFI-5516: -------------------------------------- Github user ijokarumawak commented on a diff in the pull request: https://github.com/apache/nifi/pull/2947#discussion_r220835190 --- Diff: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/StandardLoadBalanceProtocol.java --- @@ -0,0 +1,578 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.controller.queue.clustered.server; + +import org.apache.nifi.connectable.Connection; +import org.apache.nifi.controller.FlowController; +import org.apache.nifi.controller.queue.FlowFileQueue; +import org.apache.nifi.controller.queue.LoadBalanceCompression; +import org.apache.nifi.controller.queue.LoadBalancedFlowFileQueue; +import org.apache.nifi.controller.repository.ContentRepository; +import org.apache.nifi.controller.repository.FlowFileRecord; +import org.apache.nifi.controller.repository.FlowFileRepository; +import org.apache.nifi.controller.repository.RepositoryRecord; +import org.apache.nifi.controller.repository.StandardFlowFileRecord; +import org.apache.nifi.controller.repository.StandardRepositoryRecord; +import org.apache.nifi.controller.repository.claim.ContentClaim; +import org.apache.nifi.controller.repository.io.LimitedInputStream; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.provenance.ProvenanceEventRecord; +import org.apache.nifi.provenance.ProvenanceEventType; +import org.apache.nifi.provenance.ProvenanceRepository; +import org.apache.nifi.provenance.StandardProvenanceEventRecord; +import org.apache.nifi.remote.StandardVersionNegotiator; +import org.apache.nifi.remote.VersionNegotiator; +import org.apache.nifi.security.util.CertificateUtils; +import org.apache.nifi.stream.io.ByteCountingInputStream; +import org.apache.nifi.stream.io.LimitingInputStream; +import org.apache.nifi.stream.io.StreamUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.net.ssl.SSLPeerUnverifiedException; +import javax.net.ssl.SSLSession; +import javax.net.ssl.SSLSocket; +import java.io.BufferedInputStream; +import java.io.BufferedOutputStream; +import java.io.DataInputStream; +import java.io.EOFException; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.Socket; +import java.net.SocketTimeoutException; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.security.cert.Certificate; +import java.security.cert.CertificateException; +import java.security.cert.X509Certificate; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Collectors; +import java.util.zip.CRC32; +import java.util.zip.CheckedInputStream; +import java.util.zip.Checksum; +import java.util.zip.GZIPInputStream; + +import static org.apache.nifi.controller.queue.clustered.protocol.LoadBalanceProtocolConstants.ABORT_PROTOCOL_NEGOTIATION; +import static org.apache.nifi.controller.queue.clustered.protocol.LoadBalanceProtocolConstants.ABORT_TRANSACTION; +import static org.apache.nifi.controller.queue.clustered.protocol.LoadBalanceProtocolConstants.COMPLETE_TRANSACTION; +import static org.apache.nifi.controller.queue.clustered.protocol.LoadBalanceProtocolConstants.CONFIRM_CHECKSUM; +import static org.apache.nifi.controller.queue.clustered.protocol.LoadBalanceProtocolConstants.CONFIRM_COMPLETE_TRANSACTION; +import static org.apache.nifi.controller.queue.clustered.protocol.LoadBalanceProtocolConstants.DATA_FRAME_FOLLOWS; +import static org.apache.nifi.controller.queue.clustered.protocol.LoadBalanceProtocolConstants.MORE_FLOWFILES; +import static org.apache.nifi.controller.queue.clustered.protocol.LoadBalanceProtocolConstants.NO_DATA_FRAME; +import static org.apache.nifi.controller.queue.clustered.protocol.LoadBalanceProtocolConstants.NO_MORE_FLOWFILES; +import static org.apache.nifi.controller.queue.clustered.protocol.LoadBalanceProtocolConstants.REJECT_CHECKSUM; +import static org.apache.nifi.controller.queue.clustered.protocol.LoadBalanceProtocolConstants.REQEUST_DIFFERENT_VERSION; +import static org.apache.nifi.controller.queue.clustered.protocol.LoadBalanceProtocolConstants.VERSION_ACCEPTED; + +public class StandardLoadBalanceProtocol implements LoadBalanceProtocol { + private static final Logger logger = LoggerFactory.getLogger(StandardLoadBalanceProtocol.class); + + private static final int SOCKET_CLOSED = -1; + private static final int NO_DATA_AVAILABLE = 0; + + private final FlowFileRepository flowFileRepository; + private final ContentRepository contentRepository; + private final ProvenanceRepository provenanceRepository; + private final FlowController flowController; + private final LoadBalanceAuthorizer authorizer; + + private final ThreadLocal<byte[]> dataBuffer = new ThreadLocal<>(); + private final AtomicLong lineageStartIndex = new AtomicLong(0L); + + public StandardLoadBalanceProtocol(final FlowFileRepository flowFileRepository, final ContentRepository contentRepository, final ProvenanceRepository provenanceRepository, + final FlowController flowController, final LoadBalanceAuthorizer authorizer) { + this.flowFileRepository = flowFileRepository; + this.contentRepository = contentRepository; + this.provenanceRepository = provenanceRepository; + this.flowController = flowController; + this.authorizer = authorizer; + } + + + @Override + public void receiveFlowFiles(final Socket socket) throws IOException { + final InputStream in = new BufferedInputStream(socket.getInputStream()); + final OutputStream out = new BufferedOutputStream(socket.getOutputStream()); + + String peerDescription = socket.getInetAddress().toString(); + if (socket instanceof SSLSocket) { + final SSLSession sslSession = ((SSLSocket) socket).getSession(); + + final Set<String> certIdentities; + try { + certIdentities = getCertificateIdentities(sslSession); + + final String dn = CertificateUtils.extractPeerDNFromSSLSocket(socket); + peerDescription = CertificateUtils.extractUsername(dn); + } catch (final CertificateException e) { + throw new IOException("Failed to extract Client Certificate", e); + } + + logger.debug("Connection received from peer {}. Will perform authorization against Client Identities '{}'", + peerDescription, certIdentities); + + authorizer.authorize(certIdentities); + logger.debug("Client Identities {} are authorized to load balance data", certIdentities); --- End diff -- Just a question, is there any reason for this authorization to be here? I was wondering if it can be done earlier at ConnectionLoadBalanceServer AcceptConnection.run or CommunicateAction.run. > Allow data in a Connection to be Load-Balanced across cluster > ------------------------------------------------------------- > > Key: NIFI-5516 > URL: https://issues.apache.org/jira/browse/NIFI-5516 > Project: Apache NiFi > Issue Type: New Feature > Components: Core Framework > Reporter: Mark Payne > Assignee: Mark Payne > Priority: Major > > Allow user to configure a Connection to be load balanced across the cluster. > For more information, see Feature Proposal at > https://cwiki.apache.org/confluence/display/NIFI/Load-Balanced+Connections -- This message was sent by Atlassian JIRA (v7.6.3#76005)