Repository: nifi Updated Branches: refs/heads/master 7c0ee014d -> 1913b1e2a
http://git-wip-us.apache.org/repos/asf/nifi/blob/1913b1e2/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/PeerDescriptionModifiable.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/PeerDescriptionModifiable.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/PeerDescriptionModifiable.java new file mode 100644 index 0000000..ef9cac0 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/PeerDescriptionModifiable.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.remote; + +/** + * This interface is used to determine whether a ServerProtocol implementation + * can utilize peer description modification for making S2S work behind a reverse proxy. + */ +public interface PeerDescriptionModifiable { + void setPeerDescriptionModifier(final PeerDescriptionModifier modifier); +} http://git-wip-us.apache.org/repos/asf/nifi/blob/1913b1e2/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/PeerDescriptionModifier.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/PeerDescriptionModifier.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/PeerDescriptionModifier.java new file mode 100644 index 0000000..9e40c49 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/PeerDescriptionModifier.java @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.remote; + +import org.apache.nifi.attribute.expression.language.PreparedQuery; +import org.apache.nifi.attribute.expression.language.Query; +import org.apache.nifi.attribute.expression.language.exception.AttributeExpressionLanguageParsingException; +import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol; +import org.apache.nifi.util.NiFiProperties; +import org.apache.nifi.util.Tuple; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; +import java.util.Map; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + +import static java.lang.String.format; +import static org.apache.commons.lang3.StringUtils.isBlank; + +public class PeerDescriptionModifier { + + private static final Logger logger = LoggerFactory.getLogger(PeerDescriptionModifier.class); + + public enum RequestType { + SiteToSiteDetail, + Peers + } + + private static class Route { + private String name; + private SiteToSiteTransportProtocol protocol; + private PreparedQuery predicate; + private PreparedQuery hostname; + private PreparedQuery port; + private PreparedQuery secure; + + private Route validate() { + if (hostname == null) { + throw new IllegalArgumentException( + format("Found an invalid Site-to-Site route definition [%s] 'hostname' is not specified.", name)); + } + if (port == null) { + throw new IllegalArgumentException( + format("Found an invalid Site-to-Site route definition [%s] 'port' is not specified.", name)); + } + return this; + } + + private PeerDescription getTarget(final Map<String, String> variables) { + final String targetHostName = hostname.evaluateExpressions(variables, null); + if (isBlank(targetHostName)) { + throw new IllegalStateException("Target hostname was not resolved for the route definition " + name); + } + + final String targetPortStr = port.evaluateExpressions(variables, null); + if (isBlank(targetPortStr)) { + throw new IllegalStateException("Target port was not resolved for the route definition " + name); + } + + final String targetIsSecure = secure == null ? null : secure.evaluateExpressions(variables, null); + return new PeerDescription(targetHostName, Integer.valueOf(targetPortStr), Boolean.valueOf(targetIsSecure)); + } + } + + private Map<SiteToSiteTransportProtocol, List<Route>> routes; + + + private static final String PROPERTY_PREFIX = "nifi.remote.route."; + private static final Pattern PROPERTY_REGEX = Pattern.compile("^nifi\\.remote\\.route\\.(raw|http)\\.([^.]+)\\.(when|hostname|port|secure)$"); + + public PeerDescriptionModifier(final NiFiProperties properties) { + final Map<Tuple<String, String>, List<Tuple<String, String>>> routeDefinitions = properties.getPropertyKeys().stream() + .filter(propertyKey -> propertyKey.startsWith(PROPERTY_PREFIX)) + .map(propertyKey -> { + final Matcher matcher = PROPERTY_REGEX.matcher(propertyKey); + if (!matcher.matches()) { + throw new IllegalArgumentException( + format("Found an invalid Site-to-Site route definition property '%s'." + + " Routing property keys should be formatted as 'nifi.remote.route.{protocol}.{name}.{routingConfigName}'." + + " Where {protocol} is 'raw' or 'http', and {routingConfigName} is 'when', 'hostname', 'port' or 'secure'.", + propertyKey)); + } + return matcher; + }) + .collect(Collectors.groupingBy(matcher -> new Tuple<>(matcher.group(1), matcher.group(2)), + Collectors.mapping(matcher -> new Tuple<>(matcher.group(3), matcher.group(0)), Collectors.toList()))); + + routes = routeDefinitions.entrySet().stream().map(routeDefinition -> { + final Route route = new Route(); + // E.g. [raw, example1], [http, example2] + final Tuple<String, String> protocolAndRoutingName = routeDefinition.getKey(); + route.protocol = SiteToSiteTransportProtocol.valueOf(protocolAndRoutingName.getKey().toUpperCase()); + route.name = protocolAndRoutingName.getValue(); + routeDefinition.getValue().forEach(routingConfigNameAndPropertyKey -> { + final String routingConfigName = routingConfigNameAndPropertyKey.getKey(); + final String propertyKey = routingConfigNameAndPropertyKey.getValue(); + final String routingConfigValue = properties.getProperty(propertyKey); + try { + switch (routingConfigName) { + case "when": + route.predicate = Query.prepare(routingConfigValue); + break; + case "hostname": + route.hostname = Query.prepare(routingConfigValue); + break; + case "port": + route.port = Query.prepare(routingConfigValue); + break; + case "secure": + route.secure = Query.prepare(routingConfigValue); + break; + } + } catch (AttributeExpressionLanguageParsingException e) { + throw new IllegalArgumentException(format("Failed to parse NiFi expression language configured" + + " for Site-to-Site routing property at '%s' due to '%s'", propertyKey, e.getMessage()), e); + } + }); + return route; + }).map(Route::validate).collect(Collectors.groupingBy(r -> r.protocol)); + + } + + private void addVariables(Map<String, String> map, String prefix, PeerDescription peer) { + map.put(format("%s.hostname", prefix), peer.getHostname()); + map.put(format("%s.port", prefix), String.valueOf(peer.getPort())); + map.put(format("%s.secure", prefix), String.valueOf(peer.isSecure())); + } + + public boolean isModificationNeeded(final SiteToSiteTransportProtocol protocol) { + return routes != null && routes.containsKey(protocol) && !routes.get(protocol).isEmpty(); + } + + /** + * Modifies target peer description so that subsequent request can go through the appropriate route + * @param source The source peer from which a request was sent, this can be any server host participated to relay the request, + * but should be the one which can contribute to derive the correct target peer. + * @param target The original target which should receive and process further incoming requests. + * @param protocol The S2S protocol being used. + * @param requestType The requested API type. + * @param variables Containing context variables those can be referred from Expression Language. + * @return A peer description. The original target peer can be returned if there is no intermediate peer such as reverse proxies needed. + */ + public PeerDescription modify(final PeerDescription source, final PeerDescription target, + final SiteToSiteTransportProtocol protocol, final RequestType requestType, + final Map<String, String> variables) { + + addVariables(variables, "s2s.source", source); + addVariables(variables, "s2s.target", target); + variables.put("s2s.protocol", protocol.name()); + variables.put("s2s.request", requestType.name()); + + logger.debug("Modifying PeerDescription, variables={}", variables); + + return routes.get(protocol).stream().filter(r -> r.predicate == null + || Boolean.valueOf(r.predicate.evaluateExpressions(variables, null))) + .map(r -> { + final PeerDescription t = r.getTarget(variables); + logger.debug("Route definition {} matched, {}", r.name, t); + return t; + }) + // If a matched route was found, use it, else use the original target. + .findFirst().orElse(target); + + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/1913b1e2/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/SocketRemoteSiteListener.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/SocketRemoteSiteListener.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/SocketRemoteSiteListener.java index 2fae669..07c5920 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/SocketRemoteSiteListener.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/SocketRemoteSiteListener.java @@ -64,6 +64,7 @@ public class SocketRemoteSiteListener implements RemoteSiteListener { private final NodeInformant nodeInformant; private final AtomicReference<ProcessGroup> rootGroup = new AtomicReference<>(); private final NiFiProperties nifiProperties; + private final PeerDescriptionModifier peerDescriptionModifier; private final AtomicBoolean stopped = new AtomicBoolean(false); @@ -78,6 +79,7 @@ public class SocketRemoteSiteListener implements RemoteSiteListener { this.sslContext = sslContext; this.nifiProperties = nifiProperties; this.nodeInformant = nodeInformant; + peerDescriptionModifier = new PeerDescriptionModifier(nifiProperties); } @Override @@ -218,6 +220,9 @@ public class SocketRemoteSiteListener implements RemoteSiteListener { protocol = RemoteResourceFactory.receiveServerProtocolNegotiation(dis, dos); protocol.setRootProcessGroup(rootGroup.get()); protocol.setNodeInformant(nodeInformant); + if (protocol instanceof PeerDescriptionModifiable) { + ((PeerDescriptionModifiable)protocol).setPeerDescriptionModifier(peerDescriptionModifier); + } final PeerDescription description = new PeerDescription(clientHostName, clientPort, sslContext != null); peer = new Peer(description, commsSession, peerUri, "nifi://localhost:" + getPort()); http://git-wip-us.apache.org/repos/asf/nifi/blob/1913b1e2/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketFlowFileServerProtocol.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketFlowFileServerProtocol.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketFlowFileServerProtocol.java index a7c0212..cb0746e 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketFlowFileServerProtocol.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketFlowFileServerProtocol.java @@ -26,6 +26,9 @@ import java.util.List; import java.util.Map; import java.util.Optional; import org.apache.nifi.remote.Peer; +import org.apache.nifi.remote.PeerDescription; +import org.apache.nifi.remote.PeerDescriptionModifiable; +import org.apache.nifi.remote.PeerDescriptionModifier; import org.apache.nifi.remote.RemoteResourceFactory; import org.apache.nifi.remote.StandardVersionNegotiator; import org.apache.nifi.remote.VersionNegotiator; @@ -39,14 +42,22 @@ import org.apache.nifi.remote.protocol.CommunicationsSession; import org.apache.nifi.remote.protocol.HandshakeProperties; import org.apache.nifi.remote.protocol.RequestType; import org.apache.nifi.remote.protocol.ResponseCode; +import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol; -public class SocketFlowFileServerProtocol extends AbstractFlowFileServerProtocol { +public class SocketFlowFileServerProtocol extends AbstractFlowFileServerProtocol implements PeerDescriptionModifiable { public static final String RESOURCE_NAME = "SocketFlowFileProtocol"; // Version 6 added to support Zero-Master Clustering, which was introduced in NiFi 1.0.0 private final VersionNegotiator versionNegotiator = new StandardVersionNegotiator(6, 5, 4, 3, 2, 1); + private PeerDescriptionModifier peerDescriptionModifier; + + @Override + public void setPeerDescriptionModifier(PeerDescriptionModifier modifier) { + peerDescriptionModifier = modifier; + } + @Override protected HandshakeProperties doHandshake(Peer peer) throws IOException, HandshakeException { @@ -189,9 +200,21 @@ public class SocketFlowFileServerProtocol extends AbstractFlowFileServerProtocol continue; } - dos.writeUTF(nodeInfo.getSiteToSiteHostname()); - dos.writeInt(nodeInfo.getSiteToSitePort()); - dos.writeBoolean(nodeInfo.isSiteToSiteSecure()); + if (peerDescriptionModifier != null && peerDescriptionModifier.isModificationNeeded(SiteToSiteTransportProtocol.RAW)) { + final PeerDescription target = new PeerDescription(nodeInfo.getSiteToSiteHostname(), nodeInfo.getSiteToSitePort(), nodeInfo.isSiteToSiteSecure()); + final PeerDescription modifiedTarget = peerDescriptionModifier.modify(peer.getDescription(), target, + SiteToSiteTransportProtocol.RAW, PeerDescriptionModifier.RequestType.Peers, new HashMap<>()); + + dos.writeUTF(modifiedTarget.getHostname()); + dos.writeInt(modifiedTarget.getPort()); + dos.writeBoolean(modifiedTarget.isSecure()); + + } else { + dos.writeUTF(nodeInfo.getSiteToSiteHostname()); + dos.writeInt(nodeInfo.getSiteToSitePort()); + dos.writeBoolean(nodeInfo.isSiteToSiteSecure()); + } + dos.writeInt(nodeInfo.getTotalFlowFiles()); } http://git-wip-us.apache.org/repos/asf/nifi/blob/1913b1e2/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/TestPeerDescriptionModifier.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/TestPeerDescriptionModifier.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/TestPeerDescriptionModifier.java new file mode 100644 index 0000000..cf6639f --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/TestPeerDescriptionModifier.java @@ -0,0 +1,321 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.remote; + +import org.apache.nifi.attribute.expression.language.exception.AttributeExpressionLanguageException; +import org.apache.nifi.properties.StandardNiFiProperties; +import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol; +import org.apache.nifi.util.NiFiProperties; +import org.junit.Test; + +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +public class TestPeerDescriptionModifier { + + @Test + public void testNoConfiguration() { + Properties props = new Properties(); + final NiFiProperties properties = new StandardNiFiProperties(props); + final PeerDescriptionModifier modifier = new PeerDescriptionModifier(properties); + assertFalse(modifier.isModificationNeeded(SiteToSiteTransportProtocol.RAW)); + assertFalse(modifier.isModificationNeeded(SiteToSiteTransportProtocol.HTTP)); + } + + @Test + public void testInvalidNoHostname() { + Properties props = new Properties(); + props.put("nifi.remote.route.raw.no-host.when", "true"); + final NiFiProperties properties = new StandardNiFiProperties(props); + try { + new PeerDescriptionModifier(properties); + fail("Should throw an Exception"); + } catch (IllegalArgumentException e) { + assertEquals("Found an invalid Site-to-Site route definition [no-host] 'hostname' is not specified.", e.getMessage()); + } + } + + @Test + public void testInvalidNoPort() { + Properties props = new Properties(); + props.put("nifi.remote.route.raw.no-port.when", "true"); + props.put("nifi.remote.route.raw.no-port.hostname", "proxy.example.com"); + final NiFiProperties properties = new StandardNiFiProperties(props); + try { + new PeerDescriptionModifier(properties); + fail("Should throw an Exception"); + } catch (IllegalArgumentException e) { + assertEquals("Found an invalid Site-to-Site route definition [no-port] 'port' is not specified.", e.getMessage()); + } + } + + @Test + public void testInvalidConfigurationName() { + Properties props = new Properties(); + props.put("nifi.remote.route.raw.invalid-name.when", "true"); + props.put("nifi.remote.route.raw.invalid-name.hostname", "proxy.example.com"); + props.put("nifi.remote.route.raw.invalid-name.port", "8081"); + props.put("nifi.remote.route.raw.invalid-name.secure", "true"); + props.put("nifi.remote.route.raw.invalid-name.unsupported", "true"); + final NiFiProperties properties = new StandardNiFiProperties(props); + try { + new PeerDescriptionModifier(properties); + fail("Should throw an Exception"); + } catch (IllegalArgumentException e) { + assertEquals("Found an invalid Site-to-Site route definition property 'nifi.remote.route.raw.invalid-name.unsupported'." + + " Routing property keys should be formatted as 'nifi.remote.route.{protocol}.{name}.{routingConfigName}'." + + " Where {protocol} is 'raw' or 'http', and {routingConfigName} is 'when', 'hostname', 'port' or 'secure'.", e.getMessage()); + } + } + + @Test + public void testInvalidPropertyKeyNoProtocol() { + Properties props = new Properties(); + props.put("nifi.remote.route.", "true"); + final NiFiProperties properties = new StandardNiFiProperties(props); + try { + new PeerDescriptionModifier(properties); + fail("Should throw an Exception"); + } catch (IllegalArgumentException e) { + assertEquals("Found an invalid Site-to-Site route definition property 'nifi.remote.route.'." + + " Routing property keys should be formatted as 'nifi.remote.route.{protocol}.{name}.{routingConfigName}'." + + " Where {protocol} is 'raw' or 'http', and {routingConfigName} is 'when', 'hostname', 'port' or 'secure'.", e.getMessage()); + } + } + + @Test + public void testInvalidPropertyKeyNoName() { + Properties props = new Properties(); + props.put("nifi.remote.route.http.", "true"); + final NiFiProperties properties = new StandardNiFiProperties(props); + try { + new PeerDescriptionModifier(properties); + fail("Should throw an Exception"); + } catch (IllegalArgumentException e) { + assertEquals("Found an invalid Site-to-Site route definition property 'nifi.remote.route.http.'." + + " Routing property keys should be formatted as 'nifi.remote.route.{protocol}.{name}.{routingConfigName}'." + + " Where {protocol} is 'raw' or 'http', and {routingConfigName} is 'when', 'hostname', 'port' or 'secure'.", e.getMessage()); + } + } + + @Test + public void testInvalidExpression() { + Properties props = new Properties(); + props.put("nifi.remote.route.raw.invalid-el.when", "${nonExistingFunction()}"); + props.put("nifi.remote.route.raw.invalid-el.hostname", "proxy.example.com"); + props.put("nifi.remote.route.raw.invalid-el.port", "8081"); + final NiFiProperties properties = new StandardNiFiProperties(props); + final PeerDescriptionModifier modifier = new PeerDescriptionModifier(properties); + + final PeerDescription source = new PeerDescription("client", 12345, true); + final PeerDescription target = new PeerDescription("nifi0", 8081, true); + + try { + modifier.modify(source, target, + SiteToSiteTransportProtocol.RAW, PeerDescriptionModifier.RequestType.Peers, new HashMap<>()); + fail("Should throw an Exception"); + } catch (AttributeExpressionLanguageException e) { + assertTrue(e.getMessage().startsWith("Invalid Expression")); + } + } + + @Test + public void testDefaultIsNotSecure() { + Properties props = new Properties(); + props.put("nifi.remote.route.raw.no-port.when", "true"); + props.put("nifi.remote.route.raw.no-port.hostname", "proxy.example.com"); + props.put("nifi.remote.route.raw.no-port.port", "8443"); + final NiFiProperties properties = new StandardNiFiProperties(props); + final PeerDescriptionModifier modifier = new PeerDescriptionModifier(properties); + + final PeerDescription source = new PeerDescription("client", 12345, true); + final PeerDescription target = new PeerDescription("nifi0", 8081, true); + final PeerDescription modifiedTarget = modifier.modify(source, target, + SiteToSiteTransportProtocol.RAW, PeerDescriptionModifier.RequestType.Peers, new HashMap<>()); + assertFalse(modifiedTarget.isSecure()); + } + + @Test + public void testRawPortToNode() { + Properties props = new Properties(); + + // RAW S2S route configs. + // Port number to Node + // proxy1.example.com:17491 -> nifi0:8081 + // proxy1.example.com:17492 -> nifi1:8081 + props.put("nifi.remote.route.raw.port-to-node.when", "${X-ProxyHost:equals('proxy1.example.com')" + + ":or(${s2s.source.hostname:equals('proxy1.example.com')})}"); + props.put("nifi.remote.route.raw.port-to-node.hostname", "proxy1.example.com"); + props.put("nifi.remote.route.raw.port-to-node.port", + "${s2s.target.hostname:equals('nifi0'):ifElse('17491'," + + "${s2s.target.hostname:equals('nifi1'):ifElse('17492', 'undefined')})}"); + props.put("nifi.remote.route.raw.port-to-node.secure", "true"); + + // Other S2S configs. + props.put("nifi.remote.input.host", "node0"); + props.put("nifi.remote.input.secure", "true"); + props.put("nifi.remote.input.socket.port", "8081"); + props.put("nifi.remote.input.http.enabled", "true"); + + final NiFiProperties properties = new StandardNiFiProperties(props); + final PeerDescriptionModifier modifier = new PeerDescriptionModifier(properties); + + // For requests coming from the proxy server, modify target description, + // so that client can send further request to the proxy. + // To nifi0. + PeerDescription source = new PeerDescription("proxy1.example.com", 12345, true); + PeerDescription target = new PeerDescription("nifi0", 8081, true); + PeerDescription modifiedTarget = modifier.modify(source, target, SiteToSiteTransportProtocol.RAW, PeerDescriptionModifier.RequestType.SiteToSiteDetail, new HashMap<>()); + + assertNotNull(modifiedTarget); + assertEquals("proxy1.example.com", modifiedTarget.getHostname()); + assertEquals(17491, modifiedTarget.getPort()); + assertEquals(true, modifiedTarget.isSecure()); + + // To nifi1. + target = new PeerDescription("nifi1", 8081, true); + modifiedTarget = modifier.modify(source, target, SiteToSiteTransportProtocol.RAW, PeerDescriptionModifier.RequestType.SiteToSiteDetail, new HashMap<>()); + + assertNotNull(modifiedTarget); + assertEquals("proxy1.example.com", modifiedTarget.getHostname()); + assertEquals(17492, modifiedTarget.getPort()); + assertEquals(true, modifiedTarget.isSecure()); + + // For requests coming directly, use the original target description. + source = new PeerDescription("192.168.1.101", 23456, true); + modifiedTarget = modifier.modify(source, target, SiteToSiteTransportProtocol.RAW, PeerDescriptionModifier.RequestType.SiteToSiteDetail, new HashMap<>()); + assertNotNull(modifiedTarget); + assertEquals(target, modifiedTarget); + + } + + @Test + public void testRawServerNameToNode() { + Properties props = new Properties(); + + // RAW S2S route configs. + // Server name to Node + // nifi0.example.com:17491 -> nifi0:8081 + // nifi1.example.com:17491 -> nifi1:8081 + props.put("nifi.remote.route.raw.name-to-node.when", "${X-ProxyHost:contains('.example.com')" + + ":or(${s2s.source.hostname:contains('.example.com')})}"); + props.put("nifi.remote.route.raw.name-to-node.hostname", "${s2s.target.hostname}.example.com"); + props.put("nifi.remote.route.raw.name-to-node.port", "17491"); + props.put("nifi.remote.route.raw.name-to-node.secure", "true"); + + // Other S2S configs. + props.put("nifi.remote.input.host", "node0"); + props.put("nifi.remote.input.secure", "true"); + props.put("nifi.remote.input.socket.port", "8081"); + props.put("nifi.remote.input.http.enabled", "true"); + + final NiFiProperties properties = new StandardNiFiProperties(props); + final PeerDescriptionModifier modifier = new PeerDescriptionModifier(properties); + + // For requests coming from the proxy server, modify target description, + // so that client can send further request to the proxy. + // To nifi0. + PeerDescription source = new PeerDescription("nifi0.example.com", 12345, true); + PeerDescription target = new PeerDescription("nifi0", 8081, true); + PeerDescription modifiedTarget = modifier.modify(source, target, SiteToSiteTransportProtocol.RAW, PeerDescriptionModifier.RequestType.SiteToSiteDetail, new HashMap<>()); + + assertNotNull(modifiedTarget); + assertEquals("nifi0.example.com", modifiedTarget.getHostname()); + assertEquals(17491, modifiedTarget.getPort()); + assertEquals(true, modifiedTarget.isSecure()); + + // To nifi1. + target = new PeerDescription("nifi1", 8081, true); + modifiedTarget = modifier.modify(source, target, SiteToSiteTransportProtocol.RAW, PeerDescriptionModifier.RequestType.SiteToSiteDetail, new HashMap<>()); + + assertNotNull(modifiedTarget); + assertEquals("nifi1.example.com", modifiedTarget.getHostname()); + assertEquals(17491, modifiedTarget.getPort()); + assertEquals(true, modifiedTarget.isSecure()); + + // For requests coming directly, use the original target description. + source = new PeerDescription("192.168.1.101", 23456, true); + modifiedTarget = modifier.modify(source, target, SiteToSiteTransportProtocol.RAW, PeerDescriptionModifier.RequestType.SiteToSiteDetail, new HashMap<>()); + assertNotNull(modifiedTarget); + assertEquals(target, modifiedTarget); + + } + + @Test + public void testHttpsTerminate() { + Properties props = new Properties(); + + // https://nifi0.example.com -> http://nifi0:8080 + // https://nifi1.example.com -> http://nifi1:8080 + // S2S HTTP configs. + props.put("nifi.remote.route.http.terminate.when", "${X-ProxyHost:contains('.example.com')" + + ":or(${s2s.source.hostname:contains('.example.com')})}"); + props.put("nifi.remote.route.http.terminate.hostname", "${s2s.target.hostname}.example.com"); + props.put("nifi.remote.route.http.terminate.port", "443"); + props.put("nifi.remote.route.http.terminate.secure", "true"); + + // Other S2S configs. + props.put("nifi.web.http.host", "nifi0"); + props.put("nifi.web.http.port", "8080"); + props.put("nifi.remote.input.host", "nifi0"); + props.put("nifi.remote.input.secure", "false"); + props.put("nifi.remote.input.socket.port", ""); + props.put("nifi.remote.input.http.enabled", "true"); + + + final NiFiProperties properties = new StandardNiFiProperties(props); + final PeerDescriptionModifier modifier = new PeerDescriptionModifier(properties); + + // For requests coming from the proxy server, modify target description, + // so that client can send further request to the proxy. + // To nifi0. + PeerDescription source = new PeerDescription("nifi0.example.com", 12345, true); + PeerDescription target = new PeerDescription("nifi0", 8080, false); + final Map<String, String> proxyHeders = new HashMap<>(); + proxyHeders.put("X-ProxyHost", "nifi0.example.com:443"); + proxyHeders.put("X-Forwarded-For", "172.16.1.103"); + PeerDescription modifiedTarget = modifier.modify(source, target, SiteToSiteTransportProtocol.HTTP, PeerDescriptionModifier.RequestType.SiteToSiteDetail, new HashMap<>(proxyHeders)); + + assertNotNull(modifiedTarget); + assertEquals("nifi0.example.com", modifiedTarget.getHostname()); + assertEquals(443, modifiedTarget.getPort()); + assertEquals(true, modifiedTarget.isSecure()); + + // To nifi1. + proxyHeders.put("X-ProxyHost", "nifi1.example.com:443"); + target = new PeerDescription("nifi1", 8081, true); + modifiedTarget = modifier.modify(source, target, SiteToSiteTransportProtocol.HTTP, PeerDescriptionModifier.RequestType.SiteToSiteDetail, new HashMap<>(proxyHeders)); + + assertNotNull(modifiedTarget); + assertEquals("nifi1.example.com", modifiedTarget.getHostname()); + assertEquals(443, modifiedTarget.getPort()); + assertEquals(true, modifiedTarget.isSecure()); + + // For requests coming directly, use the original target description. + source = new PeerDescription("192.168.1.101", 23456, true); + modifiedTarget = modifier.modify(source, target, SiteToSiteTransportProtocol.HTTP, PeerDescriptionModifier.RequestType.SiteToSiteDetail, new HashMap<>()); + assertNotNull(modifiedTarget); + assertEquals(target, modifiedTarget); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/1913b1e2/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java index db0a568..85423c0 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java @@ -137,6 +137,11 @@ public abstract class ApplicationResource { * @return resource uri */ protected String generateResourceUri(final String... path) { + URI uri = buildResourceUri(path); + return uri.toString(); + } + + private URI buildResourceUri(final String... path) { final UriBuilder uriBuilder = uriInfo.getBaseUriBuilder(); uriBuilder.segment(path); URI uri = uriBuilder.build(); @@ -179,7 +184,7 @@ public abstract class ApplicationResource { } catch (final URISyntaxException use) { throw new UriBuilderException(use); } - return uri.toString(); + return uri; } /** @@ -1226,9 +1231,9 @@ public abstract class ApplicationResource { public Response locationResponse(UriInfo uriInfo, String portType, String portId, String transactionId, Object entity, Integer protocolVersion, final HttpRemoteSiteListener transactionManager) { - String path = "/data-transfer/" + portType + "/" + portId + "/transactions/" + transactionId; - URI location = uriInfo.getBaseUriBuilder().path(path).build(); - return noCache(setCommonHeaders(Response.created(location), protocolVersion, transactionManager) + final URI transactionUri = buildResourceUri("data-transfer", portType, portId, "transactions", transactionId); + + return noCache(setCommonHeaders(Response.created(transactionUri), protocolVersion, transactionManager) .header(LOCATION_URI_INTENT_NAME, LOCATION_URI_INTENT_VALUE)) .entity(entity).build(); } http://git-wip-us.apache.org/repos/asf/nifi/blob/1913b1e2/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/DataTransferResource.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/DataTransferResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/DataTransferResource.java index bf55c05..1afedad 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/DataTransferResource.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/DataTransferResource.java @@ -48,7 +48,6 @@ import org.apache.nifi.remote.protocol.HandshakeProperty; import org.apache.nifi.remote.protocol.ResponseCode; import org.apache.nifi.remote.protocol.http.HttpFlowFileServerProtocol; import org.apache.nifi.remote.protocol.http.StandardHttpFlowFileServerProtocol; -import org.apache.nifi.stream.io.ByteArrayOutputStream; import org.apache.nifi.util.NiFiProperties; import org.apache.nifi.web.NiFiServiceFacade; import org.apache.nifi.web.api.entity.TransactionResultEntity; @@ -74,6 +73,7 @@ import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; import javax.ws.rs.core.StreamingOutput; import javax.ws.rs.core.UriInfo; +import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; http://git-wip-us.apache.org/repos/asf/nifi/blob/1913b1e2/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/SiteToSiteResource.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/SiteToSiteResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/SiteToSiteResource.java index 36cfda1..6737936 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/SiteToSiteResource.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/SiteToSiteResource.java @@ -30,9 +30,12 @@ import org.apache.nifi.cluster.coordination.ClusterCoordinator; import org.apache.nifi.cluster.coordination.node.NodeWorkload; import org.apache.nifi.cluster.protocol.NodeIdentifier; import org.apache.nifi.remote.HttpRemoteSiteListener; +import org.apache.nifi.remote.PeerDescription; +import org.apache.nifi.remote.PeerDescriptionModifier; import org.apache.nifi.remote.VersionNegotiator; import org.apache.nifi.remote.client.http.TransportProtocolVersionNegotiator; import org.apache.nifi.remote.exception.BadRequestException; +import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol; import org.apache.nifi.remote.protocol.http.HttpHeaders; import org.apache.nifi.util.NiFiProperties; import org.apache.nifi.web.NiFiServiceFacade; @@ -56,6 +59,8 @@ import java.io.IOException; import java.net.InetAddress; import java.net.UnknownHostException; import java.util.ArrayList; +import java.util.Enumeration; +import java.util.HashMap; import java.util.List; import java.util.Map; @@ -80,9 +85,11 @@ public class SiteToSiteResource extends ApplicationResource { private final ResponseCreator responseCreator = new ResponseCreator(); private final VersionNegotiator transportProtocolVersionNegotiator = new TransportProtocolVersionNegotiator(1); private final HttpRemoteSiteListener transactionManager; + private final PeerDescriptionModifier peerDescriptionModifier; public SiteToSiteResource(final NiFiProperties nifiProperties) { transactionManager = HttpRemoteSiteListener.getInstance(nifiProperties); + peerDescriptionModifier = new PeerDescriptionModifier(nifiProperties); } /** @@ -131,6 +138,34 @@ public class SiteToSiteResource extends ApplicationResource { // get the controller dto final ControllerDTO controller = serviceFacade.getSiteToSiteDetails(); + // Alter s2s port. + final boolean modificationNeededRaw = peerDescriptionModifier.isModificationNeeded(SiteToSiteTransportProtocol.RAW); + final boolean modificationNeededHttp = peerDescriptionModifier.isModificationNeeded(SiteToSiteTransportProtocol.HTTP); + if (modificationNeededRaw || modificationNeededHttp) { + final PeerDescription source = getSourcePeerDescription(req); + final Boolean isSiteToSiteSecure = controller.isSiteToSiteSecure(); + final String siteToSiteHostname = getSiteToSiteHostname(req); + final Map<String, String> httpHeaders = getHttpHeaders(req); + + if (modificationNeededRaw) { + final PeerDescription rawTarget = new PeerDescription(siteToSiteHostname, controller.getRemoteSiteListeningPort(), isSiteToSiteSecure); + final PeerDescription modifiedRawTarget = peerDescriptionModifier.modify(source, rawTarget, + SiteToSiteTransportProtocol.RAW, PeerDescriptionModifier.RequestType.SiteToSiteDetail, new HashMap<>(httpHeaders)); + controller.setRemoteSiteListeningPort(modifiedRawTarget.getPort()); + } + + if (modificationNeededHttp) { + final PeerDescription httpTarget = new PeerDescription(siteToSiteHostname, controller.getRemoteSiteHttpListeningPort(), isSiteToSiteSecure); + final PeerDescription modifiedHttpTarget = peerDescriptionModifier.modify(source, httpTarget, + SiteToSiteTransportProtocol.HTTP, PeerDescriptionModifier.RequestType.SiteToSiteDetail, new HashMap<>(httpHeaders)); + controller.setRemoteSiteHttpListeningPort(modifiedHttpTarget.getPort()); + if (!controller.isSiteToSiteSecure() && modifiedHttpTarget.isSecure()) { + // In order to enable TLS terminate at the reverse proxy server, even if NiFi itself is not secured, introduce the endpoint as secure. + controller.setSiteToSiteSecure(true); + } + } + } + // build the response entity final ControllerEntity entity = new ControllerEntity(); entity.setController(controller); @@ -147,6 +182,20 @@ public class SiteToSiteResource extends ApplicationResource { return noCache(Response.ok(entity)).build(); } + private PeerDescription getSourcePeerDescription(@Context HttpServletRequest req) { + return new PeerDescription(req.getRemoteHost(), req.getRemotePort(), req.isSecure()); + } + + private Map<String, String> getHttpHeaders(@Context HttpServletRequest req) { + final Map<String, String> headers = new HashMap<>(); + final Enumeration<String> headerNames = req.getHeaderNames(); + while (headerNames.hasMoreElements()) { + final String name = headerNames.nextElement(); + headers.put(name, req.getHeader(name)); + } + return headers; + } + /** * Returns the available Peers and its status of this NiFi. * @@ -187,18 +236,29 @@ public class SiteToSiteResource extends ApplicationResource { } final List<PeerDTO> peers = new ArrayList<>(); + final PeerDescription source = getSourcePeerDescription(req); + final boolean modificationNeeded = peerDescriptionModifier.isModificationNeeded(SiteToSiteTransportProtocol.HTTP); + final Map<String, String> headers = modificationNeeded ? getHttpHeaders(req) : null; if (properties.isNode()) { try { final Map<NodeIdentifier, NodeWorkload> clusterWorkload = clusterCoordinator.getClusterWorkload(); - clusterWorkload.entrySet().stream().forEach(entry -> { + clusterWorkload.forEach((nodeId, workload) -> { + final String siteToSiteHostname = nodeId.getSiteToSiteAddress() == null ? nodeId.getApiAddress() : nodeId.getSiteToSiteAddress(); + final int siteToSitePort = nodeId.getSiteToSiteHttpApiPort() == null ? nodeId.getApiPort() : nodeId.getSiteToSiteHttpApiPort(); + + PeerDescription target = new PeerDescription(siteToSiteHostname, siteToSitePort, nodeId.isSiteToSiteSecure()); + + if (modificationNeeded) { + target = peerDescriptionModifier.modify(source, target, + SiteToSiteTransportProtocol.HTTP, PeerDescriptionModifier.RequestType.Peers, new HashMap<>(headers)); + } + final PeerDTO peer = new PeerDTO(); - final NodeIdentifier nodeId = entry.getKey(); - final String siteToSiteAddress = nodeId.getSiteToSiteAddress(); - peer.setHostname(siteToSiteAddress == null ? nodeId.getApiAddress() : siteToSiteAddress); - peer.setPort(nodeId.getSiteToSiteHttpApiPort() == null ? nodeId.getApiPort() : nodeId.getSiteToSiteHttpApiPort()); - peer.setSecure(nodeId.isSiteToSiteSecure()); - peer.setFlowFileCount(entry.getValue().getFlowFileCount()); + peer.setHostname(target.getHostname()); + peer.setPort(target.getPort()); + peer.setSecure(target.isSecure()); + peer.setFlowFileCount(workload.getFlowFileCount()); peers.add(peer); }); } catch (IOException e) { @@ -208,24 +268,20 @@ public class SiteToSiteResource extends ApplicationResource { } else { // Standalone mode. final PeerDTO peer = new PeerDTO(); + final String siteToSiteHostname = getSiteToSiteHostname(req); - // Private IP address or hostname may not be accessible from client in some environments. - // So, use the value defined in nifi.properties instead when it is defined. - final String remoteInputHost = properties.getRemoteInputHost(); - String localName; - try { - // Get local host name using InetAddress if available, same as RAW socket does. - localName = InetAddress.getLocalHost().getHostName(); - } catch (UnknownHostException e) { - if (logger.isDebugEnabled()) { - logger.debug("Failed to get local host name using InetAddress.", e); - } - localName = req.getLocalName(); + + PeerDescription target = new PeerDescription(siteToSiteHostname, + properties.getRemoteInputHttpPort(), properties.isSiteToSiteSecure()); + + if (modificationNeeded) { + target = peerDescriptionModifier.modify(source, target, + SiteToSiteTransportProtocol.HTTP, PeerDescriptionModifier.RequestType.Peers, new HashMap<>(headers)); } - peer.setHostname(isEmpty(remoteInputHost) ? localName : remoteInputHost); - peer.setPort(properties.getRemoteInputHttpPort()); - peer.setSecure(properties.isSiteToSiteSecure()); + peer.setHostname(target.getHostname()); + peer.setPort(target.getPort()); + peer.setSecure(target.isSecure()); peer.setFlowFileCount(0); // doesn't matter how many FlowFiles we have, because we're the only host. peers.add(peer); @@ -237,6 +293,24 @@ public class SiteToSiteResource extends ApplicationResource { return noCache(setCommonHeaders(Response.ok(entity), transportProtocolVersion, transactionManager)).build(); } + private String getSiteToSiteHostname(final HttpServletRequest req) { + // Private IP address or hostname may not be accessible from client in some environments. + // So, use the value defined in nifi.properties instead when it is defined. + final String remoteInputHost = properties.getRemoteInputHost(); + String localName; + try { + // Get local host name using InetAddress if available, same as RAW socket does. + localName = InetAddress.getLocalHost().getHostName(); + } catch (UnknownHostException e) { + if (logger.isDebugEnabled()) { + logger.debug("Failed to get local host name using InetAddress.", e); + } + localName = req.getLocalName(); + } + + return isEmpty(remoteInputHost) ? localName : remoteInputHost; + } + // setters public void setServiceFacade(final NiFiServiceFacade serviceFacade) { http://git-wip-us.apache.org/repos/asf/nifi/blob/1913b1e2/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/api/TestDataTransferResource.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/api/TestDataTransferResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/api/TestDataTransferResource.java index 3dac9ce..d16a8a6 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/api/TestDataTransferResource.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/api/TestDataTransferResource.java @@ -41,10 +41,14 @@ import javax.ws.rs.core.StreamingOutput; import javax.ws.rs.core.UriBuilder; import javax.ws.rs.core.UriInfo; import java.io.InputStream; +import java.lang.reflect.Field; import java.net.URI; import java.net.URISyntaxException; import java.net.URL; +import static org.apache.nifi.web.api.ApplicationResource.PROXY_HOST_HTTP_HEADER; +import static org.apache.nifi.web.api.ApplicationResource.PROXY_PORT_HTTP_HEADER; +import static org.apache.nifi.web.api.ApplicationResource.PROXY_SCHEME_HTTP_HEADER; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.any; @@ -53,6 +57,7 @@ import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; public class TestDataTransferResource { @@ -156,6 +161,17 @@ public class TestDataTransferResource { final ServletContext context = null; final UriInfo uriInfo = mockUriInfo(locationUriStr); + final Field uriInfoField = resource.getClass().getSuperclass().getSuperclass() + .getDeclaredField("uriInfo"); + uriInfoField.setAccessible(true); + uriInfoField.set(resource, uriInfo); + + final HttpServletRequest request = mock(HttpServletRequest.class); + final Field httpServletRequestField = resource.getClass().getSuperclass().getSuperclass() + .getDeclaredField("httpServletRequest"); + httpServletRequestField.setAccessible(true); + httpServletRequestField.set(resource, request); + final InputStream inputStream = null; final Response response = resource.createPortTransaction("input-ports", "port-id", req, context, uriInfo, inputStream); @@ -168,6 +184,41 @@ public class TestDataTransferResource { } @Test + public void testCreateTransactionThroughReverseProxy() throws Exception { + final HttpServletRequest req = createCommonHttpServletRequest(); + + final DataTransferResource resource = getDataTransferResource(); + + final String locationUriStr = "https://nifi2.example.com:443/nifi-api/data-transfer/input-ports/port-id/transactions/transaction-id"; + + final ServletContext context = null; + final UriInfo uriInfo = mockUriInfo(locationUriStr); + final Field uriInfoField = resource.getClass().getSuperclass().getSuperclass() + .getDeclaredField("uriInfo"); + uriInfoField.setAccessible(true); + uriInfoField.set(resource, uriInfo); + + final HttpServletRequest request = mock(HttpServletRequest.class); + when(request.getHeader(PROXY_SCHEME_HTTP_HEADER)).thenReturn("https"); + when(request.getHeader(PROXY_HOST_HTTP_HEADER)).thenReturn("nifi2.example.com"); + when(request.getHeader(PROXY_PORT_HTTP_HEADER)).thenReturn("443"); + final Field httpServletRequestField = resource.getClass().getSuperclass().getSuperclass() + .getDeclaredField("httpServletRequest"); + httpServletRequestField.setAccessible(true); + httpServletRequestField.set(resource, request); + + final InputStream inputStream = null; + + final Response response = resource.createPortTransaction("input-ports", "port-id", req, context, uriInfo, inputStream); + + TransactionResultEntity resultEntity = (TransactionResultEntity) response.getEntity(); + + assertEquals(201, response.getStatus()); + assertEquals(ResponseCode.PROPERTIES_OK.getCode(), resultEntity.getResponseCode()); + assertEquals(locationUriStr, response.getMetadata().getFirst(HttpHeaders.LOCATION_HEADER_NAME).toString()); + } + + @Test public void testExtendTransaction() throws Exception { final HttpServletRequest req = createCommonHttpServletRequest();
