Repository: nifi
Updated Branches:
  refs/heads/master 09840027a -> e23b23561


NIFI-2259: HTTP Site-to-Site can't handle DEST_FULL

HTTP Site-to-Site can't handle TRANSACTION_FINISHED_BUT_DESTINATION_FULL
scenario as expected.

That happens if the remote NiFi's input port destination relationship
becomes full during Site-to-Site client sends data. The data which has
already sent to the remote NiFi has to be committed successfully.
However, the remote NiFi returns 503 as a response of commit HTTP
request. Because it does check port availability.

The port availability check shouldn't be called at commit request, since
the session at source NiFi has already been committed. The remote NiFi
should commit its session as well, and return
TRANSACTION_FINISHED_BUT_DESTINATION_FULL response.

This fix makes a remote NiFi to keep the handshaken properties when it holds
transaction to be committed. Then if a transaction already has
handshaken properties, then use it, instead of doing a handshake process
again.


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/aae2d278
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/aae2d278
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/aae2d278

Branch: refs/heads/master
Commit: aae2d2787911938deb271272dd7400357a5e94c5
Parents: 809f042
Author: Koji Kawamura <ijokaruma...@apache.org>
Authored: Mon Jul 25 12:14:12 2016 +0900
Committer: joewitt <joew...@apache.org>
Committed: Tue Aug 2 09:08:00 2016 -0400

----------------------------------------------------------------------
 .../nifi/remote/HttpRemoteSiteListener.java     | 31 +++++++++++++++++---
 .../StandardHttpFlowFileServerProtocol.java     | 19 +++++++++---
 .../nifi/remote/TestHttpRemoteSiteListener.java | 11 ++++---
 .../nifi/web/api/DataTransferResource.java      | 11 -------
 4 files changed, 49 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/aae2d278/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/HttpRemoteSiteListener.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/HttpRemoteSiteListener.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/HttpRemoteSiteListener.java
index 08fb188..e08a3ac 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/HttpRemoteSiteListener.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/HttpRemoteSiteListener.java
@@ -18,6 +18,7 @@ package org.apache.nifi.remote;
 
 import org.apache.nifi.groups.ProcessGroup;
 import org.apache.nifi.remote.protocol.FlowFileTransaction;
+import org.apache.nifi.remote.protocol.HandshakenProperties;
 import org.apache.nifi.remote.protocol.http.HttpFlowFileServerProtocol;
 import org.apache.nifi.util.FormatUtils;
 import org.apache.nifi.util.NiFiProperties;
