[ 
https://issues.apache.org/jira/browse/NIFI-5516?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16629931#comment-16629931
 ] 

ASF GitHub Bot commented on NIFI-5516:
--------------------------------------

Github user ijokarumawak commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2947#discussion_r220835190
  
    --- Diff: 
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/StandardLoadBalanceProtocol.java
 ---
    @@ -0,0 +1,578 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.controller.queue.clustered.server;
    +
    +import org.apache.nifi.connectable.Connection;
    +import org.apache.nifi.controller.FlowController;
    +import org.apache.nifi.controller.queue.FlowFileQueue;
    +import org.apache.nifi.controller.queue.LoadBalanceCompression;
    +import org.apache.nifi.controller.queue.LoadBalancedFlowFileQueue;
    +import org.apache.nifi.controller.repository.ContentRepository;
    +import org.apache.nifi.controller.repository.FlowFileRecord;
    +import org.apache.nifi.controller.repository.FlowFileRepository;
    +import org.apache.nifi.controller.repository.RepositoryRecord;
    +import org.apache.nifi.controller.repository.StandardFlowFileRecord;
    +import org.apache.nifi.controller.repository.StandardRepositoryRecord;
    +import org.apache.nifi.controller.repository.claim.ContentClaim;
    +import org.apache.nifi.controller.repository.io.LimitedInputStream;
    +import org.apache.nifi.flowfile.attributes.CoreAttributes;
    +import org.apache.nifi.provenance.ProvenanceEventRecord;
    +import org.apache.nifi.provenance.ProvenanceEventType;
    +import org.apache.nifi.provenance.ProvenanceRepository;
    +import org.apache.nifi.provenance.StandardProvenanceEventRecord;
    +import org.apache.nifi.remote.StandardVersionNegotiator;
    +import org.apache.nifi.remote.VersionNegotiator;
    +import org.apache.nifi.security.util.CertificateUtils;
    +import org.apache.nifi.stream.io.ByteCountingInputStream;
    +import org.apache.nifi.stream.io.LimitingInputStream;
    +import org.apache.nifi.stream.io.StreamUtils;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import javax.net.ssl.SSLPeerUnverifiedException;
    +import javax.net.ssl.SSLSession;
    +import javax.net.ssl.SSLSocket;
    +import java.io.BufferedInputStream;
    +import java.io.BufferedOutputStream;
    +import java.io.DataInputStream;
    +import java.io.EOFException;
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.io.OutputStream;
    +import java.net.Socket;
    +import java.net.SocketTimeoutException;
    +import java.nio.ByteBuffer;
    +import java.nio.charset.StandardCharsets;
    +import java.security.cert.Certificate;
    +import java.security.cert.CertificateException;
    +import java.security.cert.X509Certificate;
    +import java.util.ArrayList;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.concurrent.atomic.AtomicLong;
    +import java.util.stream.Collectors;
    +import java.util.zip.CRC32;
    +import java.util.zip.CheckedInputStream;
    +import java.util.zip.Checksum;
    +import java.util.zip.GZIPInputStream;
    +
    +import static 
org.apache.nifi.controller.queue.clustered.protocol.LoadBalanceProtocolConstants.ABORT_PROTOCOL_NEGOTIATION;
    +import static 
org.apache.nifi.controller.queue.clustered.protocol.LoadBalanceProtocolConstants.ABORT_TRANSACTION;
    +import static 
org.apache.nifi.controller.queue.clustered.protocol.LoadBalanceProtocolConstants.COMPLETE_TRANSACTION;
    +import static 
org.apache.nifi.controller.queue.clustered.protocol.LoadBalanceProtocolConstants.CONFIRM_CHECKSUM;
    +import static 
org.apache.nifi.controller.queue.clustered.protocol.LoadBalanceProtocolConstants.CONFIRM_COMPLETE_TRANSACTION;
    +import static 
org.apache.nifi.controller.queue.clustered.protocol.LoadBalanceProtocolConstants.DATA_FRAME_FOLLOWS;
    +import static 
org.apache.nifi.controller.queue.clustered.protocol.LoadBalanceProtocolConstants.MORE_FLOWFILES;
    +import static 
