This is an automated email from the ASF dual-hosted git repository.

joewitt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/master by this push:
     new afad982  NIFI-7200: Revert "NIFI-6530 - HTTP SiteToSite server returns 
201 in case no data is available"
afad982 is described below

commit afad982e91debd1109a6ec6d1865a77e8b3470ee
Author: Mark Payne <[email protected]>
AuthorDate: Tue Mar 10 11:56:49 2020 -0400

    NIFI-7200: Revert "NIFI-6530 - HTTP SiteToSite server returns 201 in case 
no data is available"
    
    This reverts commit f01668e66ad2e45197915769e966a4be27e1592e.
    
    Signed-off-by: Joe Witt <[email protected]>
---
 .../apache/nifi/remote/client/PeerSelector.java    | 22 +++++---
 .../apache/nifi/remote/client/http/HttpClient.java | 22 ++------
 .../http/TransportProtocolVersionNegotiator.java   |  1 -
 .../client/socket/EndpointConnectionPool.java      | 19 +++----
 .../nifi/remote/client/socket/SocketClient.java    |  6 +--
 .../nifi/remote/exception/NoContentException.java  | 39 --------------
 .../remote/exception/NoValidPeerException.java     | 40 --------------
 .../protocol/socket/SocketClientTransaction.java   |  4 --
 .../nifi/remote/util/SiteToSiteRestApiClient.java  |  6 +--
 .../nifi/remote/client/TestPeerSelector.java       | 31 +++++------
 .../nifi/remote/client/http/TestHttpClient.java    | 63 +++-------------------
 .../socket/TestSocketClientTransaction.java        | 17 +++---
 .../java/org/apache/nifi/spark/NiFiReceiver.java   |  7 ---
 .../nifi/remote/StandardRemoteGroupPort.java       | 13 ++---
 .../stateless/core/StatelessRemoteOutputPort.java  |  8 +--
 .../apache/nifi/web/api/DataTransferResource.java  | 19 +------
 .../apache/nifi/toolkit/s2s/SiteToSiteCliMain.java |  3 --
 .../nifi/toolkit/s2s/SiteToSiteReceiver.java       |  4 --
 18 files changed, 72 insertions(+), 252 deletions(-)