@@ -90,10 +91,12 @@ public class HttpRemoteSiteListener implements 
RemoteSiteListener {
 
     private class TransactionWrapper {
         private final FlowFileTransaction transaction;
+        private final HandshakenProperties handshakenProperties;
         private long lastCommunicationAt;
 
-        private TransactionWrapper(final FlowFileTransaction transaction) {
+        private TransactionWrapper(final FlowFileTransaction transaction, 
final HandshakenProperties handshakenProperties) {
             this.transaction = transaction;
+            this.handshakenProperties = handshakenProperties;
             this.lastCommunicationAt = System.currentTimeMillis();
         }
 
@@ -166,13 +169,17 @@ public class HttpRemoteSiteListener implements 
RemoteSiteListener {
 
     public String createTransaction() {
         final String transactionId = UUID.randomUUID().toString();
-        transactions.put(transactionId, new TransactionWrapper(null));
+        transactions.put(transactionId, new TransactionWrapper(null, null));
         logger.debug("Created a new transaction: {}", transactionId);
         return transactionId;
     }
 
     public boolean isTransactionActive(final String transactionId) {
         TransactionWrapper transaction = transactions.get(transactionId);
+        return isTransactionActive(transaction);
+    }
+
+    private boolean isTransactionActive(TransactionWrapper transaction) {
         if (transaction == null) {
             return false;
         }
@@ -182,7 +189,23 @@ public class HttpRemoteSiteListener implements 
RemoteSiteListener {
         return true;
     }
 
-    public void holdTransaction(final String transactionId, final 
FlowFileTransaction transaction) throws IllegalStateException {
+    /**
+     * @param transactionId transactionId to check
+     * @return Returns a HandshakenProperties instance which is created when 
this transaction is started,
+     *          only if the transaction is active,
+     *          and it holds a HandshakenProperties,
+     *          otherwise return null
+     */
+    public HandshakenProperties getHandshakenProperties(final String 
transactionId) {
+        TransactionWrapper transaction = transactions.get(transactionId);
+        if (isTransactionActive(transaction)) {
+            return transaction.handshakenProperties;
+        }
+        return null;
+    }
+
+    public void holdTransaction(final String transactionId, final 
FlowFileTransaction transaction,
+                                final HandshakenProperties 
handshakenProperties) throws IllegalStateException {
         // We don't check expiration of the transaction here, to support large 
file transport or slow network.
         // The availability of current transaction is already checked when the 
HTTP request was received at SiteToSiteResource.
         TransactionWrapper currentTransaction = 
transactions.remove(transactionId);
@@ -197,7 +220,7 @@ public class HttpRemoteSiteListener implements 
RemoteSiteListener {
         logger.debug("Holding a transaction: {}", transactionId);
         // Server has received or sent all data, and transaction TTL count 
down starts here.
         // However, if the client doesn't consume data fast enough, server 
might expire and rollback the transaction.
-        transactions.put(transactionId, new TransactionWrapper(transaction));
+        transactions.put(transactionId, new TransactionWrapper(transaction, 
handshakenProperties));
     }
 
     public FlowFileTransaction finalizeTransaction(final String transactionId) 
throws IllegalStateException {

http://git-wip-us.apache.org/repos/asf/nifi/blob/aae2d278/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/http/StandardHttpFlowFileServerProtocol.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/http/StandardHttpFlowFileServerProtocol.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/http/StandardHttpFlowFileServerProtocol.java
index ebbee17..660c498 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/http/StandardHttpFlowFileServerProtocol.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/http/StandardHttpFlowFileServerProtocol.java
@@ -34,6 +34,7 @@ import org.apache.nifi.remote.protocol.Response;
 import org.apache.nifi.remote.protocol.ResponseCode;
 import org.apache.nifi.stream.io.ByteArrayInputStream;
 import org.apache.nifi.stream.io.ByteArrayOutputStream;
+import org.apache.nifi.util.StringUtils;
 
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
@@ -65,11 +66,21 @@ public class StandardHttpFlowFileServerProtocol extends 
AbstractFlowFileServerPr
 
     @Override
     protected HandshakenProperties doHandshake(Peer peer) throws IOException, 
HandshakeException {
-        HandshakenProperties confirmed = new HandshakenProperties();
 
         HttpServerCommunicationsSession commsSession = 
(HttpServerCommunicationsSession) peer.getCommunicationsSession();
-        confirmed.setCommsIdentifier(commsSession.getTransactionId());
-        validateHandshakeRequest(confirmed, peer, 
commsSession.getHandshakeParams());
+        final String transactionId = commsSession.getTransactionId();
+
+        HandshakenProperties confirmed = null;
+        if (!StringUtils.isEmpty(transactionId)) {
+            // If handshake is already done, use it.
+            confirmed = 
transactionManager.getHandshakenProperties(transactionId);
+        }
+        if (confirmed == null) {
+            // If it's not, then do handshake.
+            confirmed = new HandshakenProperties();
+            confirmed.setCommsIdentifier(transactionId);
+            validateHandshakeRequest(confirmed, peer, 
commsSession.getHandshakeParams());
+        }
 
         logger.debug("{} Done handshake, confirmed={}", this, confirmed);
         return confirmed;
@@ -168,7 +179,7 @@ public class StandardHttpFlowFileServerProtocol extends 
AbstractFlowFileServerPr
         HttpServerCommunicationsSession commSession = 
(HttpServerCommunicationsSession) peer.getCommunicationsSession();
         String transactionId = commSession.getTransactionId();
         logger.debug("{} Holding transaction. transactionId={}", this, 
transactionId);
-        transactionManager.holdTransaction(transactionId, transaction);
+        transactionManager.holdTransaction(transactionId, transaction, 
handshakenProperties);
 
         return transaction.getFlowFilesSent().size();
     }

http://git-wip-us.apache.org/repos/asf/nifi/blob/aae2d278/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/TestHttpRemoteSiteListener.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/TestHttpRemoteSiteListener.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/TestHttpRemoteSiteListener.java
index ded2042..6ab8988 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/TestHttpRemoteSiteListener.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/TestHttpRemoteSiteListener.java
@@ -18,6 +18,7 @@ package org.apache.nifi.remote;
 
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.remote.protocol.FlowFileTransaction;
+import org.apache.nifi.remote.protocol.HandshakenProperties;
 import org.apache.nifi.util.NiFiProperties;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -45,7 +46,9 @@ public class TestHttpRemoteSiteListener {
 
         ProcessSession processSession = Mockito.mock(ProcessSession.class);
         FlowFileTransaction transaction = new 
FlowFileTransaction(processSession, null, null, 0, null, null);
-        transactionManager.holdTransaction(transactionId, transaction);
+        transactionManager.holdTransaction(transactionId, transaction, new 
HandshakenProperties());
+
+        
assertNotNull(transactionManager.getHandshakenProperties(transactionId));
 
         transaction = transactionManager.finalizeTransaction(transactionId);
         assertNotNull(transaction);
@@ -63,10 +66,10 @@ public class TestHttpRemoteSiteListener {
 
         ProcessSession processSession = Mockito.mock(ProcessSession.class);
         FlowFileTransaction transaction = new 
FlowFileTransaction(processSession, null, null, 0, null, null);
-        transactionManager.holdTransaction(transactionId, transaction);
+        transactionManager.holdTransaction(transactionId, transaction, null);
 
         try {
-            transactionManager.holdTransaction(transactionId, transaction);
+            transactionManager.holdTransaction(transactionId, transaction, 
null);
             fail("The same transaction id can't hold another transaction");
         } catch (IllegalStateException e) {
         }
@@ -83,7 +86,7 @@ public class TestHttpRemoteSiteListener {
         ProcessSession processSession = Mockito.mock(ProcessSession.class);
         FlowFileTransaction transaction = new 
FlowFileTransaction(processSession, null, null, 0, null, null);
         try {
-            transactionManager.holdTransaction(transactionId, transaction);
+            transactionManager.holdTransaction(transactionId, transaction, 
null);
         } catch (IllegalStateException e) {
             fail("Transaction can be held even if the transaction id is not 
valid anymore," +
                     " in order to support large file or slow network.");

http://git-wip-us.apache.org/repos/asf/nifi/blob/aae2d278/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/DataTransferResource.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/DataTransferResource.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/DataTransferResource.java
index 3034e1e..e2de588 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/DataTransferResource.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/DataTransferResource.java
@@ -316,8 +316,6 @@ public class DataTransferResource extends 
ApplicationResource {
 
         HttpFlowFileServerProtocol serverProtocol = 
getHttpFlowFileServerProtocol(versionNegotiator);
         
HttpRemoteSiteListener.getInstance().setupServerProtocol(serverProtocol);
-        // TODO: How should I pass cluster information?
-        // serverProtocol.setNodeInformant(clusterManager);
         serverProtocol.handshake(peer);
         return serverProtocol;
     }
@@ -833,15 +831,6 @@ public class DataTransferResource extends 
ApplicationResource {
             return result;
         }
 
-        // TODO: NCM no longer exists.
-        /*
-        if (properties.isClusterManager()) {
-            result.errResponse = 
responseCreator.nodeTypeErrorResponse(req.getPathInfo() + " is not available on 
a NiFi Cluster Manager.");
-            return result;
-        }
-        */
-
-
         try {
             result.transportProtocolVersion = 
negotiateTransportProtocolVersion(req, transportProtocolVersionNegotiator);
         } catch (BadRequestException e) {

Reply via email to