NIFI-2028: Fixed Site-to-Site Transit URI

Fixed Site-to-Site Transit URI for HTTP to be consistent with RAW socket.

- Removed url from CommunicationsSession since it is redundant with
  Peer.url. The value was not used anywhere other than HTTP
  Site-to-Site.
- Added a createTransitUri method to the Communicant interface, so that
  implementations can customize the transit URI while providing a
  consistent interface.


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/809f0423
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/809f0423
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/809f0423

Branch: refs/heads/master
Commit: 809f042353ce0a23c25d864db810a7dc64fbcfa0
Parents: 0984002
Author: Koji Kawamura <ijokaruma...@apache.org>
Authored: Mon Jul 25 11:36:23 2016 +0900
Committer: joewitt <joew...@apache.org>
Committed: Tue Aug 2 09:08:00 2016 -0400

----------------------------------------------------------------------
 .../remote/AbstractCommunicationsSession.java   |  27 +-
 .../org/apache/nifi/remote/Communicant.java     |   6 +
 .../main/java/org/apache/nifi/remote/Peer.java  |   5 +
 .../nifi/remote/client/http/HttpClient.java     |   1 -
 .../client/socket/EndpointConnectionPool.java   |   6 +-
 .../io/http/HttpCommunicationsSession.java      |  13 +-
 .../SocketChannelCommunicationsSession.java     |   4 +-
 .../SSLSocketChannelCommunicationsSession.java  |   4 +-
 .../remote/protocol/CommunicationsSession.java  |  11 +-
 .../protocol/http/HttpClientTransaction.java    |   4 +-
 .../remote/util/SiteToSiteRestApiClient.java    |  13 +-
 .../http/TestHttpClientTransaction.java         |  18 +-
 .../nifi/remote/SocketRemoteSiteListener.java   |   4 +-
 .../nifi/remote/StandardRemoteGroupPort.java    |  12 +-
 .../AbstractFlowFileServerProtocol.java         |  10 +-
 .../StandardHttpFlowFileServerProtocol.java     |   3 +-
 .../socket/SocketFlowFileServerProtocol.java    |   6 +
 .../remote/TestStandardRemoteGroupPort.java     | 256 +++++++++++++++++++
 .../http/TestHttpFlowFileServerProtocol.java    |  36 ++-
 .../nifi/web/api/DataTransferResource.java      |  28 +-
 .../apache/nifi/web/api/SiteToSiteResource.java |  18 +-
 .../nifi/web/api/TestDataTransferResource.java  |   3 +
 22 files changed, 404 insertions(+), 84 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/809f0423/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/AbstractCommunicationsSession.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/AbstractCommunicationsSession.java
 
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/AbstractCommunicationsSession.java
index dacfd64..8ecdb52 100644
--- 
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/AbstractCommunicationsSession.java
+++ 
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/AbstractCommunicationsSession.java
@@ -22,27 +22,6 @@ public abstract class AbstractCommunicationsSession 
implements CommunicationsSes
 
     private String userDn;
 