org.apache.nifi.controller.queue.clustered.protocol.LoadBalanceProtocolConstants.NO_DATA_FRAME;
    +import static 
org.apache.nifi.controller.queue.clustered.protocol.LoadBalanceProtocolConstants.NO_MORE_FLOWFILES;
    +import static 
org.apache.nifi.controller.queue.clustered.protocol.LoadBalanceProtocolConstants.REJECT_CHECKSUM;
    +import static 
org.apache.nifi.controller.queue.clustered.protocol.LoadBalanceProtocolConstants.REQEUST_DIFFERENT_VERSION;
    +import static 
org.apache.nifi.controller.queue.clustered.protocol.LoadBalanceProtocolConstants.VERSION_ACCEPTED;
    +
    +public class StandardLoadBalanceProtocol implements LoadBalanceProtocol {
    +    private static final Logger logger = 
LoggerFactory.getLogger(StandardLoadBalanceProtocol.class);
    +
    +    private static final int SOCKET_CLOSED = -1;
    +    private static final int NO_DATA_AVAILABLE = 0;
    +
    +    private final FlowFileRepository flowFileRepository;
    +    private final ContentRepository contentRepository;
    +    private final ProvenanceRepository provenanceRepository;
    +    private final FlowController flowController;
    +    private final LoadBalanceAuthorizer authorizer;
    +
    +    private final ThreadLocal<byte[]> dataBuffer = new ThreadLocal<>();
    +    private final AtomicLong lineageStartIndex = new AtomicLong(0L);
    +
    +    public StandardLoadBalanceProtocol(final FlowFileRepository 
flowFileRepository, final ContentRepository contentRepository, final 
ProvenanceRepository provenanceRepository,
    +                                       final FlowController 
flowController, final LoadBalanceAuthorizer authorizer) {
    +        this.flowFileRepository = flowFileRepository;
    +        this.contentRepository = contentRepository;
    +        this.provenanceRepository = provenanceRepository;
    +        this.flowController = flowController;
    +        this.authorizer = authorizer;
    +    }
    +
    +
    +    @Override
    +    public void receiveFlowFiles(final Socket socket) throws IOException {
    +        final InputStream in = new 
BufferedInputStream(socket.getInputStream());
    +        final OutputStream out = new 
BufferedOutputStream(socket.getOutputStream());
    +
    +        String peerDescription = socket.getInetAddress().toString();
    +        if (socket instanceof SSLSocket) {
    +            final SSLSession sslSession = ((SSLSocket) 
socket).getSession();
    +
    +            final Set<String> certIdentities;
    +            try {
    +                certIdentities = getCertificateIdentities(sslSession);
    +
    +                final String dn = 
CertificateUtils.extractPeerDNFromSSLSocket(socket);
    +                peerDescription = CertificateUtils.extractUsername(dn);
    +            } catch (final CertificateException e) {
    +                throw new IOException("Failed to extract Client 
Certificate", e);
    +            }
    +
    +            logger.debug("Connection received from peer {}. Will perform 
authorization against Client Identities '{}'",
    +                peerDescription, certIdentities);
    +
    +            authorizer.authorize(certIdentities);
    +            logger.debug("Client Identities {} are authorized to load 
balance data", certIdentities);
    --- End diff --
    
    Just a question, is there any reason for this authorization to be here? I 
was wondering if it can be done earlier at ConnectionLoadBalanceServer 
AcceptConnection.run or CommunicateAction.run.


> Allow data in a Connection to be Load-Balanced across cluster
> -------------------------------------------------------------
>
>                 Key: NIFI-5516
>                 URL: https://issues.apache.org/jira/browse/NIFI-5516
>             Project: Apache NiFi
>          Issue Type: New Feature
>          Components: Core Framework
>            Reporter: Mark Payne
>            Assignee: Mark Payne
>            Priority: Major
>
> Allow user to configure a Connection to be load balanced across the cluster. 
> For more information, see Feature Proposal at 
> https://cwiki.apache.org/confluence/display/NIFI/Load-Balanced+Connections



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to