Repository: nifi
Updated Branches:
  refs/heads/master 7c0ee014d -> 1913b1e2a


http://git-wip-us.apache.org/repos/asf/nifi/blob/1913b1e2/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/PeerDescriptionModifiable.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/PeerDescriptionModifiable.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/PeerDescriptionModifiable.java
new file mode 100644
index 0000000..ef9cac0
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/PeerDescriptionModifiable.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote;
+
+/**
+ * This interface is used to determine whether a ServerProtocol implementation
+ * can utilize peer description modification for making S2S work behind a 
reverse proxy.
+ */
+public interface PeerDescriptionModifiable {
+    void setPeerDescriptionModifier(final PeerDescriptionModifier modifier);
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/1913b1e2/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/PeerDescriptionModifier.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/PeerDescriptionModifier.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/PeerDescriptionModifier.java
new file mode 100644
index 0000000..9e40c49
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/PeerDescriptionModifier.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote;
+
+import org.apache.nifi.attribute.expression.language.PreparedQuery;
+import org.apache.nifi.attribute.expression.language.Query;
+import 
org.apache.nifi.attribute.expression.language.exception.AttributeExpressionLanguageParsingException;
+import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol;
+import org.apache.nifi.util.NiFiProperties;
+import org.apache.nifi.util.Tuple;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+import static java.lang.String.format;
+import static org.apache.commons.lang3.StringUtils.isBlank;
+
+public class PeerDescriptionModifier {
+
+    private static final Logger logger = 
LoggerFactory.getLogger(PeerDescriptionModifier.class);
+
+    public enum RequestType {
+        SiteToSiteDetail,
+        Peers
+    }
+
+    private static class Route {
+        private String name;
+        private SiteToSiteTransportProtocol protocol;
+        private PreparedQuery predicate;
+        private PreparedQuery hostname;
+        private PreparedQuery port;
+        private PreparedQuery secure;
+
+        private Route validate() {
+            if (hostname == null) {
+                throw new IllegalArgumentException(
+                        format("Found an invalid Site-to-Site route definition 
[%s] 'hostname' is not specified.", name));
+            }
+            if (port == null) {
+                throw new IllegalArgumentException(
+                        format("Found an invalid Site-to-Site route definition 
[%s] 'port' is not specified.", name));
+            }
+            return this;
+        }
+
+        private PeerDescription getTarget(final Map<String, String> variables) 
{
+            final String targetHostName = 
hostname.evaluateExpressions(variables, null);
+            if (isBlank(targetHostName)) {
+                throw new IllegalStateException("Target hostname was not 
resolved for the route definition " + name);
+            }
+
+            final String targetPortStr = port.evaluateExpressions(variables, 
null);
+            if (isBlank(targetPortStr)) {
+                throw new IllegalStateException("Target port was not resolved 
for the route definition " + name);
+            }
+
+            final String targetIsSecure = secure == null ? null : 
secure.evaluateExpressions(variables, null);
+            return new PeerDescription(targetHostName, 
Integer.valueOf(targetPortStr), Boolean.valueOf(targetIsSecure));
+        }
+    }
+
+    private Map<SiteToSiteTransportProtocol, List<Route>> routes;
+
+
+    private static final String PROPERTY_PREFIX = "nifi.remote.route.";
+    private static final Pattern PROPERTY_REGEX = 
Pattern.compile("^nifi\\.remote\\.route\\.(raw|http)\\.([^.]+)\\.(when|hostname|port|secure)$");
+
+    public PeerDescriptionModifier(final NiFiProperties properties) {
+        final Map<Tuple<String, String>, List<Tuple<String, String>>> 
routeDefinitions = properties.getPropertyKeys().stream()
+                .filter(propertyKey -> propertyKey.startsWith(PROPERTY_PREFIX))
+                .map(propertyKey -> {
+                            final Matcher matcher = 
PROPERTY_REGEX.matcher(propertyKey);
+                            if (!matcher.matches()) {
+                                throw new IllegalArgumentException(
+                                        format("Found an invalid Site-to-Site 
route definition property '%s'." +
+                                                        " Routing property 
keys should be formatted as 
'nifi.remote.route.{protocol}.{name}.{routingConfigName}'." +
+                                                        " Where {protocol} is 
'raw' or 'http', and {routingConfigName} is 'when', 'hostname', 'port' or 
'secure'.",
+                                                propertyKey));
+                            }
+                            return matcher;
+                })
+                .collect(Collectors.groupingBy(matcher -> new 
Tuple<>(matcher.group(1), matcher.group(2)),
+                        Collectors.mapping(matcher -> new 
Tuple<>(matcher.group(3), matcher.group(0)), Collectors.toList())));
+
+        routes = routeDefinitions.entrySet().stream().map(routeDefinition -> {
+            final Route route = new Route();
+            // E.g. [raw, example1], [http, example2]
+            final Tuple<String, String> protocolAndRoutingName = 
routeDefinition.getKey();
+            route.protocol = 
SiteToSiteTransportProtocol.valueOf(protocolAndRoutingName.getKey().toUpperCase());
+            route.name = protocolAndRoutingName.getValue();
+            routeDefinition.getValue().forEach(routingConfigNameAndPropertyKey 
-> {
+                final String routingConfigName = 
routingConfigNameAndPropertyKey.getKey();
+                final String propertyKey = 
routingConfigNameAndPropertyKey.getValue();
+                final String routingConfigValue = 
properties.getProperty(propertyKey);
+                try {
+                    switch (routingConfigName) {
+                        case "when":
+                            route.predicate = 
Query.prepare(routingConfigValue);
+                            break;
+                        case "hostname":
+                            route.hostname = Query.prepare(routingConfigValue);
+                            break;
+                        case "port":
+                            route.port = Query.prepare(routingConfigValue);
+                            break;
+                        case "secure":
+                            route.secure = Query.prepare(routingConfigValue);
+                            break;
+                    }
+                } catch (AttributeExpressionLanguageParsingException e) {
+                    throw new IllegalArgumentException(format("Failed to parse 
NiFi expression language configured" +
+                            " for Site-to-Site routing property at '%s' due to 
'%s'", propertyKey, e.getMessage()), e);
+                }
+            });
+            return route;
+        }).map(Route::validate).collect(Collectors.groupingBy(r -> 
r.protocol));
+
+    }
+
+    private void addVariables(Map<String, String> map, String prefix, 
PeerDescription peer) {
+        map.put(format("%s.hostname", prefix), peer.getHostname());
+        map.put(format("%s.port", prefix), String.valueOf(peer.getPort()));
+        map.put(format("%s.secure", prefix), String.valueOf(peer.isSecure()));
+    }
+
+    public boolean isModificationNeeded(final SiteToSiteTransportProtocol 
protocol) {
+        return routes != null && routes.containsKey(protocol) && 
!routes.get(protocol).isEmpty();
+    }
+
+    /**
+     * Modifies target peer description so that subsequent request can go 
through the appropriate route
+     * @param source The source peer from which a request was sent, this can 
be any server host participated to relay the request,
+     *              but should be the one which can contribute to derive the 
correct target peer.
+     * @param target The original target which should receive and process 
further incoming requests.
+     * @param protocol The S2S protocol being used.
+     * @param requestType The requested API type.
+     * @param variables Containing context variables those can be referred 
from Expression Language.
+     * @return A peer description. The original target peer can be returned if 
there is no intermediate peer such as reverse proxies needed.
+     */
+    public PeerDescription modify(final PeerDescription source, final 
PeerDescription target,
+                                  final SiteToSiteTransportProtocol protocol, 
final RequestType requestType,
+                                  final Map<String, String> variables) {
+
+        addVariables(variables, "s2s.source", source);
+        addVariables(variables, "s2s.target", target);
+        variables.put("s2s.protocol", protocol.name());
+        variables.put("s2s.request", requestType.name());
+
+        logger.debug("Modifying PeerDescription, variables={}", variables);
+
+        return routes.get(protocol).stream().filter(r -> r.predicate == null
+                || Boolean.valueOf(r.predicate.evaluateExpressions(variables, 
null)))
+                .map(r -> {
+                    final PeerDescription t = r.getTarget(variables);
+                    logger.debug("Route definition {} matched, {}", r.name, t);
+                    return t;
+                })
+                // If a matched route was found, use it, else use the original 
target.
+                .findFirst().orElse(target);
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/1913b1e2/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/SocketRemoteSiteListener.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/SocketRemoteSiteListener.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/SocketRemoteSiteListener.java
index 2fae669..07c5920 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/SocketRemoteSiteListener.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/SocketRemoteSiteListener.java
@@ -64,6 +64,7 @@ public class SocketRemoteSiteListener implements 
RemoteSiteListener {
     private final NodeInformant nodeInformant;
     private final AtomicReference<ProcessGroup> rootGroup = new 
AtomicReference<>();
     private final NiFiProperties nifiProperties;
+    private final PeerDescriptionModifier peerDescriptionModifier;
 
     private final AtomicBoolean stopped = new AtomicBoolean(false);
 
@@ -78,6 +79,7 @@ public class SocketRemoteSiteListener implements 
RemoteSiteListener {
         this.sslContext = sslContext;
         this.nifiProperties = nifiProperties;
         this.nodeInformant = nodeInformant;
+        peerDescriptionModifier = new PeerDescriptionModifier(nifiProperties);
     }
 
     @Override
@@ -218,6 +220,9 @@ public class SocketRemoteSiteListener implements 
RemoteSiteListener {
                                 protocol = 
RemoteResourceFactory.receiveServerProtocolNegotiation(dis, dos);
                                 protocol.setRootProcessGroup(rootGroup.get());
                                 protocol.setNodeInformant(nodeInformant);
+                                if (protocol instanceof 
PeerDescriptionModifiable) {
+                                    
((PeerDescriptionModifiable)protocol).setPeerDescriptionModifier(peerDescriptionModifier);
+                                }
 
                                 final PeerDescription description = new 
PeerDescription(clientHostName, clientPort, sslContext != null);
                                 peer = new Peer(description, commsSession, 
peerUri, "nifi://localhost:" + getPort());

http://git-wip-us.apache.org/repos/asf/nifi/blob/1913b1e2/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketFlowFileServerProtocol.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketFlowFileServerProtocol.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketFlowFileServerProtocol.java
index a7c0212..cb0746e 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketFlowFileServerProtocol.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketFlowFileServerProtocol.java
@@ -26,6 +26,9 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import org.apache.nifi.remote.Peer;
+import org.apache.nifi.remote.PeerDescription;
+import org.apache.nifi.remote.PeerDescriptionModifiable;
+import org.apache.nifi.remote.PeerDescriptionModifier;
 import org.apache.nifi.remote.RemoteResourceFactory;
 import org.apache.nifi.remote.StandardVersionNegotiator;
 import org.apache.nifi.remote.VersionNegotiator;
@@ -39,14 +42,22 @@ import 
org.apache.nifi.remote.protocol.CommunicationsSession;
 import org.apache.nifi.remote.protocol.HandshakeProperties;
 import org.apache.nifi.remote.protocol.RequestType;
 import org.apache.nifi.remote.protocol.ResponseCode;
+import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol;
 
-public class SocketFlowFileServerProtocol extends 
AbstractFlowFileServerProtocol {
+public class SocketFlowFileServerProtocol extends 
AbstractFlowFileServerProtocol implements PeerDescriptionModifiable {
 
     public static final String RESOURCE_NAME = "SocketFlowFileProtocol";
 
     // Version 6 added to support Zero-Master Clustering, which was introduced 
in NiFi 1.0.0
     private final VersionNegotiator versionNegotiator = new 
StandardVersionNegotiator(6, 5, 4, 3, 2, 1);
 
+    private PeerDescriptionModifier peerDescriptionModifier;
+
+    @Override
+    public void setPeerDescriptionModifier(PeerDescriptionModifier modifier) {
+        peerDescriptionModifier = modifier;
+    }
+
     @Override
     protected HandshakeProperties doHandshake(Peer peer) throws IOException, 
HandshakeException {
 
@@ -189,9 +200,21 @@ public class SocketFlowFileServerProtocol extends 
AbstractFlowFileServerProtocol
                 continue;
             }
 
-            dos.writeUTF(nodeInfo.getSiteToSiteHostname());
-            dos.writeInt(nodeInfo.getSiteToSitePort());
-            dos.writeBoolean(nodeInfo.isSiteToSiteSecure());
+            if (peerDescriptionModifier != null && 
peerDescriptionModifier.isModificationNeeded(SiteToSiteTransportProtocol.RAW)) {
+                final PeerDescription target = new 
PeerDescription(nodeInfo.getSiteToSiteHostname(), nodeInfo.getSiteToSitePort(), 
nodeInfo.isSiteToSiteSecure());
+                final PeerDescription modifiedTarget = 
peerDescriptionModifier.modify(peer.getDescription(), target,
+                        SiteToSiteTransportProtocol.RAW, 
PeerDescriptionModifier.RequestType.Peers, new HashMap<>());
+
+                dos.writeUTF(modifiedTarget.getHostname());
+                dos.writeInt(modifiedTarget.getPort());
+                dos.writeBoolean(modifiedTarget.isSecure());
+
+            } else {
+                dos.writeUTF(nodeInfo.getSiteToSiteHostname());
+                dos.writeInt(nodeInfo.getSiteToSitePort());
+                dos.writeBoolean(nodeInfo.isSiteToSiteSecure());
+            }
+
             dos.writeInt(nodeInfo.getTotalFlowFiles());
         }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/1913b1e2/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/TestPeerDescriptionModifier.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/TestPeerDescriptionModifier.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/TestPeerDescriptionModifier.java
new file mode 100644
index 0000000..cf6639f
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/TestPeerDescriptionModifier.java
@@ -0,0 +1,321 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote;
+
+import 
org.apache.nifi.attribute.expression.language.exception.AttributeExpressionLanguageException;
+import org.apache.nifi.properties.StandardNiFiProperties;
+import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol;
+import org.apache.nifi.util.NiFiProperties;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class TestPeerDescriptionModifier {
+
+    @Test
+    public void testNoConfiguration() {
+        Properties props = new Properties();
+        final NiFiProperties properties = new StandardNiFiProperties(props);
+        final PeerDescriptionModifier modifier = new 
PeerDescriptionModifier(properties);
+        
assertFalse(modifier.isModificationNeeded(SiteToSiteTransportProtocol.RAW));
+        
assertFalse(modifier.isModificationNeeded(SiteToSiteTransportProtocol.HTTP));
+    }
+
+    @Test
+    public void testInvalidNoHostname() {
+        Properties props = new Properties();
+        props.put("nifi.remote.route.raw.no-host.when", "true");
+        final NiFiProperties properties = new StandardNiFiProperties(props);
+        try {
+            new PeerDescriptionModifier(properties);
+            fail("Should throw an Exception");
+        } catch (IllegalArgumentException e) {
+            assertEquals("Found an invalid Site-to-Site route definition 
[no-host] 'hostname' is not specified.", e.getMessage());
+        }
+    }
+
+    @Test
+    public void testInvalidNoPort() {
+        Properties props = new Properties();
+        props.put("nifi.remote.route.raw.no-port.when", "true");
+        props.put("nifi.remote.route.raw.no-port.hostname", 
"proxy.example.com");
+        final NiFiProperties properties = new StandardNiFiProperties(props);
+        try {
+            new PeerDescriptionModifier(properties);
+            fail("Should throw an Exception");
+        } catch (IllegalArgumentException e) {
+            assertEquals("Found an invalid Site-to-Site route definition 
[no-port] 'port' is not specified.", e.getMessage());
+        }
+    }
+
+    @Test
+    public void testInvalidConfigurationName() {
+        Properties props = new Properties();
+        props.put("nifi.remote.route.raw.invalid-name.when", "true");
+        props.put("nifi.remote.route.raw.invalid-name.hostname", 
"proxy.example.com");
+        props.put("nifi.remote.route.raw.invalid-name.port", "8081");
+        props.put("nifi.remote.route.raw.invalid-name.secure", "true");
+        props.put("nifi.remote.route.raw.invalid-name.unsupported", "true");
+        final NiFiProperties properties = new StandardNiFiProperties(props);
+        try {
+            new PeerDescriptionModifier(properties);
+            fail("Should throw an Exception");
+        } catch (IllegalArgumentException e) {
+            assertEquals("Found an invalid Site-to-Site route definition 
property 'nifi.remote.route.raw.invalid-name.unsupported'." +
+                    " Routing property keys should be formatted as 
'nifi.remote.route.{protocol}.{name}.{routingConfigName}'." +
+                    " Where {protocol} is 'raw' or 'http', and 
{routingConfigName} is 'when', 'hostname', 'port' or 'secure'.", 
e.getMessage());
+        }
+    }
+
+    @Test
+    public void testInvalidPropertyKeyNoProtocol() {
+        Properties props = new Properties();
+        props.put("nifi.remote.route.", "true");
+        final NiFiProperties properties = new StandardNiFiProperties(props);
+        try {
+            new PeerDescriptionModifier(properties);
+            fail("Should throw an Exception");
+        } catch (IllegalArgumentException e) {
+            assertEquals("Found an invalid Site-to-Site route definition 
property 'nifi.remote.route.'." +
+                    " Routing property keys should be formatted as 
'nifi.remote.route.{protocol}.{name}.{routingConfigName}'." +
+                    " Where {protocol} is 'raw' or 'http', and 
{routingConfigName} is 'when', 'hostname', 'port' or 'secure'.", 
e.getMessage());
+        }
+    }
+
+    @Test
+    public void testInvalidPropertyKeyNoName() {
+        Properties props = new Properties();
+        props.put("nifi.remote.route.http.", "true");
+        final NiFiProperties properties = new StandardNiFiProperties(props);
+        try {
+            new PeerDescriptionModifier(properties);
+            fail("Should throw an Exception");
+        } catch (IllegalArgumentException e) {
+            assertEquals("Found an invalid Site-to-Site route definition 
property 'nifi.remote.route.http.'." +
+                    " Routing property keys should be formatted as 
'nifi.remote.route.{protocol}.{name}.{routingConfigName}'." +
+                    " Where {protocol} is 'raw' or 'http', and 
{routingConfigName} is 'when', 'hostname', 'port' or 'secure'.", 
e.getMessage());
+        }
+    }
+
+    @Test
+    public void testInvalidExpression() {
+        Properties props = new Properties();
+        props.put("nifi.remote.route.raw.invalid-el.when", 
"${nonExistingFunction()}");
+        props.put("nifi.remote.route.raw.invalid-el.hostname", 
"proxy.example.com");
+        props.put("nifi.remote.route.raw.invalid-el.port", "8081");
+        final NiFiProperties properties = new StandardNiFiProperties(props);
+        final PeerDescriptionModifier modifier = new 
PeerDescriptionModifier(properties);
+
+        final PeerDescription source = new PeerDescription("client", 12345, 
true);
+        final PeerDescription target = new PeerDescription("nifi0", 8081, 
true);
+
+        try {
+            modifier.modify(source, target,
+                    SiteToSiteTransportProtocol.RAW, 
PeerDescriptionModifier.RequestType.Peers, new HashMap<>());
+            fail("Should throw an Exception");
+        } catch (AttributeExpressionLanguageException e) {
+            assertTrue(e.getMessage().startsWith("Invalid Expression"));
+        }
+    }
+
+    @Test
+    public void testDefaultIsNotSecure() {
+        Properties props = new Properties();
+        props.put("nifi.remote.route.raw.no-port.when", "true");
+        props.put("nifi.remote.route.raw.no-port.hostname", 
"proxy.example.com");
+        props.put("nifi.remote.route.raw.no-port.port", "8443");
+        final NiFiProperties properties = new StandardNiFiProperties(props);
+        final PeerDescriptionModifier modifier = new 
PeerDescriptionModifier(properties);
+
+        final PeerDescription source = new PeerDescription("client", 12345, 
true);
+        final PeerDescription target = new PeerDescription("nifi0", 8081, 
true);
+        final PeerDescription modifiedTarget = modifier.modify(source, target,
+                SiteToSiteTransportProtocol.RAW, 
PeerDescriptionModifier.RequestType.Peers, new HashMap<>());
+        assertFalse(modifiedTarget.isSecure());
+    }
+
+    @Test
+    public void testRawPortToNode() {
+        Properties props = new Properties();
+
+        // RAW S2S route configs.
+        // Port number to Node
+        // proxy1.example.com:17491 -> nifi0:8081
+        // proxy1.example.com:17492 -> nifi1:8081
+        props.put("nifi.remote.route.raw.port-to-node.when", 
"${X-ProxyHost:equals('proxy1.example.com')" +
+                ":or(${s2s.source.hostname:equals('proxy1.example.com')})}");
+        props.put("nifi.remote.route.raw.port-to-node.hostname", 
"proxy1.example.com");
+        props.put("nifi.remote.route.raw.port-to-node.port",
+                "${s2s.target.hostname:equals('nifi0'):ifElse('17491'," +
+                        "${s2s.target.hostname:equals('nifi1'):ifElse('17492', 
'undefined')})}");
+        props.put("nifi.remote.route.raw.port-to-node.secure", "true");
+
+        // Other S2S configs.
+        props.put("nifi.remote.input.host", "node0");
+        props.put("nifi.remote.input.secure", "true");
+        props.put("nifi.remote.input.socket.port", "8081");
+        props.put("nifi.remote.input.http.enabled", "true");
+
+        final NiFiProperties properties = new StandardNiFiProperties(props);
+        final PeerDescriptionModifier modifier = new 
PeerDescriptionModifier(properties);
+
+        // For requests coming from the proxy server, modify target 
description,
+        // so that client can send further request to the proxy.
+        // To nifi0.
+        PeerDescription source = new PeerDescription("proxy1.example.com", 
12345, true);
+        PeerDescription target = new PeerDescription("nifi0", 8081, true);
+        PeerDescription modifiedTarget = modifier.modify(source, target, 
SiteToSiteTransportProtocol.RAW, 
PeerDescriptionModifier.RequestType.SiteToSiteDetail, new HashMap<>());
+
+        assertNotNull(modifiedTarget);
+        assertEquals("proxy1.example.com", modifiedTarget.getHostname());
+        assertEquals(17491, modifiedTarget.getPort());
+        assertEquals(true, modifiedTarget.isSecure());
+
+        // To nifi1.
+        target = new PeerDescription("nifi1", 8081, true);
+        modifiedTarget = modifier.modify(source, target, 
SiteToSiteTransportProtocol.RAW, 
PeerDescriptionModifier.RequestType.SiteToSiteDetail, new HashMap<>());
+
+        assertNotNull(modifiedTarget);
+        assertEquals("proxy1.example.com", modifiedTarget.getHostname());
+        assertEquals(17492, modifiedTarget.getPort());
+        assertEquals(true, modifiedTarget.isSecure());
+
+        // For requests coming directly, use the original target description.
+        source = new PeerDescription("192.168.1.101", 23456, true);
+        modifiedTarget = modifier.modify(source, target, 
SiteToSiteTransportProtocol.RAW, 
PeerDescriptionModifier.RequestType.SiteToSiteDetail, new HashMap<>());
+        assertNotNull(modifiedTarget);
+        assertEquals(target, modifiedTarget);
+
+    }
+
+    @Test
+    public void testRawServerNameToNode() {
+        Properties props = new Properties();
+
+        // RAW S2S route configs.
+        // Server name to Node
+        // nifi0.example.com:17491 -> nifi0:8081
+        // nifi1.example.com:17491 -> nifi1:8081
+        props.put("nifi.remote.route.raw.name-to-node.when", 
"${X-ProxyHost:contains('.example.com')" +
+                ":or(${s2s.source.hostname:contains('.example.com')})}");
+        props.put("nifi.remote.route.raw.name-to-node.hostname", 
"${s2s.target.hostname}.example.com");
+        props.put("nifi.remote.route.raw.name-to-node.port", "17491");
+        props.put("nifi.remote.route.raw.name-to-node.secure", "true");
+
+        // Other S2S configs.
+        props.put("nifi.remote.input.host", "node0");
+        props.put("nifi.remote.input.secure", "true");
+        props.put("nifi.remote.input.socket.port", "8081");
+        props.put("nifi.remote.input.http.enabled", "true");
+
+        final NiFiProperties properties = new StandardNiFiProperties(props);
+        final PeerDescriptionModifier modifier = new 
PeerDescriptionModifier(properties);
+
+        // For requests coming from the proxy server, modify target 
description,
+        // so that client can send further request to the proxy.
+        // To nifi0.
+        PeerDescription source = new PeerDescription("nifi0.example.com", 
12345, true);
+        PeerDescription target = new PeerDescription("nifi0", 8081, true);
+        PeerDescription modifiedTarget = modifier.modify(source, target, 
SiteToSiteTransportProtocol.RAW, 
PeerDescriptionModifier.RequestType.SiteToSiteDetail, new HashMap<>());
+
+        assertNotNull(modifiedTarget);
+        assertEquals("nifi0.example.com", modifiedTarget.getHostname());
+        assertEquals(17491, modifiedTarget.getPort());
+        assertEquals(true, modifiedTarget.isSecure());
+
+        // To nifi1.
+        target = new PeerDescription("nifi1", 8081, true);
+        modifiedTarget = modifier.modify(source, target, 
SiteToSiteTransportProtocol.RAW, 
PeerDescriptionModifier.RequestType.SiteToSiteDetail, new HashMap<>());
+
+        assertNotNull(modifiedTarget);
+        assertEquals("nifi1.example.com", modifiedTarget.getHostname());
+        assertEquals(17491, modifiedTarget.getPort());
+        assertEquals(true, modifiedTarget.isSecure());
+
+        // For requests coming directly, use the original target description.
+        source = new PeerDescription("192.168.1.101", 23456, true);
+        modifiedTarget = modifier.modify(source, target, 
SiteToSiteTransportProtocol.RAW, 
PeerDescriptionModifier.RequestType.SiteToSiteDetail, new HashMap<>());
+        assertNotNull(modifiedTarget);
+        assertEquals(target, modifiedTarget);
+
+    }
+
+    @Test
+    public void testHttpsTerminate() {
+        Properties props = new Properties();
+
+        // https://nifi0.example.com -> http://nifi0:8080
+        // https://nifi1.example.com -> http://nifi1:8080
+        // S2S HTTP configs.
+        props.put("nifi.remote.route.http.terminate.when", 
"${X-ProxyHost:contains('.example.com')" +
+                ":or(${s2s.source.hostname:contains('.example.com')})}");
+        props.put("nifi.remote.route.http.terminate.hostname", 
"${s2s.target.hostname}.example.com");
+        props.put("nifi.remote.route.http.terminate.port", "443");
+        props.put("nifi.remote.route.http.terminate.secure", "true");
+
+        // Other S2S configs.
+        props.put("nifi.web.http.host", "nifi0");
+        props.put("nifi.web.http.port", "8080");
+        props.put("nifi.remote.input.host", "nifi0");
+        props.put("nifi.remote.input.secure", "false");
+        props.put("nifi.remote.input.socket.port", "");
+        props.put("nifi.remote.input.http.enabled", "true");
+
+
+        final NiFiProperties properties = new StandardNiFiProperties(props);
+        final PeerDescriptionModifier modifier = new 
PeerDescriptionModifier(properties);
+
+        // For requests coming from the proxy server, modify target 
description,
+        // so that client can send further request to the proxy.
+        // To nifi0.
+        PeerDescription source = new PeerDescription("nifi0.example.com", 
12345, true);
+        PeerDescription target = new PeerDescription("nifi0", 8080, false);
+        final Map<String, String> proxyHeders = new HashMap<>();
+        proxyHeders.put("X-ProxyHost", "nifi0.example.com:443");
+        proxyHeders.put("X-Forwarded-For", "172.16.1.103");
+        PeerDescription modifiedTarget = modifier.modify(source, target, 
SiteToSiteTransportProtocol.HTTP, 
PeerDescriptionModifier.RequestType.SiteToSiteDetail, new 
HashMap<>(proxyHeders));
+
+        assertNotNull(modifiedTarget);
+        assertEquals("nifi0.example.com", modifiedTarget.getHostname());
+        assertEquals(443, modifiedTarget.getPort());
+        assertEquals(true, modifiedTarget.isSecure());
+
+        // To nifi1.
+        proxyHeders.put("X-ProxyHost", "nifi1.example.com:443");
+        target = new PeerDescription("nifi1", 8081, true);
+        modifiedTarget = modifier.modify(source, target, 
SiteToSiteTransportProtocol.HTTP, 
PeerDescriptionModifier.RequestType.SiteToSiteDetail, new 
HashMap<>(proxyHeders));
+
+        assertNotNull(modifiedTarget);
+        assertEquals("nifi1.example.com", modifiedTarget.getHostname());
+        assertEquals(443, modifiedTarget.getPort());
+        assertEquals(true, modifiedTarget.isSecure());
+
+        // For requests coming directly, use the original target description.
+        source = new PeerDescription("192.168.1.101", 23456, true);
+        modifiedTarget = modifier.modify(source, target, 
SiteToSiteTransportProtocol.HTTP, 
PeerDescriptionModifier.RequestType.SiteToSiteDetail, new HashMap<>());
+        assertNotNull(modifiedTarget);
+        assertEquals(target, modifiedTarget);
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/1913b1e2/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java
index db0a568..85423c0 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java
@@ -137,6 +137,11 @@ public abstract class ApplicationResource {
      * @return resource uri
      */
     protected String generateResourceUri(final String... path) {
+        URI uri = buildResourceUri(path);
+        return uri.toString();
+    }
+
+    private URI buildResourceUri(final String... path) {
         final UriBuilder uriBuilder = uriInfo.getBaseUriBuilder();
         uriBuilder.segment(path);
         URI uri = uriBuilder.build();
@@ -179,7 +184,7 @@ public abstract class ApplicationResource {
         } catch (final URISyntaxException use) {
             throw new UriBuilderException(use);
         }
-        return uri.toString();
+        return uri;
     }
 
     /**
@@ -1226,9 +1231,9 @@ public abstract class ApplicationResource {
         public Response locationResponse(UriInfo uriInfo, String portType, 
String portId, String transactionId, Object entity,
                                          Integer protocolVersion, final 
HttpRemoteSiteListener transactionManager) {
 
-            String path = "/data-transfer/" + portType + "/" + portId + 
"/transactions/" + transactionId;
-            URI location = uriInfo.getBaseUriBuilder().path(path).build();
-            return noCache(setCommonHeaders(Response.created(location), 
protocolVersion, transactionManager)
+            final URI transactionUri = buildResourceUri("data-transfer", 
portType, portId, "transactions", transactionId);
+
+            return noCache(setCommonHeaders(Response.created(transactionUri), 
protocolVersion, transactionManager)
                     .header(LOCATION_URI_INTENT_NAME, 
LOCATION_URI_INTENT_VALUE))
                     .entity(entity).build();
         }

http://git-wip-us.apache.org/repos/asf/nifi/blob/1913b1e2/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/DataTransferResource.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/DataTransferResource.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/DataTransferResource.java
index bf55c05..1afedad 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/DataTransferResource.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/DataTransferResource.java
@@ -48,7 +48,6 @@ import org.apache.nifi.remote.protocol.HandshakeProperty;
 import org.apache.nifi.remote.protocol.ResponseCode;
 import org.apache.nifi.remote.protocol.http.HttpFlowFileServerProtocol;
 import org.apache.nifi.remote.protocol.http.StandardHttpFlowFileServerProtocol;
-import org.apache.nifi.stream.io.ByteArrayOutputStream;
 import org.apache.nifi.util.NiFiProperties;
 import org.apache.nifi.web.NiFiServiceFacade;
 import org.apache.nifi.web.api.entity.TransactionResultEntity;
@@ -74,6 +73,7 @@ import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
 import javax.ws.rs.core.StreamingOutput;
 import javax.ws.rs.core.UriInfo;
+import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;

http://git-wip-us.apache.org/repos/asf/nifi/blob/1913b1e2/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/SiteToSiteResource.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/SiteToSiteResource.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/SiteToSiteResource.java
index 36cfda1..6737936 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/SiteToSiteResource.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/SiteToSiteResource.java
@@ -30,9 +30,12 @@ import 
org.apache.nifi.cluster.coordination.ClusterCoordinator;
 import org.apache.nifi.cluster.coordination.node.NodeWorkload;
 import org.apache.nifi.cluster.protocol.NodeIdentifier;
 import org.apache.nifi.remote.HttpRemoteSiteListener;
+import org.apache.nifi.remote.PeerDescription;
+import org.apache.nifi.remote.PeerDescriptionModifier;
 import org.apache.nifi.remote.VersionNegotiator;
 import org.apache.nifi.remote.client.http.TransportProtocolVersionNegotiator;
 import org.apache.nifi.remote.exception.BadRequestException;
+import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol;
 import org.apache.nifi.remote.protocol.http.HttpHeaders;
 import org.apache.nifi.util.NiFiProperties;
 import org.apache.nifi.web.NiFiServiceFacade;
@@ -56,6 +59,8 @@ import java.io.IOException;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.util.ArrayList;
+import java.util.Enumeration;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -80,9 +85,11 @@ public class SiteToSiteResource extends ApplicationResource {
     private final ResponseCreator responseCreator = new ResponseCreator();
     private final VersionNegotiator transportProtocolVersionNegotiator = new 
TransportProtocolVersionNegotiator(1);
     private final HttpRemoteSiteListener transactionManager;
+    private final PeerDescriptionModifier peerDescriptionModifier;
 
     public SiteToSiteResource(final NiFiProperties nifiProperties) {
         transactionManager = 
HttpRemoteSiteListener.getInstance(nifiProperties);
+        peerDescriptionModifier = new PeerDescriptionModifier(nifiProperties);
     }
 
     /**
@@ -131,6 +138,34 @@ public class SiteToSiteResource extends 
ApplicationResource {
         // get the controller dto
         final ControllerDTO controller = serviceFacade.getSiteToSiteDetails();
 
+        // Alter s2s port.
+        final boolean modificationNeededRaw = 
peerDescriptionModifier.isModificationNeeded(SiteToSiteTransportProtocol.RAW);
+        final boolean modificationNeededHttp = 
peerDescriptionModifier.isModificationNeeded(SiteToSiteTransportProtocol.HTTP);
+        if (modificationNeededRaw || modificationNeededHttp) {
+            final PeerDescription source = getSourcePeerDescription(req);
+            final Boolean isSiteToSiteSecure = controller.isSiteToSiteSecure();
+            final String siteToSiteHostname = getSiteToSiteHostname(req);
+            final Map<String, String> httpHeaders = getHttpHeaders(req);
+
+            if (modificationNeededRaw) {
+                final PeerDescription rawTarget = new 
PeerDescription(siteToSiteHostname, controller.getRemoteSiteListeningPort(), 
isSiteToSiteSecure);
+                final PeerDescription modifiedRawTarget = 
peerDescriptionModifier.modify(source, rawTarget,
+                        SiteToSiteTransportProtocol.RAW, 
PeerDescriptionModifier.RequestType.SiteToSiteDetail, new 
HashMap<>(httpHeaders));
+                
controller.setRemoteSiteListeningPort(modifiedRawTarget.getPort());
+            }
+
+            if (modificationNeededHttp) {
+                final PeerDescription httpTarget = new 
PeerDescription(siteToSiteHostname, 
controller.getRemoteSiteHttpListeningPort(), isSiteToSiteSecure);
+                final PeerDescription modifiedHttpTarget = 
peerDescriptionModifier.modify(source, httpTarget,
+                        SiteToSiteTransportProtocol.HTTP, 
PeerDescriptionModifier.RequestType.SiteToSiteDetail, new 
HashMap<>(httpHeaders));
+                
controller.setRemoteSiteHttpListeningPort(modifiedHttpTarget.getPort());
+                if (!controller.isSiteToSiteSecure() && 
modifiedHttpTarget.isSecure()) {
+                    // In order to enable TLS terminate at the reverse proxy 
server, even if NiFi itself is not secured, introduce the endpoint as secure.
+                    controller.setSiteToSiteSecure(true);
+                }
+            }
+        }
+
         // build the response entity
         final ControllerEntity entity = new ControllerEntity();
         entity.setController(controller);
@@ -147,6 +182,20 @@ public class SiteToSiteResource extends 
ApplicationResource {
         return noCache(Response.ok(entity)).build();
     }
 
+    private PeerDescription getSourcePeerDescription(@Context 
HttpServletRequest req) {
+        return new PeerDescription(req.getRemoteHost(), req.getRemotePort(), 
req.isSecure());
+    }
+
+    private Map<String, String> getHttpHeaders(@Context HttpServletRequest 
req) {
+        final Map<String, String> headers = new HashMap<>();
+        final Enumeration<String> headerNames = req.getHeaderNames();
+        while (headerNames.hasMoreElements()) {
+            final String name = headerNames.nextElement();
+            headers.put(name, req.getHeader(name));
+        }
+        return headers;
+    }
+
     /**
      * Returns the available Peers and its status of this NiFi.
      *
@@ -187,18 +236,29 @@ public class SiteToSiteResource extends 
ApplicationResource {
         }
 
         final List<PeerDTO> peers = new ArrayList<>();
+        final PeerDescription source = getSourcePeerDescription(req);
+        final boolean modificationNeeded = 
peerDescriptionModifier.isModificationNeeded(SiteToSiteTransportProtocol.HTTP);
+        final Map<String, String> headers = modificationNeeded ? 
getHttpHeaders(req) : null;
         if (properties.isNode()) {
 
             try {
                 final Map<NodeIdentifier, NodeWorkload> clusterWorkload = 
clusterCoordinator.getClusterWorkload();
-                clusterWorkload.entrySet().stream().forEach(entry -> {
+                clusterWorkload.forEach((nodeId, workload) -> {
+                    final String siteToSiteHostname = 
nodeId.getSiteToSiteAddress() == null ? nodeId.getApiAddress() : 
nodeId.getSiteToSiteAddress();
+                    final int siteToSitePort = 
nodeId.getSiteToSiteHttpApiPort() == null ? nodeId.getApiPort() : 
nodeId.getSiteToSiteHttpApiPort();
+
+                    PeerDescription target = new 
PeerDescription(siteToSiteHostname, siteToSitePort, 
nodeId.isSiteToSiteSecure());
+
+                    if (modificationNeeded) {
+                        target = peerDescriptionModifier.modify(source, target,
+                                SiteToSiteTransportProtocol.HTTP, 
PeerDescriptionModifier.RequestType.Peers, new HashMap<>(headers));
+                    }
+
                     final PeerDTO peer = new PeerDTO();
-                    final NodeIdentifier nodeId = entry.getKey();
-                    final String siteToSiteAddress = 
nodeId.getSiteToSiteAddress();
-                    peer.setHostname(siteToSiteAddress == null ? 
nodeId.getApiAddress() : siteToSiteAddress);
-                    peer.setPort(nodeId.getSiteToSiteHttpApiPort() == null ? 
nodeId.getApiPort() : nodeId.getSiteToSiteHttpApiPort());
-                    peer.setSecure(nodeId.isSiteToSiteSecure());
-                    peer.setFlowFileCount(entry.getValue().getFlowFileCount());
+                    peer.setHostname(target.getHostname());
+                    peer.setPort(target.getPort());
+                    peer.setSecure(target.isSecure());
+                    peer.setFlowFileCount(workload.getFlowFileCount());
                     peers.add(peer);
                 });
             } catch (IOException e) {
@@ -208,24 +268,20 @@ public class SiteToSiteResource extends 
ApplicationResource {
         } else {
             // Standalone mode.
             final PeerDTO peer = new PeerDTO();
+            final String siteToSiteHostname = getSiteToSiteHostname(req);
 
-            // Private IP address or hostname may not be accessible from 
client in some environments.
-            // So, use the value defined in nifi.properties instead when it is 
defined.
-            final String remoteInputHost = properties.getRemoteInputHost();
-            String localName;
-            try {
-                // Get local host name using InetAddress if available, same as 
RAW socket does.
-                localName = InetAddress.getLocalHost().getHostName();
-            } catch (UnknownHostException e) {
-                if (logger.isDebugEnabled()) {
-                    logger.debug("Failed to get local host name using 
InetAddress.", e);
-                }
-                localName = req.getLocalName();
+
+            PeerDescription target = new PeerDescription(siteToSiteHostname,
+                    properties.getRemoteInputHttpPort(), 
properties.isSiteToSiteSecure());
+
+            if (modificationNeeded) {
+                target = peerDescriptionModifier.modify(source, target,
+                        SiteToSiteTransportProtocol.HTTP, 
PeerDescriptionModifier.RequestType.Peers, new HashMap<>(headers));
             }
 
-            peer.setHostname(isEmpty(remoteInputHost) ? localName : 
remoteInputHost);
-            peer.setPort(properties.getRemoteInputHttpPort());
-            peer.setSecure(properties.isSiteToSiteSecure());
+            peer.setHostname(target.getHostname());
+            peer.setPort(target.getPort());
+            peer.setSecure(target.isSecure());
             peer.setFlowFileCount(0);  // doesn't matter how many FlowFiles we 
have, because we're the only host.
 
             peers.add(peer);
@@ -237,6 +293,24 @@ public class SiteToSiteResource extends 
ApplicationResource {
         return noCache(setCommonHeaders(Response.ok(entity), 
transportProtocolVersion, transactionManager)).build();
     }
 
+    private String getSiteToSiteHostname(final HttpServletRequest req) {
+        // Private IP address or hostname may not be accessible from client in 
some environments.
+        // So, use the value defined in nifi.properties instead when it is 
defined.
+        final String remoteInputHost = properties.getRemoteInputHost();
+        String localName;
+        try {
+            // Get local host name using InetAddress if available, same as RAW 
socket does.
+            localName = InetAddress.getLocalHost().getHostName();
+        } catch (UnknownHostException e) {
+            if (logger.isDebugEnabled()) {
+                logger.debug("Failed to get local host name using 
InetAddress.", e);
+            }
+            localName = req.getLocalName();
+        }
+
+        return isEmpty(remoteInputHost) ? localName : remoteInputHost;
+    }
+
     // setters
 
     public void setServiceFacade(final NiFiServiceFacade serviceFacade) {

http://git-wip-us.apache.org/repos/asf/nifi/blob/1913b1e2/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/api/TestDataTransferResource.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/api/TestDataTransferResource.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/api/TestDataTransferResource.java
index 3dac9ce..d16a8a6 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/api/TestDataTransferResource.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/api/TestDataTransferResource.java
@@ -41,10 +41,14 @@ import javax.ws.rs.core.StreamingOutput;
 import javax.ws.rs.core.UriBuilder;
 import javax.ws.rs.core.UriInfo;
 import java.io.InputStream;
+import java.lang.reflect.Field;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.net.URL;
 
+import static 
org.apache.nifi.web.api.ApplicationResource.PROXY_HOST_HTTP_HEADER;
+import static 
org.apache.nifi.web.api.ApplicationResource.PROXY_PORT_HTTP_HEADER;
+import static 
org.apache.nifi.web.api.ApplicationResource.PROXY_SCHEME_HTTP_HEADER;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
@@ -53,6 +57,7 @@ import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 public class TestDataTransferResource {
 
@@ -156,6 +161,17 @@ public class TestDataTransferResource {
 
         final ServletContext context = null;
         final UriInfo uriInfo = mockUriInfo(locationUriStr);
+        final Field uriInfoField = 
resource.getClass().getSuperclass().getSuperclass()
+                .getDeclaredField("uriInfo");
+        uriInfoField.setAccessible(true);
+        uriInfoField.set(resource, uriInfo);
+
+        final HttpServletRequest request = mock(HttpServletRequest.class);
+        final Field httpServletRequestField = 
resource.getClass().getSuperclass().getSuperclass()
+                .getDeclaredField("httpServletRequest");
+        httpServletRequestField.setAccessible(true);
+        httpServletRequestField.set(resource, request);
+
         final InputStream inputStream = null;
 
        final Response response = resource.createPortTransaction("input-ports", 
"port-id", req, context, uriInfo, inputStream);
@@ -168,6 +184,41 @@ public class TestDataTransferResource {
     }
 
     @Test
+    public void testCreateTransactionThroughReverseProxy() throws Exception {
+        final HttpServletRequest req = createCommonHttpServletRequest();
+
+        final DataTransferResource resource = getDataTransferResource();
+
+        final String locationUriStr = 
"https://nifi2.example.com:443/nifi-api/data-transfer/input-ports/port-id/transactions/transaction-id";;
+
+        final ServletContext context = null;
+        final UriInfo uriInfo = mockUriInfo(locationUriStr);
+        final Field uriInfoField = 
resource.getClass().getSuperclass().getSuperclass()
+                .getDeclaredField("uriInfo");
+        uriInfoField.setAccessible(true);
+        uriInfoField.set(resource, uriInfo);
+
+        final HttpServletRequest request = mock(HttpServletRequest.class);
+        when(request.getHeader(PROXY_SCHEME_HTTP_HEADER)).thenReturn("https");
+        
when(request.getHeader(PROXY_HOST_HTTP_HEADER)).thenReturn("nifi2.example.com");
+        when(request.getHeader(PROXY_PORT_HTTP_HEADER)).thenReturn("443");
+        final Field httpServletRequestField = 
resource.getClass().getSuperclass().getSuperclass()
+                .getDeclaredField("httpServletRequest");
+        httpServletRequestField.setAccessible(true);
+        httpServletRequestField.set(resource, request);
+
+        final InputStream inputStream = null;
+
+        final Response response = 
resource.createPortTransaction("input-ports", "port-id", req, context, uriInfo, 
inputStream);
+
+        TransactionResultEntity resultEntity = (TransactionResultEntity) 
response.getEntity();
+
+        assertEquals(201, response.getStatus());
+        assertEquals(ResponseCode.PROPERTIES_OK.getCode(), 
resultEntity.getResponseCode());
+        assertEquals(locationUriStr, 
response.getMetadata().getFirst(HttpHeaders.LOCATION_HEADER_NAME).toString());
+    }
+
+    @Test
     public void testExtendTransaction() throws Exception {
         final HttpServletRequest req = createCommonHttpServletRequest();
 

Reply via email to