-    private volatile String uri;
-
-    public AbstractCommunicationsSession(final String uri) {
-        this.uri = uri;
-    }
-
-    @Override
-    public String toString() {
-        return uri;
-    }
-
-    @Override
-    public void setUri(final String uri) {
-        this.uri = uri;
-    }
-
-    @Override
-    public String getUri() {
-        return uri;
-    }
-
     @Override
     public String getUserDn() {
         return userDn;
@@ -52,4 +31,10 @@ public abstract class AbstractCommunicationsSession 
implements CommunicationsSes
     public void setUserDn(final String dn) {
         this.userDn = dn;
     }
+
+    @Override
+    public String createTransitUri(String communicantUrl, String 
sourceFlowFileIdentifier) {
+        return communicantUrl + (communicantUrl.endsWith("/") ? "" : "/") + 
sourceFlowFileIdentifier;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/809f0423/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/Communicant.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/Communicant.java
 
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/Communicant.java
index 6245a53..39fac9e 100644
--- 
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/Communicant.java
+++ 
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/Communicant.java
@@ -43,4 +43,10 @@ public interface Communicant {
      * if the Distinguished Name is unknown
      */
     String getDistinguishedName();
+
+    /**
+     * @return When data is transferred via Site-to-Site, provenance events 
are generated.
+     * This method returns a transit url used for the provenance event.
+     */
+    String createTransitUri(final String sourceFlowFileIdentifier);
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/809f0423/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/Peer.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/Peer.java
 
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/Peer.java
index 5cb37b0..962baec 100644
--- 
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/Peer.java
+++ 
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/Peer.java
@@ -148,4 +148,9 @@ public class Peer implements Communicant {
     public String getDistinguishedName() {
         return commsSession.getUserDn();
     }
+
+    @Override
+    public String createTransitUri(String sourceFlowFileIdentifier) {
+        return commsSession.createTransitUri(url, sourceFlowFileIdentifier);
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/809f0423/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/http/HttpClient.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/http/HttpClient.java
 
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/http/HttpClient.java
index 4cc794b..3c92acd 100644
--- 
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/http/HttpClient.java
+++ 
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/http/HttpClient.java
@@ -126,7 +126,6 @@ public class HttpClient extends AbstractSiteToSiteClient 
implements PeerStatusPr
 
             final CommunicationsSession commSession = new 
HttpCommunicationsSession();
             final String nodeApiUrl = 
resolveNodeApiUrl(peerStatus.getPeerDescription());
-            commSession.setUri(nodeApiUrl);
             final String clusterUrl = config.getUrl();
             final Peer peer = new Peer(peerStatus.getPeerDescription(), 
commSession, nodeApiUrl, clusterUrl);
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/809f0423/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java
 
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java
index 6869e4b..f90aed9 100644
--- 
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java
+++ 
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java
@@ -465,7 +465,6 @@ public class EndpointConnectionPool implements 
PeerStatusProvider {
 
     private CommunicationsSession establishSiteToSiteConnection(final String 
hostname, final int port) throws IOException {
         final boolean siteToSiteSecure = siteInfoProvider.isSecure();
-        final String destinationUri = "nifi://" + hostname + ":" + port;
 
         CommunicationsSession commsSession = null;
         try {
@@ -478,7 +477,7 @@ public class EndpointConnectionPool implements 
PeerStatusProvider {
                 final SSLSocketChannel socketChannel = new 
SSLSocketChannel(sslContext, hostname, port, true);
                 socketChannel.connect();
 
-                commsSession = new 
SSLSocketChannelCommunicationsSession(socketChannel, destinationUri);
+                commsSession = new 
SSLSocketChannelCommunicationsSession(socketChannel);
 
                 try {
                     commsSession.setUserDn(socketChannel.getDn());
@@ -490,11 +489,10 @@ public class EndpointConnectionPool implements 
PeerStatusProvider {
                 socketChannel.socket().connect(new InetSocketAddress(hostname, 
port), commsTimeout);
                 socketChannel.socket().setSoTimeout(commsTimeout);
 
-                commsSession = new 
SocketChannelCommunicationsSession(socketChannel, destinationUri);
+                commsSession = new 
SocketChannelCommunicationsSession(socketChannel);
             }
 
             
commsSession.getOutput().getOutputStream().write(CommunicationsSession.MAGIC_BYTES);
-            commsSession.setUri(destinationUri);
         } catch (final IOException ioe) {
             if (commsSession != null) {
                 commsSession.close();

http://git-wip-us.apache.org/repos/asf/nifi/blob/809f0423/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/http/HttpCommunicationsSession.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/http/HttpCommunicationsSession.java
 
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/http/HttpCommunicationsSession.java
index d561833..868fb36 100644
--- 
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/http/HttpCommunicationsSession.java
+++ 
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/http/HttpCommunicationsSession.java
@@ -29,9 +29,10 @@ public class HttpCommunicationsSession extends 
AbstractCommunicationsSession {
     protected final HttpInput input;
     protected final HttpOutput output;
     protected String checksum;
+    private String dataTransferUrl;
 
     public HttpCommunicationsSession(){
-        super(null);
+        super();
         this.input = new HttpInput();
         this.output = new HttpOutput();
     }
@@ -93,5 +94,15 @@ public class HttpCommunicationsSession extends 
AbstractCommunicationsSession {
         this.checksum = checksum;
     }
 
+    /**
+     * @param dataTransferUrl Set data transfer url to use as provenance event 
transit url.
+     */
+    public void setDataTransferUrl(String dataTransferUrl) {
+        this.dataTransferUrl = dataTransferUrl;
+    }
 
+    @Override
+    public String createTransitUri(String communicantUrl, String 
sourceFlowFileIdentifier) {
+        return dataTransferUrl;
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/809f0423/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelCommunicationsSession.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelCommunicationsSession.java
 
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelCommunicationsSession.java
index 6180c3c..e19cd3d 100644
--- 
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelCommunicationsSession.java
+++ 
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelCommunicationsSession.java
@@ -28,8 +28,8 @@ public class SocketChannelCommunicationsSession extends 
AbstractCommunicationsSe
     private final SocketChannelOutput response;
     private int timeout = 30000;
 
-    public SocketChannelCommunicationsSession(final SocketChannel 
socketChannel, final String uri) throws IOException {
-        super(uri);
+    public SocketChannelCommunicationsSession(final SocketChannel 
socketChannel) throws IOException {
+        super();
         request = new SocketChannelInput(socketChannel);
         response = new SocketChannelOutput(socketChannel);
         channel = socketChannel;

http://git-wip-us.apache.org/repos/asf/nifi/blob/809f0423/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelCommunicationsSession.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelCommunicationsSession.java
 
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelCommunicationsSession.java
index 5e5abc7..73f5a90 100644
--- 
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelCommunicationsSession.java
+++ 
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelCommunicationsSession.java
@@ -26,8 +26,8 @@ public class SSLSocketChannelCommunicationsSession extends 
AbstractCommunication
     private final SSLSocketChannelInput request;
     private final SSLSocketChannelOutput response;
 
-    public SSLSocketChannelCommunicationsSession(final SSLSocketChannel 
channel, final String uri) {
-        super(uri);
+    public SSLSocketChannelCommunicationsSession(final SSLSocketChannel 
channel) {
+        super();
         request = new SSLSocketChannelInput(channel);
         response = new SSLSocketChannelOutput(channel);
         this.channel = channel;

http://git-wip-us.apache.org/repos/asf/nifi/blob/809f0423/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/CommunicationsSession.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/CommunicationsSession.java
 
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/CommunicationsSession.java
index aff73ba..4df12ae 100644
--- 
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/CommunicationsSession.java
+++ 
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/CommunicationsSession.java
@@ -31,10 +31,6 @@ public interface CommunicationsSession extends Closeable {
 
     int getTimeout() throws IOException;
 
-    void setUri(String uri);
-
-    String getUri();
-
     String getUserDn();
 
     void setUserDn(String dn);
@@ -59,4 +55,11 @@ public interface CommunicationsSession extends Closeable {
      * otherwise
      */
     boolean isClosed();
+
+    /**
+     * @param communicantUrl Communicant's url that this session is assigned 
to.
+     * @param sourceFlowFileIdentifier Source Flow-file's uuid.
+     * @return A transit uri to be used in a provenance event.
+     */
+    String createTransitUri(final String communicantUrl, final String 
sourceFlowFileIdentifier);
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/809f0423/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/http/HttpClientTransaction.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/http/HttpClientTransaction.java
 
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/http/HttpClientTransaction.java
index d4085ca..693b3c7 100644
--- 
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/http/HttpClientTransaction.java
+++ 
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/http/HttpClientTransaction.java
@@ -48,9 +48,9 @@ public class HttpClientTransaction extends 
AbstractTransaction {
         this.transactionUrl = transactionUrl;
         this.apiClient = apiUtil;
         if(TransferDirection.RECEIVE.equals(direction)){
-            dataAvailable = apiUtil.openConnectionForReceive(transactionUrl, 
peer.getCommunicationsSession());
+            dataAvailable = apiUtil.openConnectionForReceive(transactionUrl, 
peer);
         } else {
-            apiUtil.openConnectionForSend(transactionUrl, 
peer.getCommunicationsSession());
+            apiUtil.openConnectionForSend(transactionUrl, peer);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/809f0423/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/SiteToSiteRestApiClient.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/SiteToSiteRestApiClient.java
 
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/SiteToSiteRestApiClient.java
index e81dc5b..8a379b7 100644
--- 
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/SiteToSiteRestApiClient.java
+++ 
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/SiteToSiteRestApiClient.java
@@ -54,6 +54,7 @@ import org.apache.http.nio.protocol.HttpAsyncRequestProducer;
 import org.apache.http.protocol.HttpContext;
 import org.apache.http.protocol.HttpCoreContext;
 import org.apache.http.util.EntityUtils;
+import org.apache.nifi.remote.Peer;
 import org.apache.nifi.remote.TransferDirection;
 import org.apache.nifi.remote.client.http.TransportProtocolVersionNegotiator;
 import org.apache.nifi.remote.exception.PortNotRunningException;
@@ -374,9 +375,12 @@ public class SiteToSiteRestApiClient implements Closeable {
 
     }
 
-    public boolean openConnectionForReceive(final String transactionUrl, final 
CommunicationsSession commSession) throws IOException {
+    public boolean openConnectionForReceive(final String transactionUrl, final 
Peer peer) throws IOException {
 
         final HttpGet get = createGet(transactionUrl + "/flow-files");
+        // Set uri so that it'll be used as transit uri.
+        
((HttpCommunicationsSession)peer.getCommunicationsSession()).setDataTransferUrl(get.getURI().toString());
+
         get.setHeader(HttpHeaders.PROTOCOL_VERSION, 
String.valueOf(transportProtocolVersionNegotiator.getVersion()));
 
         setHandshakeProperties(get);
@@ -414,7 +418,7 @@ public class SiteToSiteRestApiClient implements Closeable {
                             return r;
                         }
                     };
-                    ((HttpInput) 
commSession.getInput()).setInputStream(streamCapture);
+                    ((HttpInput) 
peer.getCommunicationsSession().getInput()).setInputStream(streamCapture);
 
                     startExtendingTtl(transactionUrl, httpIn, response);
                     keepItOpen = true;
@@ -436,10 +440,13 @@ public class SiteToSiteRestApiClient implements Closeable 
{
     private Future<HttpResponse> postResult;
     private CountDownLatch transferDataLatch = new CountDownLatch(1);
 
-    public void openConnectionForSend(final String transactionUrl, final 
CommunicationsSession commSession) throws IOException {
+    public void openConnectionForSend(final String transactionUrl, final Peer 
peer) throws IOException {
 
+        final CommunicationsSession commSession = 
peer.getCommunicationsSession();
         final String flowFilesPath = transactionUrl + "/flow-files";
         final HttpPost post = createPost(flowFilesPath);
+        // Set uri so that it'll be used as transit uri.
+        
((HttpCommunicationsSession)peer.getCommunicationsSession()).setDataTransferUrl(post.getURI().toString());
 
         post.setHeader("Content-Type", "application/octet-stream");
         post.setHeader("Accept", "text/plain");

http://git-wip-us.apache.org/repos/asf/nifi/blob/809f0423/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/protocol/http/TestHttpClientTransaction.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/protocol/http/TestHttpClientTransaction.java
 
b/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/protocol/http/TestHttpClientTransaction.java
index 7f3ee5c..b9ab25c 100644
--- 
a/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/protocol/http/TestHttpClientTransaction.java
+++ 
b/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/protocol/http/TestHttpClientTransaction.java
@@ -101,7 +101,7 @@ public class TestHttpClientTransaction {
 
         SiteToSiteRestApiClient apiClient = 
mock(SiteToSiteRestApiClient.class);
         final String transactionUrl = 
"http://www.example.com/data-transfer/input-ports/portId/transactions/transactionId";
-        
doReturn(false).when(apiClient).openConnectionForReceive(eq(transactionUrl), 
any(CommunicationsSession.class));
+        
doReturn(false).when(apiClient).openConnectionForReceive(eq(transactionUrl), 
any(Peer.class));
 
         ByteArrayInputStream serverResponse = new ByteArrayInputStream(new 
byte[0]);
         ByteArrayOutputStream clientRequest = new ByteArrayOutputStream();
@@ -117,7 +117,7 @@ public class TestHttpClientTransaction {
 
         SiteToSiteRestApiClient apiClient = 
mock(SiteToSiteRestApiClient.class);
         final String transactionUrl = 
"http://www.example.com/data-transfer/input-ports/portId/transactions/transactionId";
-        
doReturn(true).when(apiClient).openConnectionForReceive(eq(transactionUrl), 
any(CommunicationsSession.class));
+        
doReturn(true).when(apiClient).openConnectionForReceive(eq(transactionUrl), 
any(Peer.class));
         TransactionResultEntity resultEntity = new TransactionResultEntity();
         resultEntity.setResponseCode(CONFIRM_TRANSACTION.getCode());
         
doReturn(resultEntity).when(apiClient).commitReceivingFlowFiles(eq(transactionUrl),
 eq(CONFIRM_TRANSACTION), eq("3680976076"));
@@ -139,7 +139,7 @@ public class TestHttpClientTransaction {
 
         SiteToSiteRestApiClient apiClient = 
mock(SiteToSiteRestApiClient.class);
         final String transactionUrl = 
"http://www.example.com/data-transfer/input-ports/portId/transactions/transactionId";
-        
doReturn(true).when(apiClient).openConnectionForReceive(eq(transactionUrl), 
any(CommunicationsSession.class));
+        
doReturn(true).when(apiClient).openConnectionForReceive(eq(transactionUrl), 
any(Peer.class));
         TransactionResultEntity resultEntity = new TransactionResultEntity();
         resultEntity.setResponseCode(CONFIRM_TRANSACTION.getCode());
         
doReturn(resultEntity).when(apiClient).commitReceivingFlowFiles(eq(transactionUrl),
 eq(CONFIRM_TRANSACTION), eq("2969091230"));
@@ -162,7 +162,7 @@ public class TestHttpClientTransaction {
 
         SiteToSiteRestApiClient apiClient = 
mock(SiteToSiteRestApiClient.class);
         final String transactionUrl = 
"http://www.example.com/data-transfer/input-ports/portId/transactions/transactionId";
-        
doReturn(true).when(apiClient).openConnectionForReceive(eq(transactionUrl), 
any(CommunicationsSession.class));
+        
doReturn(true).when(apiClient).openConnectionForReceive(eq(transactionUrl), 
any(Peer.class));
         // The checksum is correct, but here we simulate as if it's wrong, 
BAD_CHECKSUM.
         TransactionResultEntity resultEntity = new TransactionResultEntity();
         resultEntity.setResponseCode(ResponseCode.BAD_CHECKSUM.getCode());
@@ -186,7 +186,7 @@ public class TestHttpClientTransaction {
 
         SiteToSiteRestApiClient apiClient = 
mock(SiteToSiteRestApiClient.class);
         final String transactionUrl = 
"http://www.example.com/data-transfer/input-ports/portId/transactions/transactionId";
-        doNothing().when(apiClient).openConnectionForSend(eq(transactionUrl), 
any(CommunicationsSession.class));
+        doNothing().when(apiClient).openConnectionForSend(eq(transactionUrl), 
any(Peer.class));
 
         ByteArrayOutputStream serverResponseBos = new ByteArrayOutputStream();
         ByteArrayInputStream serverResponse = new 
ByteArrayInputStream(serverResponseBos.toByteArray());
@@ -203,7 +203,7 @@ public class TestHttpClientTransaction {
 
         SiteToSiteRestApiClient apiClient = 
mock(SiteToSiteRestApiClient.class);
         final String transactionUrl = 
"http://www.example.com/data-transfer/input-ports/portId/transactions/transactionId";
-        doNothing().when(apiClient).openConnectionForSend(eq(transactionUrl), 
any(CommunicationsSession.class));
+        doNothing().when(apiClient).openConnectionForSend(eq(transactionUrl), 
any(Peer.class));
         // Emulate that server returns correct checksum.
         doAnswer(new Answer() {
             @Override
@@ -237,7 +237,7 @@ public class TestHttpClientTransaction {
 
         SiteToSiteRestApiClient apiClient = 
mock(SiteToSiteRestApiClient.class);
         final String transactionUrl = 
"http://www.example.com/data-transfer/input-ports/portId/transactions/transactionId";
-        doNothing().when(apiClient).openConnectionForSend(eq("portId"), 
any(CommunicationsSession.class));
+        doNothing().when(apiClient).openConnectionForSend(eq("portId"), 
any(Peer.class));
         // Emulate that server returns correct checksum.
         doAnswer(new Answer() {
             @Override
@@ -272,7 +272,7 @@ public class TestHttpClientTransaction {
     public void testSendWithInvalidChecksum() throws IOException {
         SiteToSiteRestApiClient apiClient = 
mock(SiteToSiteRestApiClient.class);
         final String transactionUrl = 
"http://www.example.com/data-transfer/input-ports/portId/transactions/transactionId";
-        doNothing().when(apiClient).openConnectionForSend(eq(transactionUrl), 
any(CommunicationsSession.class));
+        doNothing().when(apiClient).openConnectionForSend(eq(transactionUrl), 
any(Peer.class));
         // Emulate that server returns incorrect checksum.
         doAnswer(new Answer() {
             @Override
@@ -313,7 +313,7 @@ public class TestHttpClientTransaction {
 
         SiteToSiteRestApiClient apiClient = 
mock(SiteToSiteRestApiClient.class);
         final String transactionUrl = 
"http://www.example.com/data-transfer/input-ports/portId/transactions/transactionId";
-        doNothing().when(apiClient).openConnectionForSend(eq("portId"), 
any(CommunicationsSession.class));
+        doNothing().when(apiClient).openConnectionForSend(eq("portId"), 
any(Peer.class));
         // Emulate that server returns correct checksum.
         doAnswer(new Answer() {
             @Override

http://git-wip-us.apache.org/repos/asf/nifi/blob/809f0423/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/SocketRemoteSiteListener.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/SocketRemoteSiteListener.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/SocketRemoteSiteListener.java
index 814d0e6..bd9d204 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/SocketRemoteSiteListener.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/SocketRemoteSiteListener.java
@@ -154,12 +154,12 @@ public class SocketRemoteSiteListener implements 
RemoteSiteListener {
                                     sslSocketChannel.connect();
                                     LOG.trace("Channel connected");
 
-                                    commsSession = new 
SSLSocketChannelCommunicationsSession(sslSocketChannel, peerUri);
+                                    commsSession = new 
SSLSocketChannelCommunicationsSession(sslSocketChannel);
                                     dn = sslSocketChannel.getDn();
                                     commsSession.setUserDn(dn);
                                 } else {
                                     LOG.trace("{} Channel is not secure", 
this);
-                                    commsSession = new 
SocketChannelCommunicationsSession(socketChannel, peerUri);
+                                    commsSession = new 
SocketChannelCommunicationsSession(socketChannel);
                                     dn = null;
                                 }
                             } catch (final Exception e) {

http://git-wip-us.apache.org/repos/asf/nifi/blob/809f0423/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
index 3f59b50..8f115f7 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
@@ -76,6 +76,9 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
     private final TransferDirection transferDirection;
 
     private final AtomicReference<SiteToSiteClient> clientRef = new 
AtomicReference<>();
+    SiteToSiteClient getSiteToSiteClient() {
+        return clientRef.get();
+    }
 
     public StandardRemoteGroupPort(final String id, final String name, final 
ProcessGroup processGroup, final RemoteProcessGroup remoteGroup,
             final TransferDirection direction, final ConnectableType type, 
final SSLContext sslContext, final ProcessScheduler scheduler) {
@@ -118,7 +121,7 @@ public class StandardRemoteGroupPort extends 
RemoteGroupPort {
     public void shutdown() {
         super.shutdown();
 
-        final SiteToSiteClient client = clientRef.get();
+        final SiteToSiteClient client = getSiteToSiteClient();
         if (client != null) {
             try {
                 client.close();
@@ -175,7 +178,7 @@ public class StandardRemoteGroupPort extends 
RemoteGroupPort {
             firstFlowFile = null;
         }
 
-        final SiteToSiteClient client = clientRef.get();
+        final SiteToSiteClient client = getSiteToSiteClient();
         final Transaction transaction;
         try {
             transaction = client.createTransaction(transferDirection);
@@ -275,7 +278,7 @@ public class StandardRemoteGroupPort extends 
RemoteGroupPort {
                 bytesSent += flowFile.getSize();
                 logger.debug("{} Sent {} to {}", this, flowFile, 
transaction.getCommunicant().getUrl());
 
-                final String transitUri = 
transaction.getCommunicant().getUrl() + "/" + 
flowFile.getAttribute(CoreAttributes.UUID.key());
+                final String transitUri = 
transaction.getCommunicant().createTransitUri(flowFile.getAttribute(CoreAttributes.UUID.key()));
                 session.getProvenanceReporter().send(flowFile, transitUri, 
"Remote DN=" + userDn, transferMillis, false);
                 session.remove(flowFile);
 
@@ -331,13 +334,14 @@ public class StandardRemoteGroupPort extends 
RemoteGroupPort {
             flowFile = session.putAllAttributes(flowFile, 
dataPacket.getAttributes());
             flowFile = session.importFrom(dataPacket.getData(), flowFile);
             final long receiveNanos = System.nanoTime() - start;
+            flowFilesReceived.add(flowFile);
 
             String sourceFlowFileIdentifier = 
dataPacket.getAttributes().get(CoreAttributes.UUID.key());
             if (sourceFlowFileIdentifier == null) {
                 sourceFlowFileIdentifier = "<Unknown Identifier>";
             }
 
-            final String transitUri = transaction.getCommunicant().getUrl() + 
sourceFlowFileIdentifier;
+            final String transitUri = 
transaction.getCommunicant().createTransitUri(sourceFlowFileIdentifier);
             session.getProvenanceReporter().receive(flowFile, transitUri, 
"urn:nifi:" + sourceFlowFileIdentifier,
                     "Remote DN=" + userDn, 
TimeUnit.NANOSECONDS.toMillis(receiveNanos));
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/809f0423/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/AbstractFlowFileServerProtocol.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/AbstractFlowFileServerProtocol.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/AbstractFlowFileServerProtocol.java
index 43428e0..8e7d2c5 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/AbstractFlowFileServerProtocol.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/AbstractFlowFileServerProtocol.java
@@ -271,8 +271,7 @@ public abstract class AbstractFlowFileServerProtocol 
implements ServerProtocol {
             flowFilesSent.add(flowFile);
             bytesSent += flowFile.getSize();
 
-            String transitUriPrefix = 
handshakenProperties.getTransitUriPrefix();
-            final String transitUri = (transitUriPrefix == null) ? 
peer.getUrl() : transitUriPrefix + 
flowFile.getAttribute(CoreAttributes.UUID.key());
+            final String transitUri = createTransitUri(peer, 
flowFile.getAttribute(CoreAttributes.UUID.key()));
             session.getProvenanceReporter().send(flowFile, transitUri, "Remote 
Host=" + peer.getHost() + ", Remote DN=" + remoteDn, transmissionMillis, false);
             session.remove(flowFile);
 
@@ -319,6 +318,10 @@ public abstract class AbstractFlowFileServerProtocol 
implements ServerProtocol {
 
     }
 
+    protected String createTransitUri(Peer peer, String 
sourceFlowFileIdentifier) {
+        return peer.createTransitUri(sourceFlowFileIdentifier);
+    }
+
     protected int commitTransferTransaction(Peer peer, FlowFileTransaction 
transaction) throws IOException {
         ProcessSession session = transaction.getSession();
         Set<FlowFile> flowFilesSent = transaction.getFlowFilesSent();
@@ -446,8 +449,7 @@ public abstract class AbstractFlowFileServerProtocol 
implements ServerProtocol {
             final String sourceSystemFlowFileUuid = 
dataPacket.getAttributes().get(CoreAttributes.UUID.key());
             flowFile = session.putAttribute(flowFile, 
CoreAttributes.UUID.key(), UUID.randomUUID().toString());
 
-            String transitUriPrefix = 
handshakenProperties.getTransitUriPrefix();
-            final String transitUri = (transitUriPrefix == null) ? 
peer.getUrl() : transitUriPrefix + sourceSystemFlowFileUuid;
+            final String transitUri = createTransitUri(peer, 
sourceSystemFlowFileUuid);
             session.getProvenanceReporter().receive(flowFile, transitUri, 
sourceSystemFlowFileUuid == null
                     ? null : "urn:nifi:" + sourceSystemFlowFileUuid, "Remote 
Host=" + peer.getHost() + ", Remote DN=" + remoteDn, transferMillis);
             session.transfer(flowFile, Relationship.ANONYMOUS);

http://git-wip-us.apache.org/repos/asf/nifi/blob/809f0423/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/http/StandardHttpFlowFileServerProtocol.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/http/StandardHttpFlowFileServerProtocol.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/http/StandardHttpFlowFileServerProtocol.java
index c4f1f5c..ebbee17 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/http/StandardHttpFlowFileServerProtocol.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/http/StandardHttpFlowFileServerProtocol.java
@@ -48,7 +48,7 @@ public class StandardHttpFlowFileServerProtocol extends 
AbstractFlowFileServerPr
     private final VersionNegotiator versionNegotiator;
     private final HttpRemoteSiteListener transactionManager = 
HttpRemoteSiteListener.getInstance();
 
-    public StandardHttpFlowFileServerProtocol(VersionNegotiator 
versionNegotiator) {
+    public StandardHttpFlowFileServerProtocol(final VersionNegotiator 
versionNegotiator) {
         super();
         this.versionNegotiator = versionNegotiator;
     }
@@ -222,4 +222,5 @@ public class StandardHttpFlowFileServerProtocol extends 
AbstractFlowFileServerPr
     public String getResourceName() {
         return RESOURCE_NAME;
     }
+
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/809f0423/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketFlowFileServerProtocol.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketFlowFileServerProtocol.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketFlowFileServerProtocol.java
index fe7d163..6e4b860 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketFlowFileServerProtocol.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketFlowFileServerProtocol.java
@@ -217,4 +217,10 @@ public class SocketFlowFileServerProtocol extends 
AbstractFlowFileServerProtocol
     public VersionNegotiator getVersionNegotiator() {
         return versionNegotiator;
     }
+
+    @Override
+    protected String createTransitUri(Peer peer, String 
sourceFlowFileIdentifier) {
+        String transitUriPrefix = handshakenProperties.getTransitUriPrefix();
+        return (transitUriPrefix == null) ? peer.getUrl() : transitUriPrefix + 
sourceFlowFileIdentifier;
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/809f0423/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/TestStandardRemoteGroupPort.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/TestStandardRemoteGroupPort.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/TestStandardRemoteGroupPort.java
new file mode 100644
index 0000000..4209c93
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/TestStandardRemoteGroupPort.java
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote;
+
+import org.apache.nifi.connectable.ConnectableType;
+import org.apache.nifi.controller.ProcessScheduler;
+import org.apache.nifi.controller.queue.QueueSize;
+import org.apache.nifi.events.EventReporter;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.groups.ProcessGroup;
+import org.apache.nifi.groups.RemoteProcessGroup;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.provenance.ProvenanceReporter;
+import org.apache.nifi.remote.client.SiteToSiteClient;
+import org.apache.nifi.remote.io.http.HttpCommunicationsSession;
+import org.apache.nifi.remote.io.socket.SocketChannelCommunicationsSession;
+import org.apache.nifi.remote.protocol.CommunicationsSession;
+import org.apache.nifi.remote.protocol.DataPacket;
+import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol;
+import org.apache.nifi.remote.util.StandardDataPacket;
+import org.apache.nifi.stream.io.ByteArrayInputStream;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.InputStream;
+import java.net.URI;
+import java.nio.channels.SocketChannel;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+
+public class TestStandardRemoteGroupPort {
+
+    private static final String ID = "remote-group-port-id";
+    private static final String NAME = "remote-group-port-name";
+
+    private RemoteProcessGroup remoteGroup;
+    private ProcessScheduler scheduler;
+    private SiteToSiteClient siteToSiteClient;
+    private Transaction transaction;
+    private EventReporter eventReporter;
+    private ProcessGroup processGroup;
+    public static final String REMOTE_CLUSTER_URL = 
"http://node0.example.com:8080/nifi";
+    private StandardRemoteGroupPort port;
+    private ProcessContext context;
+    private ProcessSession session;
+    private ProvenanceReporter provenanceReporter;
+
+    @BeforeClass
+    public static void setup() throws Exception {
+        
System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.remote", 
"DEBUG");
+    }
+
+    private void setupMock(final SiteToSiteTransportProtocol protocol,
+                           final TransferDirection direction) throws Exception 
{
+        setupMock(protocol, direction, mock(Transaction.class));
+    }
+
+    private void setupMock(final SiteToSiteTransportProtocol protocol,
+                          final TransferDirection direction,
+                          final Transaction transaction) throws Exception {
+        processGroup = null;
+        remoteGroup = mock(RemoteProcessGroup.class);
+        scheduler = null;
+        siteToSiteClient = mock(SiteToSiteClient.class);
+        this.transaction = transaction;
+
+        eventReporter = mock(EventReporter.class);
+
+        final ConnectableType connectableType;
+        switch (direction) {
+            case SEND:
+                connectableType = ConnectableType.REMOTE_INPUT_PORT;
+                break;
+            case RECEIVE:
+                connectableType = ConnectableType.OUTPUT_PORT;
+                break;
+            default:
+                connectableType = null;
+                break;
+        }
+        port = spy(new StandardRemoteGroupPort(ID, NAME,
+                processGroup, remoteGroup, direction, connectableType, null, 
scheduler));
+
+        doReturn(true).when(remoteGroup).isTransmitting();
+        doReturn(protocol).when(remoteGroup).getTransportProtocol();
+        doReturn(new URI(REMOTE_CLUSTER_URL)).when(remoteGroup).getTargetUri();
+        doReturn(siteToSiteClient).when(port).getSiteToSiteClient();
+        
doReturn(transaction).when(siteToSiteClient).createTransaction(eq(direction));
+        doReturn(eventReporter).when(remoteGroup).getEventReporter();
+
+        context = null;
+        session = mock(ProcessSession.class);
+        provenanceReporter = mock(ProvenanceReporter.class);
+        doReturn(provenanceReporter).when(session).getProvenanceReporter();
+
+    }
+
+    @Test
+    public void testSendRaw() throws Exception {
+
+        setupMock(SiteToSiteTransportProtocol.RAW, TransferDirection.SEND);
+
+        final String peerUrl = "nifi://node1.example.com:9090";
+        final PeerDescription peerDescription = new 
PeerDescription("node1.example.com", 9090, false);
+        try (final SocketChannel socketChannel = SocketChannel.open()) {
+            final CommunicationsSession commsSession = new 
SocketChannelCommunicationsSession(socketChannel);
+            final Peer peer = new Peer(peerDescription, commsSession, peerUrl, 
REMOTE_CLUSTER_URL);
+
+            doReturn(peer).when(transaction).getCommunicant();
+
+            final QueueSize queueSize = new QueueSize(1, 10);
+            final FlowFile flowFile = mock(FlowFile.class);
+
+            doReturn(queueSize).when(session).getQueueSize();
+            // Return null when it gets called second time.
+            doReturn(flowFile).doReturn(null).when(session).get();
+
+            final String flowFileUuid = "flowfile-uuid";
+            
doReturn(flowFileUuid).when(flowFile).getAttribute(eq(CoreAttributes.UUID.key()));
+
+            port.onTrigger(context, session);
+
+            // Transit uri can be customized if necessary.
+            verify(provenanceReporter).send(eq(flowFile), eq(peerUrl + "/" + 
flowFileUuid), any(String.class),
+                    any(Long.class), eq(false));
+        }
+    }
+
+    @Test
+    public void testReceiveRaw() throws Exception {
+
+        setupMock(SiteToSiteTransportProtocol.RAW, TransferDirection.RECEIVE);
+
+        final String peerUrl = "nifi://node1.example.com:9090";
+        final PeerDescription peerDescription = new 
PeerDescription("node1.example.com", 9090, false);
+        try (final SocketChannel socketChannel = SocketChannel.open()) {
+            final CommunicationsSession commsSession = new 
SocketChannelCommunicationsSession(socketChannel);
+            final Peer peer = new Peer(peerDescription, commsSession, peerUrl, 
REMOTE_CLUSTER_URL);
+
+            doReturn(peer).when(transaction).getCommunicant();
+
+            final FlowFile flowFile = mock(FlowFile.class);
+            final String sourceFlowFileUuid = "flowfile-uuid";
+            final Map<String, String> attributes = new HashMap<>();
+            attributes.put(CoreAttributes.UUID.key(), sourceFlowFileUuid);
+            final byte[] dataPacketContents = "DataPacket Contents".getBytes();
+            final ByteArrayInputStream dataPacketInputStream = new 
ByteArrayInputStream(dataPacketContents);
+            final DataPacket dataPacket = new StandardDataPacket(attributes,
+                    dataPacketInputStream, dataPacketContents.length);
+
+            doReturn(flowFile).when(session).create();
+            // Return null when it gets called second time.
+            
doReturn(dataPacket).doReturn(null).when(this.transaction).receive();
+
+            doReturn(flowFile).when(session).putAllAttributes(eq(flowFile), 
eq(attributes));
+            
doReturn(flowFile).when(session).importFrom(any(InputStream.class), 
eq(flowFile));
+
+            port.onTrigger(context, session);
+
+            // Transit uri can be customized if necessary.
+            verify(provenanceReporter).receive(eq(flowFile), eq(peerUrl + "/" 
+ sourceFlowFileUuid), any(String.class),
+                    any(String.class), any(Long.class));
+        }
+
+    }
+
+    @Test
+    public void testSendHttp() throws Exception {
+
+        setupMock(SiteToSiteTransportProtocol.HTTP, TransferDirection.SEND);
+
+        final String peerUrl = "http://node1.example.com:8080/nifi";
+        final PeerDescription peerDescription = new 
PeerDescription("node1.example.com", 8080, false);
+        final HttpCommunicationsSession commsSession = new 
HttpCommunicationsSession();
+        final Peer peer = new Peer(peerDescription, commsSession, peerUrl, 
REMOTE_CLUSTER_URL);
+
+        final String flowFileEndpointUri = 
"http://node1.example.com:8080/nifi-api/output-ports/port-id/transactions/transaction-id/flow-files";
+
+        doReturn(peer).when(transaction).getCommunicant();
+        commsSession.setDataTransferUrl(flowFileEndpointUri);
+
+        final QueueSize queueSize = new QueueSize(1, 10);
+        final FlowFile flowFile = mock(FlowFile.class);
+
+        doReturn(queueSize).when(session).getQueueSize();
+        // Return null when it's called second time.
+        doReturn(flowFile).doReturn(null).when(session).get();
+
+        port.onTrigger(context, session);
+
+        // The data transfer URL (flowFileEndpointUri), not peerUrl, should be used as the transit uri.
+        verify(provenanceReporter).send(eq(flowFile), eq(flowFileEndpointUri), 
any(String.class),
+                any(Long.class), eq(false));
+
+    }
+
+    @Test
+    public void testReceiveHttp() throws Exception {
+
+        setupMock(SiteToSiteTransportProtocol.HTTP, TransferDirection.RECEIVE);
+
+        final String peerUrl = "http://node1.example.com:8080/nifi";
+        final PeerDescription peerDescription = new 
PeerDescription("node1.example.com", 8080, false);
+        final HttpCommunicationsSession commsSession = new 
HttpCommunicationsSession();
+        final Peer peer = new Peer(peerDescription, commsSession, peerUrl, 
REMOTE_CLUSTER_URL);
+
+        final String flowFileEndpointUri = 
"http://node1.example.com:8080/nifi-api/output-ports/port-id/transactions/transaction-id/flow-files";
+
+        doReturn(peer).when(transaction).getCommunicant();
+        commsSession.setDataTransferUrl(flowFileEndpointUri);
+
+        final FlowFile flowFile = mock(FlowFile.class);
+        final Map<String, String> attributes = new HashMap<>();
+        final byte[] dataPacketContents = "DataPacket Contents".getBytes();
+        final ByteArrayInputStream dataPacketInputStream = new 
ByteArrayInputStream(dataPacketContents);
+        final DataPacket dataPacket = new StandardDataPacket(attributes,
+                dataPacketInputStream, dataPacketContents.length);
+
+        doReturn(flowFile).when(session).create();
+        // Return null when it's called second time.
+        doReturn(dataPacket).doReturn(null).when(transaction).receive();
+
+        doReturn(flowFile).when(session).putAllAttributes(eq(flowFile), 
eq(attributes));
+        doReturn(flowFile).when(session).importFrom(any(InputStream.class), 
eq(flowFile));
+
+        port.onTrigger(context, session);
+
+        // The data transfer URL (flowFileEndpointUri), not peerUrl, should be used as the transit uri.
+        verify(provenanceReporter).receive(eq(flowFile), 
eq(flowFileEndpointUri), any(String.class),
+                any(String.class), any(Long.class));
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/809f0423/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/protocol/http/TestHttpFlowFileServerProtocol.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/protocol/http/TestHttpFlowFileServerProtocol.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/protocol/http/TestHttpFlowFileServerProtocol.java
index 4519ddd..7c9d30b 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/protocol/http/TestHttpFlowFileServerProtocol.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/protocol/http/TestHttpFlowFileServerProtocol.java
@@ -297,8 +297,11 @@ public class TestHttpFlowFileServerProtocol {
         final HttpRemoteSiteListener remoteSiteListener = 
HttpRemoteSiteListener.getInstance();
         final Peer peer = getDefaultPeer(transactionId);
         final HttpServerCommunicationsSession commsSession = 
(HttpServerCommunicationsSession) peer.getCommunicationsSession();
+        final String endpointUri = 
"https://peer-host:8443/nifi-api/output-ports/port-id/transactions/"
+                + transactionId + "/flow-files";
         commsSession.putHandshakeParam(HandshakeProperty.BATCH_COUNT, "1");
         commsSession.setUserDn("unit-test");
+        commsSession.setDataTransferUrl(endpointUri);
 
         serverProtocol.handshake(peer);
 
@@ -312,9 +315,9 @@ public class TestHttpFlowFileServerProtocol {
         doReturn(flowFile).when(processSession).get();
         
doReturn(provenanceReporter).when(processSession).getProvenanceReporter();
         doAnswer(invocation -> {
-            final String peerUrl = (String)invocation.getArguments()[1];
+            final String transitUri = (String)invocation.getArguments()[1];
             final String detail = (String)invocation.getArguments()[2];
-            assertEquals("http://peer-host:8080/", peerUrl);
+            assertEquals(endpointUri, transitUri);
             assertEquals("Remote Host=peer-host, Remote DN=unit-test", detail);
             return null;
         }).when(provenanceReporter).send(eq(flowFile), any(String.class), 
any(String.class), any(Long.class), any(Boolean.class));
@@ -336,13 +339,16 @@ public class TestHttpFlowFileServerProtocol {
     @Test
     public void testTransferTwoFiles() throws Exception {
         final HttpRemoteSiteListener remoteSiteListener = 
HttpRemoteSiteListener.getInstance();
-        final HttpFlowFileServerProtocol serverProtocol = 
getDefaultHttpFlowFileServerProtocol();
 
         final String transactionId = "testTransferTwoFiles";
         final Peer peer = getDefaultPeer(transactionId);
+        final String endpointUri = 
"https://peer-host:8443/nifi-api/output-ports/port-id/transactions/"
+                + transactionId + "/flow-files";
+        final HttpFlowFileServerProtocol serverProtocol = 
getDefaultHttpFlowFileServerProtocol();
         final HttpServerCommunicationsSession commsSession = 
(HttpServerCommunicationsSession) peer.getCommunicationsSession();
         commsSession.putHandshakeParam(HandshakeProperty.BATCH_COUNT, "2");
         commsSession.setUserDn("unit-test");
+        commsSession.setDataTransferUrl(endpointUri);
 
         serverProtocol.handshake(peer);
 
@@ -360,18 +366,18 @@ public class TestHttpFlowFileServerProtocol {
 
         
doReturn(provenanceReporter).when(processSession).getProvenanceReporter();
         doAnswer(invocation -> {
-            final String peerUrl = (String)invocation.getArguments()[1];
+            final String transitUri = (String)invocation.getArguments()[1];
             final String detail = (String)invocation.getArguments()[2];
-            assertEquals("http://peer-host:8080/", peerUrl);
+            assertEquals(endpointUri, transitUri);
             assertEquals("Remote Host=peer-host, Remote DN=unit-test", detail);
             return null;
         }).when(provenanceReporter).send(eq(flowFile1), any(String.class), 
any(String.class), any(Long.class), any(Boolean.class));
 
         
doReturn(provenanceReporter).when(processSession).getProvenanceReporter();
         doAnswer(invocation -> {
-            final String peerUrl = (String)invocation.getArguments()[1];
+            final String transitUri = (String)invocation.getArguments()[1];
             final String detail = (String)invocation.getArguments()[2];
-            assertEquals("http://peer-host:8080/", peerUrl);
+            assertEquals(endpointUri, transitUri);
             assertEquals("Remote Host=peer-host, Remote DN=unit-test", detail);
             return null;
         }).when(provenanceReporter).send(eq(flowFile2), any(String.class), 
any(String.class), any(Long.class), any(Boolean.class));
@@ -465,9 +471,12 @@ public class TestHttpFlowFileServerProtocol {
 
     private void receiveOneFile(final HttpFlowFileServerProtocol 
serverProtocol, final String transactionId, final Peer peer) throws IOException 
{
         final HttpRemoteSiteListener remoteSiteListener = 
HttpRemoteSiteListener.getInstance();
+        final String endpointUri = 
"https://peer-host:8443/nifi-api/input-ports/port-id/transactions/"
+                + transactionId + "/flow-files";
         final HttpServerCommunicationsSession commsSession = 
(HttpServerCommunicationsSession) peer.getCommunicationsSession();
         commsSession.putHandshakeParam(HandshakeProperty.BATCH_COUNT, "1");
         commsSession.setUserDn("unit-test");
+        commsSession.setDataTransferUrl(endpointUri);
 
         serverProtocol.handshake(peer);
 
@@ -499,9 +508,9 @@ public class TestHttpFlowFileServerProtocol {
         
doReturn(flowFile).when(processSession).putAttribute(any(FlowFile.class), 
any(String.class), any(String.class));
         
doReturn(provenanceReporter).when(processSession).getProvenanceReporter();
         doAnswer(invocation -> {
-            final String peerUrl = (String)invocation.getArguments()[1];
+            final String transitUri = (String)invocation.getArguments()[1];
             final String detail = (String)invocation.getArguments()[3];
-            assertEquals("http://peer-host:8080/", peerUrl);
+            assertEquals(endpointUri, transitUri);
             assertEquals("Remote Host=peer-host, Remote DN=unit-test", detail);
             return null;
         }).when(provenanceReporter)
@@ -522,13 +531,16 @@ public class TestHttpFlowFileServerProtocol {
     @Test
     public void testReceiveTwoFiles() throws Exception {
         final HttpRemoteSiteListener remoteSiteListener = 
HttpRemoteSiteListener.getInstance();
-        final HttpFlowFileServerProtocol serverProtocol = 
getDefaultHttpFlowFileServerProtocol();
 
         final String transactionId = "testReceiveTwoFile";
+        final String endpointUri = 
"https://peer-host:8443/nifi-api/input-ports/port-id/transactions/"
+                + transactionId + "/flow-files";
+        final HttpFlowFileServerProtocol serverProtocol = 
getDefaultHttpFlowFileServerProtocol();
         final Peer peer = getDefaultPeer(transactionId);
         final HttpServerCommunicationsSession commsSession = 
(HttpServerCommunicationsSession) peer.getCommunicationsSession();
         commsSession.putHandshakeParam(HandshakeProperty.BATCH_COUNT, "2");
         commsSession.setUserDn("unit-test");
+        commsSession.setDataTransferUrl(endpointUri);
 
         serverProtocol.handshake(peer);
 
@@ -562,9 +574,9 @@ public class TestHttpFlowFileServerProtocol {
                 .when(processSession).putAttribute(any(FlowFile.class), 
any(String.class), any(String.class));
         
doReturn(provenanceReporter).when(processSession).getProvenanceReporter();
         doAnswer(invocation -> {
-            final String peerUrl = (String)invocation.getArguments()[1];
+            final String transitUri = (String)invocation.getArguments()[1];
             final String detail = (String)invocation.getArguments()[3];
-            assertEquals("http://peer-host:8080/", peerUrl);
+            assertEquals(endpointUri, transitUri);
             assertEquals("Remote Host=peer-host, Remote DN=unit-test", detail);
             return null;
         }).when(provenanceReporter)

http://git-wip-us.apache.org/repos/asf/nifi/blob/809f0423/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/DataTransferResource.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/DataTransferResource.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/DataTransferResource.java
index bd61841..3034e1e 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/DataTransferResource.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/DataTransferResource.java
@@ -45,6 +45,7 @@ import org.apache.nifi.remote.exception.BadRequestException;
 import org.apache.nifi.remote.exception.HandshakeException;
 import org.apache.nifi.remote.exception.NotAuthorizedException;
 import org.apache.nifi.remote.exception.RequestExpiredException;
+import org.apache.nifi.remote.io.http.HttpCommunicationsSession;
 import org.apache.nifi.remote.io.http.HttpOutput;
 import org.apache.nifi.remote.io.http.HttpServerCommunicationsSession;
 import org.apache.nifi.remote.protocol.HandshakeProperty;
@@ -212,7 +213,7 @@ public class DataTransferResource extends 
ApplicationResource {
 
         try {
             // Execute handshake.
-            initiateServerProtocol(peer, transportProtocolVersion);
+            initiateServerProtocol(req, peer, transportProtocolVersion);
 
             TransactionResultEntity entity = new TransactionResultEntity();
             entity.setResponseCode(ResponseCode.PROPERTIES_OK.getCode());
@@ -280,7 +281,7 @@ public class DataTransferResource extends 
ApplicationResource {
         final int transportProtocolVersion = 
validationResult.transportProtocolVersion;
 
         try {
-            HttpFlowFileServerProtocol serverProtocol = 
initiateServerProtocol(peer, transportProtocolVersion);
+            HttpFlowFileServerProtocol serverProtocol = 
initiateServerProtocol(req, peer, transportProtocolVersion);
             int numOfFlowFiles = 
serverProtocol.getPort().receiveFlowFiles(peer, serverProtocol);
             logger.debug("finished receiving flow files, numOfFlowFiles={}", 
numOfFlowFiles);
             if (numOfFlowFiles < 1) {
@@ -304,10 +305,15 @@ public class DataTransferResource extends 
ApplicationResource {
         return responseCreator.acceptedResponse(transactionManager, 
serverChecksum, transportProtocolVersion);
     }
 
-    private HttpFlowFileServerProtocol initiateServerProtocol(Peer peer, 
Integer transportProtocolVersion) throws IOException {
+    private HttpFlowFileServerProtocol initiateServerProtocol(final 
HttpServletRequest req, final Peer peer,
+                                                              final Integer 
transportProtocolVersion) throws IOException {
         // Switch transaction protocol version based on transport protocol 
version.
         TransportProtocolVersionNegotiator negotiatedTransportProtocolVersion 
= new TransportProtocolVersionNegotiator(transportProtocolVersion);
         VersionNegotiator versionNegotiator = new 
StandardVersionNegotiator(negotiatedTransportProtocolVersion.getTransactionProtocolVersion());
+
+        final String dataTransferUrl = req.getRequestURL().toString();
+        
((HttpCommunicationsSession)peer.getCommunicationsSession()).setDataTransferUrl(dataTransferUrl);
+
         HttpFlowFileServerProtocol serverProtocol = 
getHttpFlowFileServerProtocol(versionNegotiator);
         
HttpRemoteSiteListener.getInstance().setupServerProtocol(serverProtocol);
         // TODO: How should I pass cluster information?
@@ -316,11 +322,12 @@ public class DataTransferResource extends 
ApplicationResource {
         return serverProtocol;
     }
 
-    HttpFlowFileServerProtocol getHttpFlowFileServerProtocol(VersionNegotiator 
versionNegotiator) {
+    HttpFlowFileServerProtocol getHttpFlowFileServerProtocol(final 
VersionNegotiator versionNegotiator) {
         return new StandardHttpFlowFileServerProtocol(versionNegotiator);
     }
 
-    private Peer constructPeer(HttpServletRequest req, InputStream 
inputStream, OutputStream outputStream, String portId, String transactionId) {
+    private Peer constructPeer(final HttpServletRequest req, final InputStream 
inputStream,
+                               final OutputStream outputStream, final String 
portId, final String transactionId) {
         final String clientHostName = req.getRemoteHost();
         final int clientPort = req.getRemotePort();
 
@@ -357,7 +364,7 @@ public class DataTransferResource extends 
ApplicationResource {
             commSession.putHandshakeParam(BATCH_DURATION, batchDuration);
         }
 
-        if(peerDescription.isSecure()){
+        if (peerDescription.isSecure()) {
             final NiFiUser nifiUser = NiFiUserUtils.getNiFiUser();
             logger.debug("initiating peer, nifiUser={}", nifiUser);
             commSession.setUserDn(nifiUser.getIdentity());
@@ -366,6 +373,7 @@ public class DataTransferResource extends 
ApplicationResource {
         // TODO: Followed how SocketRemoteSiteListener define peerUrl and 
clusterUrl, but it can be more meaningful values, especially for clusterUrl.
         final String peerUrl = "nifi://" + clientHostName + ":" + clientPort;
         final String clusterUrl = "nifi://localhost:" + req.getLocalPort();
+
         return new Peer(peerDescription, commSession, peerUrl, clusterUrl);
     }
 
@@ -434,7 +442,7 @@ public class DataTransferResource extends 
ApplicationResource {
 
         final TransactionResultEntity entity = new TransactionResultEntity();
         try {
-            HttpFlowFileServerProtocol serverProtocol = 
initiateServerProtocol(peer, transportProtocolVersion);
+            HttpFlowFileServerProtocol serverProtocol = 
initiateServerProtocol(req, peer, transportProtocolVersion);
 
             String inputErrMessage = null;
             if (responseCode == null) {
@@ -540,7 +548,7 @@ public class DataTransferResource extends 
ApplicationResource {
 
         final TransactionResultEntity entity = new TransactionResultEntity();
         try {
-            HttpFlowFileServerProtocol serverProtocol = 
initiateServerProtocol(peer, transportProtocolVersion);
+            HttpFlowFileServerProtocol serverProtocol = 
initiateServerProtocol(req, peer, transportProtocolVersion);
             HttpServerCommunicationsSession commsSession = 
(HttpServerCommunicationsSession) peer.getCommunicationsSession();
             // Pass the response code sent from the client.
             String inputErrMessage = null;
@@ -653,7 +661,7 @@ public class DataTransferResource extends 
ApplicationResource {
         final Peer peer = constructPeer(req, inputStream, tempBos, portId, 
transactionId);
         final int transportProtocolVersion = 
validationResult.transportProtocolVersion;
         try {
-            final HttpFlowFileServerProtocol serverProtocol = 
initiateServerProtocol(peer, transportProtocolVersion);
+            final HttpFlowFileServerProtocol serverProtocol = 
initiateServerProtocol(req, peer, transportProtocolVersion);
 
             StreamingOutput flowFileContent = new StreamingOutput() {
                 @Override
@@ -792,7 +800,7 @@ public class DataTransferResource extends 
ApplicationResource {
 
         try {
             // Do handshake
-            initiateServerProtocol(peer, transportProtocolVersion);
+            initiateServerProtocol(req, peer, transportProtocolVersion);
             transactionManager.extendTransaction(transactionId);
 
             final TransactionResultEntity entity = new 
TransactionResultEntity();

http://git-wip-us.apache.org/repos/asf/nifi/blob/809f0423/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/SiteToSiteResource.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/SiteToSiteResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/SiteToSiteResource.java
index d16c626..0411bec 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/SiteToSiteResource.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/SiteToSiteResource.java
@@ -18,6 +18,8 @@ package org.apache.nifi.web.api;
 
 import static org.apache.commons.lang3.StringUtils.isEmpty;
 
+import java.net.InetAddress;
+import java.net.UnknownHostException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Set;
@@ -214,10 +216,22 @@ public class SiteToSiteResource extends 
ApplicationResource {
         } else {
             // Standalone mode.
             final PeerDTO peer = new PeerDTO();
-            // req.getLocalName returns private IP address, that can't be 
accessed from client in some environments.
+
+            // Private IP address or hostname may not be accessible from 
client in some environments.
             // So, use the value defined in nifi.properties instead when it is 
defined.
             final String remoteInputHost = properties.getRemoteInputHost();
-            peer.setHostname(isEmpty(remoteInputHost) ? req.getLocalName() : 
remoteInputHost);
+            String localName;
+            try {
+                // Get local host name using InetAddress if available, same as 
RAW socket does.
+                localName = InetAddress.getLocalHost().getHostName();
+            } catch (UnknownHostException e) {
+                if (logger.isDebugEnabled()) {
+                    logger.debug("Failed to get local host name using 
InetAddress.", e);
+                }
+                localName = req.getLocalName();
+            }
+
+            peer.setHostname(isEmpty(remoteInputHost) ? localName : 
remoteInputHost);
             peer.setPort(properties.getRemoteInputHttpPort());
             peer.setSecure(properties.isSiteToSiteSecure());
             peer.setFlowFileCount(0);  // doesn't matter how many FlowFiles we 
have, because we're the only host.

http://git-wip-us.apache.org/repos/asf/nifi/blob/809f0423/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/api/TestDataTransferResource.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/api/TestDataTransferResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/api/TestDataTransferResource.java
index c23cc7d..422dbc3 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/api/TestDataTransferResource.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/api/TestDataTransferResource.java
@@ -64,6 +64,9 @@ public class TestDataTransferResource {
     private HttpServletRequest createCommonHttpServletRequest() {
         final HttpServletRequest req = mock(HttpServletRequest.class);
         doReturn("1").when(req).getHeader(eq(HttpHeaders.PROTOCOL_VERSION));
+        doReturn(new StringBuffer("http://nifi.example.com:8080")
+                .append("/nifi-api/data-transfer/output-ports/port-id/transactions/tx-id/flow-files"))
+                .when(req).getRequestURL();
         return req;
     }
 

Reply via email to