[ 
https://issues.apache.org/jira/browse/NIFI-5516?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16628364#comment-16628364
 ] 

ASF GitHub Bot commented on NIFI-5516:
--------------------------------------

Github user ijokarumawak commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2947#discussion_r220428865
  
    --- Diff: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/nio/RegisteredPartition.java ---
    @@ -0,0 +1,69 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.controller.queue.clustered.client.async.nio;
    +
    +import org.apache.nifi.controller.queue.LoadBalanceCompression;
    +import org.apache.nifi.controller.queue.clustered.client.async.TransactionCompleteCallback;
    +import org.apache.nifi.controller.queue.clustered.client.async.TransactionFailureCallback;
    +import org.apache.nifi.controller.repository.FlowFileRecord;
    +
    +import java.util.function.BooleanSupplier;
    +import java.util.function.Supplier;
    +
    +public class RegisteredPartition {
    +    private final String connectionId;
    +    private final Supplier<FlowFileRecord> flowFileRecordSupplier;
    +    private final TransactionFailureCallback failureCallback;
    +    private final BooleanSupplier emptySupplier;
    +    private final TransactionCompleteCallback successCallback;
    +    private final LoadBalanceCompression compression;
    +
    +    public RegisteredPartition(final String connectionId, final BooleanSupplier emptySupplier, final Supplier<FlowFileRecord> flowFileSupplier, final TransactionFailureCallback failureCallback,
    +                               final TransactionCompleteCallback successCallback, final LoadBalanceCompression compression) {
    +        this.connectionId = connectionId;
    +        this.emptySupplier = emptySupplier;
    +        this.flowFileRecordSupplier = flowFileSupplier;
    +        this.failureCallback = failureCallback;
    +        this.successCallback = successCallback;
    +        this.compression = compression;
    --- End diff --
    
    I haven't read enough of the code to understand how these partitions are managed, but it seems that once a partition is registered, its `compression` can't be changed. When a connection is configured to use compression from the NiFi UI, the steps are as follows:
    1. Send a POST request to create a connection
    2. Send a PUT request to update the load-balance configuration for the connection
    
    However, the current PR seems to support setting compression only when a connection is added on the sending side. The receiving side uses the latest compression setting, which resulted in the following exception:
    
    ```
    2018-09-26 05:08:18,439 ERROR [Load Balance Server Thread-4] o.a.n.c.q.c.s.ConnectionLoadBalanceServer Failed to communicate with Peer nifi0.rumawaks.com/10.1.0.5:35502
    java.util.zip.ZipException: Not in GZIP format
            at java.util.zip.GZIPInputStream.readHeader(GZIPInputStream.java:165)
            at java.util.zip.GZIPInputStream.<init>(GZIPInputStream.java:79)
            at java.util.zip.GZIPInputStream.<init>(GZIPInputStream.java:91)
            at org.apache.nifi.controller.queue.clustered.server.StandardLoadBalanceProtocol.receiveFlowFile(StandardLoadBalanceProtocol.java:428)
            at org.apache.nifi.controller.queue.clustered.server.StandardLoadBalanceProtocol.receiveFlowFiles(StandardLoadBalanceProtocol.java:267)
            at org.apache.nifi.controller.queue.clustered.server.StandardLoadBalanceProtocol.receiveFlowFiles(StandardLoadBalanceProtocol.java:151)
            at org.apache.nifi.controller.queue.clustered.server.ConnectionLoadBalanceServer$CommunicateAction.run(ConnectionLoadBalanceServer.java:176)
            at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
            at java.util.concurrent.FutureTask.run(FutureTask.java:266)
            at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
            at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
            at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
            at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
            at java.lang.Thread.run(Thread.java:748)
    ```
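
    To illustrate what I suspect is happening, here is a minimal standalone sketch. It assumes the sender keeps the compression value captured at registration while the receiver reads the current one. `Compression`, `send` and `receive` are hypothetical stand-ins written for this comment, not the actual NiFi classes, and reading the setting through a `Supplier` is only one possible way to avoid the stale value.

    ```java
    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.io.InputStream;
    import java.io.OutputStream;
    import java.nio.charset.StandardCharsets;
    import java.util.concurrent.atomic.AtomicReference;
    import java.util.function.Supplier;
    import java.util.zip.GZIPInputStream;
    import java.util.zip.GZIPOutputStream;
    import java.util.zip.ZipException;

    public class CompressionMismatchDemo {
        // Hypothetical stand-in for LoadBalanceCompression; the real enum may differ.
        enum Compression { NONE, GZIP }

        // Sender: encodes the content using whatever setting the supplier returns.
        static byte[] send(final String content, final Supplier<Compression> setting) throws IOException {
            final ByteArrayOutputStream baos = new ByteArrayOutputStream();
            final boolean gzip = setting.get() == Compression.GZIP;
            try (OutputStream out = gzip ? new GZIPOutputStream(baos) : baos) {
                out.write(content.getBytes(StandardCharsets.UTF_8));
            }
            return baos.toByteArray();
        }

        // Receiver: decodes the payload according to the setting it currently sees.
        static String receive(final byte[] payload, final Compression setting) throws IOException {
            final InputStream raw = new ByteArrayInputStream(payload);
            try (InputStream in = setting == Compression.GZIP ? new GZIPInputStream(raw) : raw) {
                final ByteArrayOutputStream decoded = new ByteArrayOutputStream();
                final byte[] buffer = new byte[1024];
                int len;
                while ((len = in.read(buffer)) != -1) {
                    decoded.write(buffer, 0, len);
                }
                return new String(decoded.toByteArray(), StandardCharsets.UTF_8);
            }
        }

        public static void main(final String[] args) throws IOException {
            // Step 1 (POST): the connection is created without compression.
            final AtomicReference<Compression> current = new AtomicReference<>(Compression.NONE);
            final Compression capturedAtRegistration = current.get();

            // Step 2 (PUT): the load-balance configuration is updated afterwards.
            current.set(Compression.GZIP);

            // The sender still uses the captured value; the receiver expects GZIP.
            final byte[] stale = send("hello", () -> capturedAtRegistration);
            try {
                receive(stale, current.get());
            } catch (final ZipException e) {
                System.out.println("stale setting: " + e.getMessage());
            }

            // Reading the setting at send time keeps both sides consistent.
            final byte[] fresh = send("hello", current::get);
            System.out.println("live setting: " + receive(fresh, current.get()));
        }
    }
    ```

    If the partition held something like a `Supplier` of the compression setting instead of a final value, the sender would pick up the change made by the PUT request.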



> Allow data in a Connection to be Load-Balanced across cluster
> -------------------------------------------------------------
>
>                 Key: NIFI-5516
>                 URL: https://issues.apache.org/jira/browse/NIFI-5516
>             Project: Apache NiFi
>          Issue Type: New Feature
>          Components: Core Framework
>            Reporter: Mark Payne
>            Assignee: Mark Payne
>            Priority: Major
>
> Allow user to configure a Connection to be load balanced across the cluster. 
> For more information, see Feature Proposal at 
> https://cwiki.apache.org/confluence/display/NIFI/Load-Balanced+Connections



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