diff --git 
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/PeerSelector.java
 
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/PeerSelector.java
index 8235a38..0a61077 100644
--- 
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/PeerSelector.java
+++ 
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/PeerSelector.java
@@ -224,7 +224,7 @@ public class PeerSelector {
      *                  for RECEIVE, a peer with more flow files is preferred
      * @return a selected peer, if there is no available peer or all peers are 
penalized, then return null
      */
-    public ArrayList<PeerStatus> getPeerStatuses(final TransferDirection 
direction) {
+    public PeerStatus getNextPeerStatus(final TransferDirection direction) {
         List<PeerStatus> peerList = peerStatuses;
         if (isPeerRefreshNeeded(peerList)) {
             peerRefreshLock.lock();
@@ -251,15 +251,25 @@ public class PeerSelector {
             }
         }
 
-
         if (peerList == null || peerList.isEmpty()) {
-            return new ArrayList<PeerStatus>();
+            return null;
         }
 
-        ArrayList<PeerStatus> retVal = new ArrayList<>(peerList);
-        retVal.removeIf(p -> isPenalized(p));
+        PeerStatus peerStatus;
+        for (int i = 0; i < peerList.size(); i++) {
+            final long idx = peerIndex.getAndIncrement();
+            final int listIndex = (int) (idx % peerList.size());
+            peerStatus = peerList.get(listIndex);
+
+            if (isPenalized(peerStatus)) {
+                logger.debug("{} {} is penalized; will not communicate with 
this peer", this, peerStatus);
+            } else {
+                return peerStatus;
+            }
+        }
 
-        return retVal;
+        logger.debug("{} All peers appear to be penalized; returning null", 
this);
+        return null;
     }
 
     private List<PeerStatus> createPeerStatusList(final TransferDirection 
direction) throws IOException {
diff --git 
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/http/HttpClient.java
 
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/http/HttpClient.java
index 690cdfd..660f5ea 100644
--- 
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/http/HttpClient.java
+++ 
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/http/HttpClient.java
@@ -27,8 +27,6 @@ import org.apache.nifi.remote.client.PeerSelector;
 import org.apache.nifi.remote.client.PeerStatusProvider;
 import org.apache.nifi.remote.client.SiteToSiteClientConfig;
 import org.apache.nifi.remote.exception.HandshakeException;
-import org.apache.nifi.remote.exception.NoContentException;
-import org.apache.nifi.remote.exception.NoValidPeerException;
 import org.apache.nifi.remote.exception.PortNotRunningException;
 import org.apache.nifi.remote.exception.ProtocolException;
 import org.apache.nifi.remote.exception.UnknownPortException;
@@ -43,7 +41,6 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.net.URI;
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
@@ -128,11 +125,9 @@ public class HttpClient extends AbstractSiteToSiteClient 
implements PeerStatusPr
     @Override
     public Transaction createTransaction(final TransferDirection direction) 
throws HandshakeException, PortNotRunningException, ProtocolException, 
UnknownPortException, IOException {
         final int timeoutMillis = (int) 
config.getTimeout(TimeUnit.MILLISECONDS);
-        Integer peersWithNoContent = 0;
 
-        ArrayList<PeerStatus> peers = peerSelector.getPeerStatuses(direction);
-
-        for  (PeerStatus peerStatus : peers) {
+        PeerStatus peerStatus;
+        while ((peerStatus = peerSelector.getNextPeerStatus(direction)) != 
null) {
             logger.debug("peerStatus={}", peerStatus);
 
             final CommunicationsSession commSession = new 
HttpCommunicationsSession();
@@ -174,11 +169,6 @@ public class HttpClient extends AbstractSiteToSiteClient 
implements PeerStatusPr
             try {
                 transactionUrl = apiClient.initiateTransaction(direction, 
portId);
                 commSession.setUserDn(apiClient.getTrustedPeerDn());
-            } catch (final NoContentException e) {
-                apiClient.close();
-                peersWithNoContent++;
-                logger.debug("Peer {} has no flowfiles to provide", peer);
-                continue;
             } catch (final Exception e) {
                 apiClient.close();
                 logger.warn("Penalizing a peer {} due to {}", peer, 
e.toString());
@@ -221,12 +211,8 @@ public class HttpClient extends AbstractSiteToSiteClient 
implements PeerStatusPr
             return transaction;
         }
 
-        if(peersWithNoContent > 0) {
-            return null;
-        }
-        String error = new String("Couldn't find a valid peer to communicate 
with.");
-        logger.info(error);
-        throw new NoValidPeerException(error);
+        logger.info("Couldn't find a valid peer to communicate with.");
+        return null;
     }
 
     private String resolveNodeApiUrl(final PeerDescription description) {
diff --git 
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/http/TransportProtocolVersionNegotiator.java
 
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/http/TransportProtocolVersionNegotiator.java
index 844a92e..d0a6368 100644
--- 
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/http/TransportProtocolVersionNegotiator.java
+++ 
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/http/TransportProtocolVersionNegotiator.java
@@ -32,7 +32,6 @@ public class TransportProtocolVersionNegotiator extends 
StandardVersionNegotiato
     public int getTransactionProtocolVersion() {
         switch (getVersion()) {
             case 1:
-            case 2:
                 return 5;
             default:
                 throw new RuntimeException("Transport protocol version " + 
getVersion()
diff --git 
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java
 
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java
index 53bd963..0cf1b53 100644
--- 
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java
+++ 
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java
@@ -30,7 +30,6 @@ import org.apache.nifi.remote.client.SiteInfoProvider;
 import org.apache.nifi.remote.client.SiteToSiteClientConfig;
 import org.apache.nifi.remote.codec.FlowFileCodec;
 import org.apache.nifi.remote.exception.HandshakeException;
-import org.apache.nifi.remote.exception.NoValidPeerException;
 import org.apache.nifi.remote.exception.PortNotRunningException;
 import org.apache.nifi.remote.exception.TransmissionDisabledException;
 import org.apache.nifi.remote.exception.UnknownPortException;
@@ -166,9 +165,14 @@ public class EndpointConnectionPool implements 
PeerStatusProvider {
             throw new UnreachableClusterException("Unable to refresh details 
from any of the configured remote instances.", ioe);
         }
 
-        for  (PeerStatus peerStatus : peerSelector.getPeerStatuses(direction)) 
{
+        do {
             final List<EndpointConnection> addBack = new ArrayList<>();
+            logger.debug("{} getting next peer status", this);
+            final PeerStatus peerStatus = 
peerSelector.getNextPeerStatus(direction);
             logger.debug("{} next peer status = {}", this, peerStatus);
+            if (peerStatus == null) {
+                return null;
+            }
 
             final PeerDescription peerDescription = 
peerStatus.getPeerDescription();
             BlockingQueue<EndpointConnection> connectionQueue = 
connectionQueueMap.get(peerDescription);
@@ -188,7 +192,7 @@ public class EndpointConnectionPool implements 
PeerStatusProvider {
                 if (connection == null && !addBack.isEmpty()) {
                     // all available connections have been penalized.
                     logger.debug("{} all Connections for {} are penalized; 
returning no Connection", this, portId);
-                    throw new NoValidPeerException("All peers are penalized");
+                    return null;
                 }
 
                 if (connection != null && 
connection.getPeer().isPenalized(portId)) {
@@ -314,13 +318,10 @@ public class EndpointConnectionPool implements 
PeerStatusProvider {
                 }
             }
 
-            if( connection != null && codec != null && commsSession != null && 
protocol != null) {
-                activeConnections.add(connection);
-                return connection;
-            }
-        }
-        throw new NoValidPeerException("Didn't find any valid peer to connect 
to");
+        } while (connection == null || codec == null || commsSession == null 
|| protocol == null);
 
+        activeConnections.add(connection);
+        return connection;
     }
 
     public boolean offer(final EndpointConnection endpointConnection) {
diff --git 
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java
 
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java
index ff8e0d6..64a174a 100644
--- 
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java
+++ 
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java
@@ -23,7 +23,6 @@ import org.apache.nifi.remote.TransactionCompletion;
 import org.apache.nifi.remote.TransferDirection;
 import org.apache.nifi.remote.client.AbstractSiteToSiteClient;
 import org.apache.nifi.remote.client.SiteToSiteClientConfig;
-import org.apache.nifi.remote.exception.NoContentException;
 import org.apache.nifi.remote.protocol.DataPacket;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -126,13 +125,14 @@ public class SocketClient extends 
AbstractSiteToSiteClient {
         }
 
         final EndpointConnection connectionState = 
pool.getEndpointConnection(direction, getConfig());
+        if (connectionState == null) {
+            return null;
+        }
 
         final Transaction transaction;
         try {
             transaction = 
connectionState.getSocketClientProtocol().startTransaction(
                     connectionState.getPeer(), connectionState.getCodec(), 
direction);
-        } catch (final NoContentException e) {
-            return null;
         } catch (final Throwable t) {
             pool.terminate(connectionState);
             throw new IOException("Unable to create Transaction to communicate 
with " + connectionState.getPeer(), t);
diff --git 
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/exception/NoContentException.java
 
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/exception/NoContentException.java
deleted file mode 100644
index a0dd23d..0000000
--- 
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/exception/NoContentException.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.remote.exception;
-
-import java.io.IOException;
-
-/**
- * A NoContentException occurs when the remote peer has no flowfiles to provide
- */
-public class NoContentException extends IOException {
-
-    private static final long serialVersionUID = -689032011082690815L;
-
-    public NoContentException(final String message, final Throwable cause) {
-        super(message, cause);
-    }
-
-    public NoContentException(final String message) {
-        super(message);
-    }
-
-    public NoContentException(final Throwable cause) {
-        super(cause);
-    }
-}
\ No newline at end of file
diff --git 
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/exception/NoValidPeerException.java
 
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/exception/NoValidPeerException.java
deleted file mode 100644
index 30a51a0..0000000
--- 
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/exception/NoValidPeerException.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.remote.exception;
-
-import java.io.IOException;
-
-
-/**
- * A NoValidPeerException occurs when all the remote peers are penalized or 
none exists
- */
-public class NoValidPeerException extends IOException {
-
-    private static final long serialVersionUID = 8421102798129193880L;
-
-    public NoValidPeerException(final String message, final Throwable cause) {
-        super(message, cause);
-    }
-
-    public NoValidPeerException(final String message) {
-        super(message);
-    }
-
-    public NoValidPeerException(final Throwable cause) {
-        super(cause);
-    }
-}
diff --git 
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransaction.java
 
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransaction.java
index 85d6c1a..8b68c9e 100644
--- 
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransaction.java
+++ 
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransaction.java
@@ -21,7 +21,6 @@ import org.apache.nifi.remote.AbstractTransaction;
 import org.apache.nifi.remote.Peer;
 import org.apache.nifi.remote.TransferDirection;
 import org.apache.nifi.remote.codec.FlowFileCodec;
-import org.apache.nifi.remote.exception.NoContentException;
 import org.apache.nifi.remote.exception.ProtocolException;
 import org.apache.nifi.remote.protocol.RequestType;
 import org.apache.nifi.remote.protocol.Response;
@@ -44,9 +43,6 @@ public class SocketClientTransaction extends 
AbstractTransaction {
         this.dos = new 
DataOutputStream(peer.getCommunicationsSession().getOutput().getOutputStream());
 
         initialize();
-        if (direction == TransferDirection.RECEIVE && !this.dataAvailable){
-            throw new NoContentException("Remote side has no flowfiles to 
provide");
-        }
     }
 
     private void initialize() throws IOException {
diff --git 
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/SiteToSiteRestApiClient.java
 
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/SiteToSiteRestApiClient.java
index 3270708..249325d 100644
--- 
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/SiteToSiteRestApiClient.java
+++ 
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/SiteToSiteRestApiClient.java
@@ -64,7 +64,6 @@ import org.apache.nifi.remote.Peer;
 import org.apache.nifi.remote.TransferDirection;
 import org.apache.nifi.remote.client.http.TransportProtocolVersionNegotiator;
 import org.apache.nifi.remote.exception.HandshakeException;
-import org.apache.nifi.remote.exception.NoContentException;
 import org.apache.nifi.remote.exception.PortNotRunningException;
 import org.apache.nifi.remote.exception.ProtocolException;
 import org.apache.nifi.remote.exception.UnknownPortException;
@@ -148,7 +147,6 @@ public class SiteToSiteRestApiClient implements Closeable {
     private static final int RESPONSE_CODE_OK = 200;
     private static final int RESPONSE_CODE_CREATED = 201;
     private static final int RESPONSE_CODE_ACCEPTED = 202;
-    private static final int RESPONSE_CODE_NO_CONTENT = 204;
     private static final int RESPONSE_CODE_BAD_REQUEST = 400;
     private static final int RESPONSE_CODE_FORBIDDEN = 403;
     private static final int RESPONSE_CODE_NOT_FOUND = 404;
@@ -173,7 +171,7 @@ public class SiteToSiteRestApiClient implements Closeable {
     private int batchCount = 0;
     private long batchSize = 0;
     private long batchDurationMillis = 0;
-    private TransportProtocolVersionNegotiator 
transportProtocolVersionNegotiator = new 
TransportProtocolVersionNegotiator(2,1);
+    private TransportProtocolVersionNegotiator 
transportProtocolVersionNegotiator = new TransportProtocolVersionNegotiator(1);
 
     private String trustedPeerDn;
     private final ScheduledExecutorService ttlExtendTaskExecutor;
@@ -500,8 +498,6 @@ public class SiteToSiteRestApiClient implements Closeable {
                 }
                 serverTransactionTtl = 
Integer.parseInt(serverTransactionTtlHeader.getValue());
                 break;
-            case RESPONSE_CODE_NO_CONTENT:
-                throw new NoContentException("Server has no flowfiles to 
provide");
 
             default:
                 try (InputStream content = response.getEntity().getContent()) {
diff --git 
a/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/TestPeerSelector.java
 
b/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/TestPeerSelector.java
index 72dd9a6..d98774e 100644
--- 
a/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/TestPeerSelector.java
+++ 
b/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/TestPeerSelector.java
@@ -35,7 +35,6 @@ import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStreamReader;
 import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -50,6 +49,8 @@ import static java.util.stream.Collectors.groupingBy;
 import static java.util.stream.Collectors.reducing;
 import static java.util.stream.Collectors.toMap;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.ArgumentMatchers.any;
@@ -227,13 +228,10 @@ public class TestPeerSelector {
             throw new IOException("Connection refused. " + 
peerFetchStatusesFrom + " is not running.");
         
}).when(peerStatusProvider).fetchRemotePeerStatuses(any(PeerDescription.class));
 
-
-        ArrayList<PeerStatus> peers;
-
         // 1st attempt. It uses the bootstrap node.
         peerSelector.refreshPeers();
-        peers = peerSelector.getPeerStatuses(TransferDirection.RECEIVE);
-        assert(!peers.isEmpty());
+        PeerStatus peerStatus = 
peerSelector.getNextPeerStatus(TransferDirection.RECEIVE);
+        assertNotNull(peerStatus);
 
         // Proceed time so that peer selector refresh statuses.
         peerStatuses.remove(bootstrapNodeStatus);
@@ -241,35 +239,34 @@ public class TestPeerSelector {
 
         // 2nd attempt.
         peerSelector.refreshPeers();
-        peers = peerSelector.getPeerStatuses(TransferDirection.RECEIVE);
-        assert(!peers.isEmpty());
-        assertEquals("Node2 should be returned since node 2 is the only 
available node.", node2, peers.get(0).getPeerDescription());
+        peerStatus = peerSelector.getNextPeerStatus(TransferDirection.RECEIVE);
+        assertNotNull(peerStatus);
+        assertEquals("Node2 should be returned since node 2 is the only 
available node.", node2, peerStatus.getPeerDescription());
 
         // Proceed time so that peer selector refresh statuses.
         systemTime.offset += TimeUnit.MILLISECONDS.convert(1, 
TimeUnit.MINUTES) + 1;
 
         // 3rd attempt.
         peerSelector.refreshPeers();
-        peers = peerSelector.getPeerStatuses(TransferDirection.RECEIVE);
-        assert(!peers.isEmpty());
-        assertEquals("Node2 should be returned since node 2 is the only 
available node.", node2, peers.get(0).getPeerDescription());
+        peerStatus = peerSelector.getNextPeerStatus(TransferDirection.RECEIVE);
+        assertNotNull(peerStatus);
+        assertEquals("Node2 should be returned since node 2 is the only 
available node.", node2, peerStatus.getPeerDescription());
 
         // Remove node2 to simulate that it goes down. There's no available 
node at this point.
         peerStatuses.remove(node2Status);
         systemTime.offset += TimeUnit.MILLISECONDS.convert(1, 
TimeUnit.MINUTES) + 1;
 
         peerSelector.refreshPeers();
-        peers = peerSelector.getPeerStatuses(TransferDirection.RECEIVE);
-        assertTrue("PeerSelector should return an empty list as next peer 
statuses, since there's no available peer", peers.isEmpty());
+        peerStatus = peerSelector.getNextPeerStatus(TransferDirection.RECEIVE);
+        assertNull("PeerSelector should return null as next peer status, since 
there's no available peer", peerStatus);
 
         // Add node1 back. PeerSelector should be able to fetch peer statuses 
because it always tries to fetch at least from the bootstrap node.
         peerStatuses.add(bootstrapNodeStatus);
         systemTime.offset += TimeUnit.MILLISECONDS.convert(1, 
TimeUnit.MINUTES) + 1;
 
         peerSelector.refreshPeers();
-        peers = peerSelector.getPeerStatuses(TransferDirection.RECEIVE);
-        assert(!peers.isEmpty());
-        assertEquals("Node1 should be returned since node 1 is the only 
available node.", bootstrapNode, peers.get(0).getPeerDescription());
+        peerStatus = peerSelector.getNextPeerStatus(TransferDirection.RECEIVE);
+        assertEquals("Node1 should be returned since node 1 is the only 
available node.", bootstrapNode, peerStatus.getPeerDescription());
     }
 
     @Test
diff --git 
a/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/http/TestHttpClient.java
 
b/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/http/TestHttpClient.java
index 706b4ca..ded1db1 100644
--- 
a/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/http/TestHttpClient.java
+++ 
b/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/http/TestHttpClient.java
@@ -57,7 +57,6 @@ import org.apache.nifi.remote.client.KeystoreType;
 import org.apache.nifi.remote.client.SiteToSiteClient;
 import org.apache.nifi.remote.codec.StandardFlowFileCodec;
 import org.apache.nifi.remote.exception.HandshakeException;
-import org.apache.nifi.remote.exception.NoValidPeerException;
 import org.apache.nifi.remote.io.CompressionInputStream;
 import org.apache.nifi.remote.io.CompressionOutputStream;
 import org.apache.nifi.remote.protocol.DataPacket;
@@ -198,21 +197,6 @@ public class TestHttpClient {
 
     }
 
-    public static class EmptyPortTransactionsServlet extends 
PortTransactionsServlet {
-
-        @Override
-        protected void doPost(HttpServletRequest req, HttpServletResponse 
resp) throws ServletException, IOException {
-
-            final int reqProtocolVersion = getReqProtocolVersion(req);
-            if (reqProtocolVersion == 1) {
-                super.doPost(req, resp);
-            } else {
-                respondWithText(resp, "No flowfiles available", 204);
-            }
-        }
-
-    }
-
     public static class PortTransactionsAccessDeniedServlet extends 
HttpServlet {
 
         @Override
@@ -513,8 +497,6 @@ public class TestHttpClient {
         
servletHandler.addServletWithMapping(OutputPortTransactionServlet.class, 
"/data-transfer/output-ports/output-timeout-data-ex-id/transactions/transaction-id");
         
servletHandler.addServletWithMapping(FlowFilesTimeoutAfterDataExchangeServlet.class,
 
"/data-transfer/output-ports/output-timeout-data-ex-id/transactions/transaction-id/flow-files");
 
-        
servletHandler.addServletWithMapping(EmptyPortTransactionsServlet.class,"/data-transfer/output-ports/empty-output-running-id/transactions");
-
         server.start();
 
         logger.info("Starting server on port {} for HTTP, and {} for HTTPS", 
httpConnector.getLocalPort(), sslConnector.getLocalPort());
@@ -674,13 +656,6 @@ public class TestHttpClient {
         runningOutputPort.setState(ScheduledState.RUNNING.name());
         outputPorts.add(runningOutputPort);
 
-        final PortDTO emptyRunningOutputPort = new PortDTO();
-        emptyRunningOutputPort.setName("empty-output-running");
-        emptyRunningOutputPort.setId("empty-output-running-id");
-        emptyRunningOutputPort.setType("OUTPUT_PORT");
-        emptyRunningOutputPort.setState(ScheduledState.RUNNING.name());
-        outputPorts.add(emptyRunningOutputPort);
-
         final PortDTO timeoutOutputPort = new PortDTO();
         timeoutOutputPort.setName("output-timeout");
         timeoutOutputPort.setId("output-timeout-id");
@@ -743,10 +718,9 @@ public class TestHttpClient {
                 .build()
         ) {
             final Transaction transaction = 
client.createTransaction(TransferDirection.SEND);
-            fail();
 
-        } catch (final NoValidPeerException e) {
-            assertNotNull(e.getMessage());
+            assertNull(transaction);
+
         }
 
     }
@@ -763,10 +737,9 @@ public class TestHttpClient {
                 .build()
         ) {
             final Transaction transaction = 
client.createTransaction(TransferDirection.SEND);
-            fail();
 
-        } catch (final NoValidPeerException e) {
-            assertNotNull(e.getMessage());
+            assertNull(transaction);
+
         }
 
     }
@@ -782,11 +755,11 @@ public class TestHttpClient {
                 .build()
         ) {
             final Transaction transaction = 
client.createTransaction(TransferDirection.SEND);
-            fail();
 
-        } catch (final NoValidPeerException e) {
-            assertNotNull(e.getMessage());
+            assertNull(transaction);
+
         }
+
     }
 
     @Test
@@ -881,10 +854,7 @@ public class TestHttpClient {
                         .build()
         ) {
             final Transaction transaction = 
client.createTransaction(TransferDirection.SEND);
-            fail();
-
-        } catch (final NoValidPeerException e) {
-            assertNotNull("createTransaction should fail at peer selection and 
return null.", e.getMessage());
+            assertNull("createTransaction should fail at peer selection and 
return null.", transaction);
         }
 
     }
@@ -1254,23 +1224,6 @@ public class TestHttpClient {
         }
     }
 
-    @Test
-    public void testReceiveEmptyPort() throws Exception {
-
-        try (
-                SiteToSiteClient client = getDefaultBuilder()
-                        .portName("empty-output-running")
-                        .build()
-        ) {
-            try {
-                final Transaction transaction = 
client.createTransaction(TransferDirection.RECEIVE);
-                assertNull(transaction);
-            } catch (IOException e) {
-                fail();
-            }
-        }
-    }
-
     private void testReceive(SiteToSiteClient client) throws IOException {
         final Transaction transaction = 
client.createTransaction(TransferDirection.RECEIVE);
 
diff --git 
a/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/protocol/socket/TestSocketClientTransaction.java
 
b/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/protocol/socket/TestSocketClientTransaction.java
index 048d612..edae052 100644
--- 
a/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/protocol/socket/TestSocketClientTransaction.java
+++ 
b/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/protocol/socket/TestSocketClientTransaction.java
@@ -23,7 +23,6 @@ import org.apache.nifi.remote.Transaction;
 import org.apache.nifi.remote.TransferDirection;
 import org.apache.nifi.remote.codec.FlowFileCodec;
 import org.apache.nifi.remote.codec.StandardFlowFileCodec;
-import org.apache.nifi.remote.exception.NoContentException;
 import org.apache.nifi.remote.io.socket.SocketCommunicationsSession;
 import org.apache.nifi.remote.io.socket.SocketInput;
 import org.apache.nifi.remote.io.socket.SocketOutput;
@@ -45,6 +44,7 @@ import static 
org.apache.nifi.remote.protocol.SiteToSiteTestUtils.createDataPack
 import static 
org.apache.nifi.remote.protocol.SiteToSiteTestUtils.execReceiveOneFlowFile;
 import static 
org.apache.nifi.remote.protocol.SiteToSiteTestUtils.execReceiveTwoFlowFiles;
 import static 
org.apache.nifi.remote.protocol.SiteToSiteTestUtils.execReceiveWithInvalidChecksum;
+import static 
org.apache.nifi.remote.protocol.SiteToSiteTestUtils.execReceiveZeroFlowFile;
 import static 
org.apache.nifi.remote.protocol.SiteToSiteTestUtils.execSendButDestinationFull;
 import static 
org.apache.nifi.remote.protocol.SiteToSiteTestUtils.execSendOneFlowFile;
 import static 
org.apache.nifi.remote.protocol.SiteToSiteTestUtils.execSendTwoFlowFiles;
@@ -52,7 +52,6 @@ import static 
org.apache.nifi.remote.protocol.SiteToSiteTestUtils.execSendWithIn
 import static 
org.apache.nifi.remote.protocol.SiteToSiteTestUtils.execSendZeroFlowFile;
 import static org.apache.nifi.remote.protocol.SiteToSiteTestUtils.readContents;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -93,12 +92,14 @@ public class TestSocketClientTransaction {
         ByteArrayInputStream bis = new 
ByteArrayInputStream(serverResponseBos.toByteArray());
         ByteArrayOutputStream bos = new ByteArrayOutputStream();
 
-        try {
-            SocketClientTransaction transaction = getClientTransaction(bis, 
bos, TransferDirection.RECEIVE);
-            fail();
-        } catch (final NoContentException e) {
-            assertEquals("Remote side has no flowfiles to provide", 
e.getMessage());
-        }
+        SocketClientTransaction transaction = getClientTransaction(bis, bos, 
TransferDirection.RECEIVE);
+
+        execReceiveZeroFlowFile(transaction);
+
+        // Verify what client has sent.
+        DataInputStream sentByClient = new DataInputStream(new 
ByteArrayInputStream(bos.toByteArray()));
+        assertEquals(RequestType.RECEIVE_FLOWFILES, 
RequestType.readRequestType(sentByClient));
+        assertEquals(-1, sentByClient.read());
     }
 
     @Test
diff --git 
a/nifi-external/nifi-spark-receiver/src/main/java/org/apache/nifi/spark/NiFiReceiver.java
 
b/nifi-external/nifi-spark-receiver/src/main/java/org/apache/nifi/spark/NiFiReceiver.java
index 278e6b6..83a7e42 100644
--- 
a/nifi-external/nifi-spark-receiver/src/main/java/org/apache/nifi/spark/NiFiReceiver.java
+++ 
b/nifi-external/nifi-spark-receiver/src/main/java/org/apache/nifi/spark/NiFiReceiver.java
@@ -147,13 +147,6 @@ public class NiFiReceiver extends Receiver<NiFiDataPacket> 
{
                 try {
                     while (!isStopped()) {
                         final Transaction transaction = 
client.createTransaction(TransferDirection.RECEIVE);
-                        if (transaction == null) {
-                            try {
-                                Thread.sleep(1000L);
-                            } catch (InterruptedException e) {
-                            }
-                            continue;
-                        }
                         DataPacket dataPacket = transaction.receive();
                         if (dataPacket == null) {
                             transaction.confirm();
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
index 95f5c2e..da050e2 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
@@ -36,7 +36,6 @@ import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.io.InputStreamCallback;
 import org.apache.nifi.remote.client.SiteToSiteClient;
 import org.apache.nifi.remote.client.SiteToSiteClientConfig;
-import org.apache.nifi.remote.exception.NoValidPeerException;
 import org.apache.nifi.remote.exception.PortNotRunningException;
 import org.apache.nifi.remote.exception.ProtocolException;
 import org.apache.nifi.remote.exception.UnknownPortException;
@@ -223,13 +222,6 @@ public class StandardRemoteGroupPort extends 
RemoteGroupPort {
         final Transaction transaction;
         try {
             transaction = client.createTransaction(transferDirection);
-        } catch (final NoValidPeerException e) {
-            final String message = String.format("%s Unable to create 
transaction to communicate with; all peers must be penalized, so yielding 
context", this);
-            logger.debug(message);
-            session.rollback();
-            context.yield();
-            remoteGroup.getEventReporter().reportEvent(Severity.ERROR, 
CATEGORY, message);
-            return;
         } catch (final PortNotRunningException e) {
             context.yield();
             this.targetRunning.set(false);
@@ -265,10 +257,11 @@ public class StandardRemoteGroupPort extends 
RemoteGroupPort {
             remoteGroup.getEventReporter().reportEvent(Severity.ERROR, 
CATEGORY, message);
             return;
         }
+
         if (transaction == null) {
+            logger.debug("{} Unable to create transaction to communicate with; 
all peers must be penalized, so yielding context", this);
+            session.rollback();
             context.yield();
-            final String message = String.format("%s successfully connected to 
%s, but it has no flowfiles to provide, yielding", this, url);
-            logger.debug(message);
             return;
         }
 
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-stateless/src/main/java/org/apache/nifi/stateless/core/StatelessRemoteOutputPort.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-stateless/src/main/java/org/apache/nifi/stateless/core/StatelessRemoteOutputPort.java
index 0543706..cd82fe7 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-stateless/src/main/java/org/apache/nifi/stateless/core/StatelessRemoteOutputPort.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-stateless/src/main/java/org/apache/nifi/stateless/core/StatelessRemoteOutputPort.java
@@ -17,7 +17,6 @@
 package org.apache.nifi.stateless.core;
 
 import org.apache.nifi.events.EventReporter;
-import org.apache.nifi.remote.exception.NoValidPeerException;
 import org.apache.nifi.stateless.bootstrap.InMemoryFlowFile;
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.processor.DataUnit;
@@ -115,8 +114,8 @@ public class StatelessRemoteOutputPort extends 
AbstractStatelessComponent {
         try {
             final Transaction transaction = 
client.createTransaction(TransferDirection.RECEIVE);
             if (transaction == null) {
-                getLogger().debug("No flowfiles to receive");
-                return true;
+                getLogger().error("Unable to create a transaction for Remote 
Process Group {} to pull from port {}", new Object[]{url, name});
+                return false;
             }
 
             final Queue<StatelessFlowFile> destinationQueue = new 
LinkedList<>();
@@ -140,9 +139,6 @@ public class StatelessRemoteOutputPort extends 
AbstractStatelessComponent {
 
             transaction.confirm();
             transaction.complete();
-        } catch (final NoValidPeerException e) {
-            getLogger().error("Unable to create a transaction for Remote 
Process Group {} to pull from port {}", new Object[]{url, name});
-            return false;
         } catch (final Exception e) {
             getLogger().error("Failed to receive FlowFile via site-to-site", 
e);
             return false;
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/DataTransferResource.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/DataTransferResource.java
index f74d66e..c8787d3 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/DataTransferResource.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/DataTransferResource.java
@@ -31,7 +31,6 @@ import org.apache.nifi.authorization.PublicPortAuthorizable;
 import org.apache.nifi.authorization.resource.ResourceType;
 import org.apache.nifi.authorization.user.NiFiUser;
 import org.apache.nifi.authorization.user.NiFiUserUtils;
-import org.apache.nifi.connectable.Connection;
 import org.apache.nifi.remote.HttpRemoteSiteListener;
 import org.apache.nifi.remote.Peer;
 import org.apache.nifi.remote.PeerDescription;
@@ -78,7 +77,6 @@ import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
-import java.util.List;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
 
@@ -92,7 +90,6 @@ import static 
org.apache.nifi.remote.protocol.http.HttpHeaders.HANDSHAKE_PROPERT
 import static 
org.apache.nifi.remote.protocol.http.HttpHeaders.HANDSHAKE_PROPERTY_BATCH_SIZE;
 import static 
org.apache.nifi.remote.protocol.http.HttpHeaders.HANDSHAKE_PROPERTY_REQUEST_EXPIRATION;
 import static 
org.apache.nifi.remote.protocol.http.HttpHeaders.HANDSHAKE_PROPERTY_USE_COMPRESSION;
-import static 
org.apache.nifi.remote.protocol.http.HttpHeaders.PROTOCOL_VERSION;
 
 /**
  * RESTful endpoint for managing a SiteToSite connection.
@@ -208,21 +205,9 @@ public class DataTransferResource extends 
ApplicationResource {
         final int transportProtocolVersion = 
validationResult.transportProtocolVersion;
 
         try {
-            HttpFlowFileServerProtocol serverProtocol = 
initiateServerProtocol(req, peer, transportProtocolVersion);
-
-            int protocolVersion = 
Integer.parseUnsignedInt(req.getHeader(PROTOCOL_VERSION));
-
-            if ((protocolVersion >= 2) && PORT_TYPE_OUTPUT.equals(portType)) {
-                List<Connection> connectionList = 
serverProtocol.getPort().getIncomingConnections();
-                if (connectionList.stream().allMatch(c -> 
c.getFlowFileQueue().isEmpty())) {
-                    // Transaction could be created, but there is nothing to 
transfer. Just return 200.
-                    logger.debug("Output port has no flowfiles to transfer, 
returning 200");
-                    transactionManager.cancelTransaction(transactionId);
-                    return 
noCache(Response.status(Response.Status.NO_CONTENT)).type(MediaType.TEXT_PLAIN).entity("No
 flowfiles available").build();
-                }
-            }
-
             // Execute handshake.
+            initiateServerProtocol(req, peer, transportProtocolVersion);
+
             TransactionResultEntity entity = new TransactionResultEntity();
             entity.setResponseCode(ResponseCode.PROPERTIES_OK.getCode());
             entity.setMessage("Handshake properties are valid, and port is 
running. A transaction is created:" + transactionId);
diff --git 
a/nifi-toolkit/nifi-toolkit-s2s/src/main/java/org/apache/nifi/toolkit/s2s/SiteToSiteCliMain.java
 
b/nifi-toolkit/nifi-toolkit-s2s/src/main/java/org/apache/nifi/toolkit/s2s/SiteToSiteCliMain.java
index 00bdaf8..e57dbbd 100644
--- 
a/nifi-toolkit/nifi-toolkit-s2s/src/main/java/org/apache/nifi/toolkit/s2s/SiteToSiteCliMain.java
+++ 
b/nifi-toolkit/nifi-toolkit-s2s/src/main/java/org/apache/nifi/toolkit/s2s/SiteToSiteCliMain.java
@@ -30,7 +30,6 @@ import org.apache.commons.cli.ParseException;
 import org.apache.nifi.remote.TransferDirection;
 import org.apache.nifi.remote.client.KeystoreType;
 import org.apache.nifi.remote.client.SiteToSiteClient;
-import org.apache.nifi.remote.exception.NoContentException;
 import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol;
 import org.apache.nifi.remote.protocol.http.HttpProxy;
 import org.apache.nifi.util.FormatUtils;
@@ -242,8 +241,6 @@ public class SiteToSiteCliMain {
                 } else {
                     new SiteToSiteReceiver(siteToSiteClient, 
output).receiveFiles();
                 }
-            } catch (final NoContentException e) {
-                System.out.println("Remote port has no flowfiles");
             }
         } catch (Exception e) {
             printUsage(e.getMessage(), options);
diff --git 
a/nifi-toolkit/nifi-toolkit-s2s/src/main/java/org/apache/nifi/toolkit/s2s/SiteToSiteReceiver.java
 
b/nifi-toolkit/nifi-toolkit-s2s/src/main/java/org/apache/nifi/toolkit/s2s/SiteToSiteReceiver.java
index 82d19d2..88ad8f3 100644
--- 
a/nifi-toolkit/nifi-toolkit-s2s/src/main/java/org/apache/nifi/toolkit/s2s/SiteToSiteReceiver.java
+++ 
b/nifi-toolkit/nifi-toolkit-s2s/src/main/java/org/apache/nifi/toolkit/s2s/SiteToSiteReceiver.java
@@ -22,7 +22,6 @@ import org.apache.nifi.remote.Transaction;
 import org.apache.nifi.remote.TransactionCompletion;
 import org.apache.nifi.remote.TransferDirection;
 import org.apache.nifi.remote.client.SiteToSiteClient;
-import org.apache.nifi.remote.exception.NoContentException;
 import org.apache.nifi.remote.protocol.DataPacket;
 import com.fasterxml.jackson.core.JsonFactory;
 import com.fasterxml.jackson.core.JsonGenerator;
@@ -46,9 +45,6 @@ public class SiteToSiteReceiver {
 
     public TransactionCompletion receiveFiles() throws IOException {
         Transaction transaction = 
siteToSiteClient.createTransaction(TransferDirection.RECEIVE);
-        if (transaction == null) {
-            throw new NoContentException("Remote side has no flowfiles to 
provide");
-        }
         JsonGenerator jsonGenerator = new 
JsonFactory().createJsonGenerator(output);
         jsonGenerator.writeStartArray();
         DataPacket dataPacket;

Reply via email to