KNOX-970 - Add support for proxying NiFi (Jeff Storck via lmccay) Project: http://git-wip-us.apache.org/repos/asf/knox/repo Commit: http://git-wip-us.apache.org/repos/asf/knox/commit/89dd7788 Tree: http://git-wip-us.apache.org/repos/asf/knox/tree/89dd7788 Diff: http://git-wip-us.apache.org/repos/asf/knox/diff/89dd7788
Branch: refs/heads/master Commit: 89dd77886e7f9990e2b5ac2a78012c0d8dfc7cbd Parents: 951a725 Author: Larry McCay <lmc...@hortonworks.com> Authored: Mon Nov 6 17:45:48 2017 -0500 Committer: Larry McCay <lmc...@hortonworks.com> Committed: Mon Nov 6 17:59:49 2017 -0500 ---------------------------------------------------------------------- gateway-release/pom.xml | 4 + .../ServiceDefinitionDeploymentContributor.java | 14 ++- .../service/definition/CustomDispatch.java | 11 ++ .../resources/services/nifi/1.4.0/rewrite.xml | 27 +++++ .../resources/services/nifi/1.4.0/service.xml | 30 +++++ gateway-service-nifi/pom.xml | 38 +++++++ .../hadoop/gateway/dispatch/NiFiDispatch.java | 106 ++++++++++++++++++ .../hadoop/gateway/dispatch/NiFiHaDispatch.java | 111 +++++++++++++++++++ .../hadoop/gateway/dispatch/NiFiHeaders.java | 26 +++++ .../gateway/dispatch/NiFiRequestUtil.java | 89 +++++++++++++++ .../gateway/dispatch/NiFiResponseUtil.java | 89 +++++++++++++++ gateway-spi/pom.xml | 4 + .../dispatch/DefaultHttpClientFactory.java | 55 +++++++-- pom.xml | 6 + 14 files changed, 598 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/knox/blob/89dd7788/gateway-release/pom.xml ---------------------------------------------------------------------- diff --git a/gateway-release/pom.xml b/gateway-release/pom.xml index ad07225..cf85f8b 100644 --- a/gateway-release/pom.xml +++ b/gateway-release/pom.xml @@ -216,6 +216,10 @@ </dependency> <dependency> <groupId>${gateway-group}</groupId> + <artifactId>gateway-service-nifi</artifactId> + </dependency> + <dependency> + <groupId>${gateway-group}</groupId> <artifactId>gateway-provider-rewrite</artifactId> </dependency> <dependency> http://git-wip-us.apache.org/repos/asf/knox/blob/89dd7788/gateway-server/src/main/java/org/apache/hadoop/gateway/deploy/impl/ServiceDefinitionDeploymentContributor.java ---------------------------------------------------------------------- diff --git a/gateway-server/src/main/java/org/apache/hadoop/gateway/deploy/impl/ServiceDefinitionDeploymentContributor.java b/gateway-server/src/main/java/org/apache/hadoop/gateway/deploy/impl/ServiceDefinitionDeploymentContributor.java index 53a7e23..8c0e7eb 100644 --- a/gateway-server/src/main/java/org/apache/hadoop/gateway/deploy/impl/ServiceDefinitionDeploymentContributor.java +++ b/gateway-server/src/main/java/org/apache/hadoop/gateway/deploy/impl/ServiceDefinitionDeploymentContributor.java @@ -187,11 +187,12 @@ public class ServiceDefinitionDeploymentContributor extends ServiceDeploymentCon String haContributorName = customDispatch.getHaContributorName(); String haClassName = customDispatch.getHaClassName(); String httpClientFactory = customDispatch.getHttpClientFactory(); + boolean useTwoWaySsl = customDispatch.getUseTwoWaySsl(); if ( isHaEnabled) { if (haContributorName != null) { addDispatchFilter(context, service, resource, DISPATCH_ROLE, haContributorName); } else if (haClassName != null) { - addDispatchFilterForClass(context, service, resource, haClassName, httpClientFactory); + addDispatchFilterForClass(context, service, resource, haClassName, httpClientFactory, useTwoWaySsl); } else { addDefaultHaDispatchFilter(context, service, resource); } @@ -202,7 +203,7 @@ public class ServiceDefinitionDeploymentContributor extends ServiceDeploymentCon } else { String className = customDispatch.getClassName(); if ( className != null ) { - addDispatchFilterForClass(context, service, resource, className, httpClientFactory); + addDispatchFilterForClass(context, service, resource, className, httpClientFactory, useTwoWaySsl); } else { //final fallback to the default dispatch addDispatchFilter(context, service, resource, DISPATCH_ROLE, "http-client"); @@ -221,12 +222,15 @@ public class ServiceDefinitionDeploymentContributor extends ServiceDeploymentCon filter.param().name(SERVICE_ROLE_PARAM).value(service.getRole()); } - private FilterDescriptor addDispatchFilterForClass(DeploymentContext context, Service service, ResourceDescriptor resource, String dispatchClass, String httpClientFactory) { + private FilterDescriptor addDispatchFilterForClass(DeploymentContext context, Service service, ResourceDescriptor resource, String dispatchClass, String httpClientFactory, boolean useTwoWaySsl) { FilterDescriptor filter = resource.addFilter().name(getName()).role(DISPATCH_ROLE).impl(GatewayDispatchFilter.class); filter.param().name(DISPATCH_IMPL_PARAM).value(dispatchClass); if (httpClientFactory != null) { filter.param().name(HTTP_CLIENT_FACTORY_PARAM).value(httpClientFactory); } + // let's take the value of useTwoWaySsl which is derived from the service definition + // then allow it to be overridden by service params from the topology + filter.param().name("useTwoWaySsl").value(Boolean.toString(useTwoWaySsl)); for ( Map.Entry<String, String> serviceParam : service.getParams().entrySet() ) { filter.param().name(serviceParam.getKey()).value(serviceParam.getValue()); } @@ -240,6 +244,10 @@ public class ServiceDefinitionDeploymentContributor extends ServiceDeploymentCon return filter; } + private FilterDescriptor addDispatchFilterForClass(DeploymentContext context, Service service, ResourceDescriptor resource, String dispatchClass, String httpClientFactory) { + return addDispatchFilterForClass(context, service, resource, dispatchClass, httpClientFactory, false); + } + private boolean isHaEnabled(DeploymentContext context) { Provider provider = getProviderByRole(context, "ha"); if ( provider != null && provider.isEnabled() ) { http://git-wip-us.apache.org/repos/asf/knox/blob/89dd7788/gateway-service-definitions/src/main/java/org/apache/hadoop/gateway/service/definition/CustomDispatch.java ---------------------------------------------------------------------- diff --git a/gateway-service-definitions/src/main/java/org/apache/hadoop/gateway/service/definition/CustomDispatch.java b/gateway-service-definitions/src/main/java/org/apache/hadoop/gateway/service/definition/CustomDispatch.java index abde82d..7e5c2a0 100644 --- a/gateway-service-definitions/src/main/java/org/apache/hadoop/gateway/service/definition/CustomDispatch.java +++ b/gateway-service-definitions/src/main/java/org/apache/hadoop/gateway/service/definition/CustomDispatch.java @@ -33,6 +33,8 @@ public class CustomDispatch { private String httpClientFactory; + private boolean useTwoWaySsl = false; + @XmlAttribute(name = "contributor-name") public String getContributorName() { return contributorName; @@ -77,4 +79,13 @@ public class CustomDispatch { public void setHttpClientFactory(String httpClientFactory) { this.httpClientFactory = httpClientFactory; } + + @XmlAttribute(name = "use-two-way-ssl") + public boolean getUseTwoWaySsl() { + return useTwoWaySsl; + } + + public void setUseTwoWaySsl(boolean useTwoWaySsl) { + this.useTwoWaySsl = useTwoWaySsl; + } } http://git-wip-us.apache.org/repos/asf/knox/blob/89dd7788/gateway-service-definitions/src/main/resources/services/nifi/1.4.0/rewrite.xml ---------------------------------------------------------------------- diff --git a/gateway-service-definitions/src/main/resources/services/nifi/1.4.0/rewrite.xml b/gateway-service-definitions/src/main/resources/services/nifi/1.4.0/rewrite.xml new file mode 100644 index 0000000..7c44efc --- /dev/null +++ b/gateway-service-definitions/src/main/resources/services/nifi/1.4.0/rewrite.xml @@ -0,0 +1,27 @@ +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<rules> + <rule dir="IN" name="NIFI/nifi/inbound/path" pattern="*://*:*/**/nifi-app/{path=**}"> + <rewrite template="{$serviceUrl[NIFI]}/{path=**}"/> + </rule> + <rule dir="IN" name="NIFI/nifi/inbound/path/query" pattern="*://*:*/**/nifi-app/{path=**}?{**}"> + <rewrite template="{$serviceUrl[NIFI]}/{path=**}?{**}"/> + </rule> + <rule dir="IN" name="NIFI/nifi/inbound/path/query-other" pattern="*://*:*/**/nifi-app/{path=**}/?{**}"> + <rewrite template="{$serviceUrl[NIFI]}/{path=**}/?{**}"/> + </rule> +</rules> http://git-wip-us.apache.org/repos/asf/knox/blob/89dd7788/gateway-service-definitions/src/main/resources/services/nifi/1.4.0/service.xml ---------------------------------------------------------------------- diff --git a/gateway-service-definitions/src/main/resources/services/nifi/1.4.0/service.xml b/gateway-service-definitions/src/main/resources/services/nifi/1.4.0/service.xml new file mode 100644 index 0000000..fb24598 --- /dev/null +++ b/gateway-service-definitions/src/main/resources/services/nifi/1.4.0/service.xml @@ -0,0 +1,30 @@ +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<service role="NIFI" name="nifi" version="1.4.0"> + <routes> + <route path="/nifi-app/**"> + <rewrite apply="NIFI/nifi/inbound/path" to="request.url"/> + </route> + <route path="/nifi-app/**?**"> + <rewrite apply="NIFI/nifi/inbound/path/query" to="request.url"/> + </route> + <route path="/nifi-app/**/?**"> + <rewrite apply="NIFI/nifi/inbound/path/query-other" to="request.url"/> + </route> + </routes> + <dispatch classname="org.apache.hadoop.gateway.dispatch.NiFiDispatch" use-two-way-ssl="false"/> +</service> http://git-wip-us.apache.org/repos/asf/knox/blob/89dd7788/gateway-service-nifi/pom.xml ---------------------------------------------------------------------- diff --git a/gateway-service-nifi/pom.xml b/gateway-service-nifi/pom.xml new file mode 100644 index 0000000..41d50fb --- /dev/null +++ b/gateway-service-nifi/pom.xml @@ -0,0 +1,38 @@ +<?xml version="1.0" encoding="UTF-8"?> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <artifactId>gateway</artifactId> + <groupId>org.apache.knox</groupId> + <version>0.14.0-SNAPSHOT</version> + </parent> + <modelVersion>4.0.0</modelVersion> + + <artifactId>gateway-service-nifi</artifactId> + <description>Extension to the gateway for supporting Apache NiFi.</description> + + <licenses> + <license> + <name>The Apache Software License, Version 2.0</name> + <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url> + <distribution>repo</distribution> + </license> + </licenses> + + <dependencies> + <dependency> + <groupId>${gateway-group}</groupId> + <artifactId>gateway-spi</artifactId> + </dependency> + <dependency> + <groupId>${gateway-group}</groupId> + <artifactId>gateway-provider-ha</artifactId> + </dependency> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + </dependency> + </dependencies> +</project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/knox/blob/89dd7788/gateway-service-nifi/src/main/java/org/apache/hadoop/gateway/dispatch/NiFiDispatch.java ---------------------------------------------------------------------- diff --git a/gateway-service-nifi/src/main/java/org/apache/hadoop/gateway/dispatch/NiFiDispatch.java b/gateway-service-nifi/src/main/java/org/apache/hadoop/gateway/dispatch/NiFiDispatch.java new file mode 100644 index 0000000..013fd9c --- /dev/null +++ b/gateway-service-nifi/src/main/java/org/apache/hadoop/gateway/dispatch/NiFiDispatch.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.gateway.dispatch; + +import java.io.IOException; +import java.io.InputStream; +import java.util.Set; + +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; + +import org.apache.hadoop.gateway.util.MimeTypes; +import org.apache.http.Header; +import org.apache.http.HttpEntity; +import org.apache.http.HttpResponse; +import org.apache.http.client.methods.HttpUriRequest; +import org.apache.http.entity.ContentType; + +public class NiFiDispatch extends DefaultDispatch { + + @Override + protected void executeRequest(HttpUriRequest outboundRequest, HttpServletRequest inboundRequest, HttpServletResponse outboundResponse) throws IOException { + outboundRequest = NiFiRequestUtil.modifyOutboundRequest(outboundRequest, inboundRequest); + HttpResponse inboundResponse = executeOutboundRequest(outboundRequest); + writeOutboundResponse(outboundRequest, inboundRequest, outboundResponse, inboundResponse); + } + + /** + * Overridden to provide a spot to modify the outbound response before its stream is closed. + */ + protected void writeOutboundResponse(HttpUriRequest outboundRequest, HttpServletRequest inboundRequest, HttpServletResponse outboundResponse, HttpResponse inboundResponse) throws IOException { + // Copy the client respond header to the server respond. + outboundResponse.setStatus(inboundResponse.getStatusLine().getStatusCode()); + Header[] headers = inboundResponse.getAllHeaders(); + Set<String> excludeHeaders = getOutboundResponseExcludeHeaders(); + boolean hasExcludeHeaders = false; + if ((excludeHeaders != null) && !(excludeHeaders.isEmpty())) { + hasExcludeHeaders = true; + } + for ( Header header : headers ) { + String name = header.getName(); + if (hasExcludeHeaders && excludeHeaders.contains(name.toUpperCase())) { + continue; + } + String value = header.getValue(); + outboundResponse.addHeader(name, value); + } + + HttpEntity entity = inboundResponse.getEntity(); + if( entity != null ) { + outboundResponse.setContentType( getInboundResponseContentType( entity ) ); + InputStream stream = entity.getContent(); + try { + NiFiResponseUtil.modifyOutboundResponse(inboundRequest, outboundResponse, inboundResponse); + writeResponse( inboundRequest, outboundResponse, stream ); + } finally { + closeInboundResponse( inboundResponse, stream ); + } + } + } + + /** + * Overriden due to {@link DefaultDispatch#getInboundResponseContentType(HttpEntity) having private access, and the method is used by + * {@link #writeOutboundResponse(HttpUriRequest, HttpServletRequest, HttpServletResponse, HttpResponse)}} + */ + private String getInboundResponseContentType( final HttpEntity entity ) { + String fullContentType = null; + if( entity != null ) { + ContentType entityContentType = ContentType.get( entity ); + if( entityContentType != null ) { + if( entityContentType.getCharset() == null ) { + final String entityMimeType = entityContentType.getMimeType(); + final String defaultCharset = MimeTypes.getDefaultCharsetForMimeType( entityMimeType ); + if( defaultCharset != null ) { + LOG.usingDefaultCharsetForEntity( entityMimeType, defaultCharset ); + entityContentType = entityContentType.withCharset( defaultCharset ); + } + } else { + LOG.usingExplicitCharsetForEntity( entityContentType.getMimeType(), entityContentType.getCharset() ); + } + fullContentType = entityContentType.toString(); + } + } + if( fullContentType == null ) { + LOG.unknownResponseEntityContentType(); + } else { + LOG.inboundResponseEntityContentType( fullContentType ); + } + return fullContentType; + } +} http://git-wip-us.apache.org/repos/asf/knox/blob/89dd7788/gateway-service-nifi/src/main/java/org/apache/hadoop/gateway/dispatch/NiFiHaDispatch.java ---------------------------------------------------------------------- diff --git a/gateway-service-nifi/src/main/java/org/apache/hadoop/gateway/dispatch/NiFiHaDispatch.java b/gateway-service-nifi/src/main/java/org/apache/hadoop/gateway/dispatch/NiFiHaDispatch.java new file mode 100644 index 0000000..4272086 --- /dev/null +++ b/gateway-service-nifi/src/main/java/org/apache/hadoop/gateway/dispatch/NiFiHaDispatch.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.gateway.dispatch; + +import java.io.IOException; +import java.io.InputStream; +import java.util.Set; + +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; + +import org.apache.hadoop.gateway.ha.dispatch.DefaultHaDispatch; +import org.apache.hadoop.gateway.util.MimeTypes; +import org.apache.http.Header; +import org.apache.http.HttpEntity; +import org.apache.http.HttpResponse; +import org.apache.http.client.methods.HttpUriRequest; +import org.apache.http.entity.ContentType; + +public class NiFiHaDispatch extends DefaultHaDispatch { + + public NiFiHaDispatch() { + setServiceRole("NIFI"); + } + + @Override + protected void executeRequest(HttpUriRequest outboundRequest, HttpServletRequest inboundRequest, HttpServletResponse outboundResponse) throws IOException { + outboundRequest = NiFiRequestUtil.modifyOutboundRequest(outboundRequest, inboundRequest); + HttpResponse inboundResponse = executeOutboundRequest(outboundRequest); + writeOutboundResponse(outboundRequest, inboundRequest, outboundResponse, inboundResponse); + } + + /** + * Overridden to provide a spot to modify the outbound response before its stream is closed. + */ + protected void writeOutboundResponse(HttpUriRequest outboundRequest, HttpServletRequest inboundRequest, HttpServletResponse outboundResponse, HttpResponse inboundResponse) throws IOException { + // Copy the client respond header to the server respond. + outboundResponse.setStatus(inboundResponse.getStatusLine().getStatusCode()); + Header[] headers = inboundResponse.getAllHeaders(); + Set<String> excludeHeaders = getOutboundResponseExcludeHeaders(); + boolean hasExcludeHeaders = false; + if ((excludeHeaders != null) && !(excludeHeaders.isEmpty())) { + hasExcludeHeaders = true; + } + for ( Header header : headers ) { + String name = header.getName(); + if (hasExcludeHeaders && excludeHeaders.contains(name.toUpperCase())) { + continue; + } + String value = header.getValue(); + outboundResponse.addHeader(name, value); + } + + HttpEntity entity = inboundResponse.getEntity(); + if( entity != null ) { + outboundResponse.setContentType( getInboundResponseContentType( entity ) ); + InputStream stream = entity.getContent(); + try { + NiFiResponseUtil.modifyOutboundResponse(inboundRequest, outboundResponse, inboundResponse); + writeResponse( inboundRequest, outboundResponse, stream ); + } finally { + closeInboundResponse( inboundResponse, stream ); + } + } + } + + /** + * Overriden due to {@link DefaultDispatch#getInboundResponseContentType(HttpEntity) having private access, and the method is used by + * {@link #writeOutboundResponse(HttpUriRequest, HttpServletRequest, HttpServletResponse, HttpResponse)}} + */ + private String getInboundResponseContentType( final HttpEntity entity ) { + String fullContentType = null; + if( entity != null ) { + ContentType entityContentType = ContentType.get( entity ); + if( entityContentType != null ) { + if( entityContentType.getCharset() == null ) { + final String entityMimeType = entityContentType.getMimeType(); + final String defaultCharset = MimeTypes.getDefaultCharsetForMimeType( entityMimeType ); + if( defaultCharset != null ) { + DefaultDispatch.LOG.usingDefaultCharsetForEntity( entityMimeType, defaultCharset ); + entityContentType = entityContentType.withCharset( defaultCharset ); + } + } else { + DefaultDispatch.LOG.usingExplicitCharsetForEntity( entityContentType.getMimeType(), entityContentType.getCharset() ); + } + fullContentType = entityContentType.toString(); + } + } + if( fullContentType == null ) { + DefaultDispatch.LOG.unknownResponseEntityContentType(); + } else { + DefaultDispatch.LOG.inboundResponseEntityContentType( fullContentType ); + } + return fullContentType; + } +} http://git-wip-us.apache.org/repos/asf/knox/blob/89dd7788/gateway-service-nifi/src/main/java/org/apache/hadoop/gateway/dispatch/NiFiHeaders.java ---------------------------------------------------------------------- diff --git a/gateway-service-nifi/src/main/java/org/apache/hadoop/gateway/dispatch/NiFiHeaders.java b/gateway-service-nifi/src/main/java/org/apache/hadoop/gateway/dispatch/NiFiHeaders.java new file mode 100644 index 0000000..f3e8e68 --- /dev/null +++ b/gateway-service-nifi/src/main/java/org/apache/hadoop/gateway/dispatch/NiFiHeaders.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.gateway.dispatch; + +class NiFiHeaders { + static final String X_FORWARDED_PROTO = "X-Forwarded-Proto"; + static final String X_FORWARDED_HOST = "X-Forwarded-Server"; + static final String X_FORWARDED_PORT = "X-Forwarded-Port"; + static final String X_FORWARDED_CONTEXT = "X-Forwarded-Context"; + static final String X_PROXIED_ENTITIES_CHAIN = "X-ProxiedEntitiesChain"; +} http://git-wip-us.apache.org/repos/asf/knox/blob/89dd7788/gateway-service-nifi/src/main/java/org/apache/hadoop/gateway/dispatch/NiFiRequestUtil.java ---------------------------------------------------------------------- diff --git a/gateway-service-nifi/src/main/java/org/apache/hadoop/gateway/dispatch/NiFiRequestUtil.java b/gateway-service-nifi/src/main/java/org/apache/hadoop/gateway/dispatch/NiFiRequestUtil.java new file mode 100644 index 0000000..9fdc425 --- /dev/null +++ b/gateway-service-nifi/src/main/java/org/apache/hadoop/gateway/dispatch/NiFiRequestUtil.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.gateway.dispatch; + +import java.io.IOException; + +import javax.security.auth.Subject; +import javax.servlet.http.HttpServletRequest; + +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.gateway.security.SubjectUtils; +import org.apache.http.Header; +import org.apache.http.client.methods.HttpUriRequest; +import org.apache.http.client.methods.RequestBuilder; +import org.apache.log4j.Logger; + +import com.google.common.base.Objects; +import com.google.common.base.Strings; + +class NiFiRequestUtil { + + static HttpUriRequest modifyOutboundRequest(HttpUriRequest outboundRequest, HttpServletRequest inboundRequest) throws IOException { + // preserve trailing slash from inbound request in the outbound request + if (inboundRequest.getPathInfo().endsWith("/")) { + String[] split = outboundRequest.getURI().toString().split("\\?"); + if (!split[0].endsWith("/")) { + outboundRequest = RequestBuilder.copy(outboundRequest).setUri(split[0] + "/" + (split.length == 2 ? "?" + split[1] : "")).build(); + } + } + // update the X-Forwarded-Context header to include the Knox-specific context path + final Header originalXForwardedContextHeader = outboundRequest.getFirstHeader(NiFiHeaders.X_FORWARDED_CONTEXT); + if (originalXForwardedContextHeader != null) { + String xForwardedContextHeaderValue = originalXForwardedContextHeader.getValue(); + if (!Strings.isNullOrEmpty(xForwardedContextHeaderValue)) { + // Inspect the inbound request and outbound request to determine the additional context path from the rewrite + // rules that needs to be added to the X-Forwarded-Context header to allow proper proxying to NiFi. + // + // NiFi does its own URL rewriting, and will not work with the context path provided by Knox + // (ie, "/gateway/sandbox"). + // + // For example, if Knox has a rewrite rule "*://*:*/**/nifi-app/{**}?{**}", "/nifi-app" needs to be added + // to the existing value of the X-Forwarded-Context header, which ends up being "/gateway/sandbox/nifi-app". + String inboundRequestPathInfo = inboundRequest.getPathInfo(); + String outboundRequestUriPath = outboundRequest.getURI().getPath(); + String outboundRequestUriPathNoTrailingSlash = StringUtils.removeEnd(outboundRequestUriPath, "/"); + String knoxRouteContext = null; + int index = inboundRequestPathInfo.lastIndexOf(outboundRequestUriPathNoTrailingSlash); + if (index >= 0) { + knoxRouteContext = inboundRequestPathInfo.substring(0, index); + } else { + Logger.getLogger(NiFiHaDispatch.class.getName()).error(String.format("Unable to find index of %s in %s", outboundRequestUriPathNoTrailingSlash, inboundRequestPathInfo)); + } + outboundRequest.setHeader(NiFiHeaders.X_FORWARDED_CONTEXT, xForwardedContextHeaderValue + knoxRouteContext); + } + } + + // NiFi requires the header "X-ProxiedEntitiesChain" to be set with the identity or identities of the authenticated requester. + // The effective principal (identity) in the requester subject must be added to "X-ProxiedEntitiesChain". + // If the request already has a populated "X-ProxiedEntitiesChain" header, the identities must be appended to it. + // If the user proxied through Knox is anonymous, the "Anonymous" identity needs to be represented in X-ProxiedEntitiesChain + // as empty angle brackets "<>". + final Subject subject = SubjectUtils.getCurrentSubject(); + String effectivePrincipalName = SubjectUtils.getEffectivePrincipalName(subject); + outboundRequest.setHeader(NiFiHeaders.X_PROXIED_ENTITIES_CHAIN, Objects.firstNonNull(inboundRequest.getHeader(NiFiHeaders.X_PROXIED_ENTITIES_CHAIN), "") + + String.format("<%s>", effectivePrincipalName.equalsIgnoreCase("anonymous") ? "" : effectivePrincipalName)); + + // Make sure headers named "Cookie" are removed from the request to NiFi, since NiFi does not use cookies. + Header[] cookieHeaders = outboundRequest.getHeaders("Cookie"); + for (Header cookieHeader : cookieHeaders) { + outboundRequest.removeHeader(cookieHeader); + } + return outboundRequest; + } +} http://git-wip-us.apache.org/repos/asf/knox/blob/89dd7788/gateway-service-nifi/src/main/java/org/apache/hadoop/gateway/dispatch/NiFiResponseUtil.java ---------------------------------------------------------------------- diff --git a/gateway-service-nifi/src/main/java/org/apache/hadoop/gateway/dispatch/NiFiResponseUtil.java b/gateway-service-nifi/src/main/java/org/apache/hadoop/gateway/dispatch/NiFiResponseUtil.java new file mode 100644 index 0000000..38c98b3 --- /dev/null +++ b/gateway-service-nifi/src/main/java/org/apache/hadoop/gateway/dispatch/NiFiResponseUtil.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.gateway.dispatch; + +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.Collections; +import java.util.List; + +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; + +import org.apache.commons.lang.StringUtils; +import org.apache.http.Header; +import org.apache.http.HttpResponse; +import org.apache.http.NameValuePair; +import org.apache.http.client.utils.URIBuilder; + +class NiFiResponseUtil { + + static void modifyOutboundResponse(HttpServletRequest inboundRequest, HttpServletResponse outboundResponse, HttpResponse inboundResponse) throws IOException { + // Only want to rewrite the Location header on a HTTP 302 + if (inboundResponse.getStatusLine().getStatusCode() == HttpServletResponse.SC_FOUND) { + Header originalLocationHeader = inboundResponse.getFirstHeader("Location"); + if (originalLocationHeader != null) { + String originalLocation = originalLocationHeader.getValue(); + URIBuilder originalLocationUriBuilder; + try { + originalLocationUriBuilder = new URIBuilder(originalLocation); + } catch (URISyntaxException e) { + throw new RuntimeException("Unable to parse URI from Location header", e); + } + URIBuilder inboundRequestUriBuilder = null; + try { + inboundRequestUriBuilder = new URIBuilder(inboundRequest.getRequestURI()); + } catch (URISyntaxException e) { + throw new RuntimeException("Unable to parse the inbound request URI", e); + } + /* + * if the path specified in the Location header fron the inbound response contains the inbound request's URI's path, + * then it's going to the same web context, and the Location header should be updated based on the X_FORWARDED_* headers. + */ + String inboundRequestUriPath = inboundRequestUriBuilder.getPath(); + String originalLocationUriPath = originalLocationUriBuilder.getPath(); + if (originalLocationUriPath.contains(inboundRequestUriPath)) { + // check for trailing slash of Location header if it exists and preserve it + final String trailingSlash = originalLocationUriPath.endsWith("/") ? "/" : ""; + // retain query params + final List<NameValuePair> queryParams = originalLocationUriBuilder.getQueryParams(); + + // check for proxy settings + final String scheme = inboundRequest.getHeader(NiFiHeaders.X_FORWARDED_PROTO); + final String host = inboundRequest.getHeader(NiFiHeaders.X_FORWARDED_HOST); + final String port = inboundRequest.getHeader(NiFiHeaders.X_FORWARDED_PORT); + + final String baseContextPath = inboundRequest.getHeader(NiFiHeaders.X_FORWARDED_CONTEXT); + final String pathInfo = inboundRequest.getPathInfo(); + + try { + final URI newLocation = new URIBuilder().setScheme(scheme).setHost(host).setPort((StringUtils.isNumeric(port) ? Integer.parseInt(port) : -1)).setPath( + baseContextPath + pathInfo + trailingSlash).setParameters(queryParams).build(); + outboundResponse.setHeader("Location", newLocation.toString()); + } catch (URISyntaxException e) { + throw new RuntimeException("Unable to rewrite Location header in response", e); + } + } + } else { + throw new RuntimeException("Received HTTP 302, but response is missing Location header"); + } + } + } +} + http://git-wip-us.apache.org/repos/asf/knox/blob/89dd7788/gateway-spi/pom.xml ---------------------------------------------------------------------- diff --git a/gateway-spi/pom.xml b/gateway-spi/pom.xml index ade9329..ce7f504 100644 --- a/gateway-spi/pom.xml +++ b/gateway-spi/pom.xml @@ -151,6 +151,10 @@ <artifactId>velocity</artifactId> <scope>test</scope> </dependency> + <dependency> + <groupId>org.eclipse.jetty</groupId> + <artifactId>jetty-util</artifactId> + </dependency> </dependencies> http://git-wip-us.apache.org/repos/asf/knox/blob/89dd7788/gateway-spi/src/main/java/org/apache/hadoop/gateway/dispatch/DefaultHttpClientFactory.java ---------------------------------------------------------------------- diff --git a/gateway-spi/src/main/java/org/apache/hadoop/gateway/dispatch/DefaultHttpClientFactory.java b/gateway-spi/src/main/java/org/apache/hadoop/gateway/dispatch/DefaultHttpClientFactory.java index 3c11468..706d436 100644 --- a/gateway-spi/src/main/java/org/apache/hadoop/gateway/dispatch/DefaultHttpClientFactory.java +++ b/gateway-spi/src/main/java/org/apache/hadoop/gateway/dispatch/DefaultHttpClientFactory.java @@ -17,9 +17,23 @@ */ package org.apache.hadoop.gateway.dispatch; +import java.io.IOException; +import java.security.KeyStore; +import java.security.Principal; +import java.util.Collections; +import java.util.Date; +import java.util.List; + +import javax.net.ssl.SSLContext; +import javax.servlet.FilterConfig; + import org.apache.hadoop.gateway.config.GatewayConfig; import org.apache.hadoop.gateway.services.GatewayServices; import org.apache.hadoop.gateway.services.metrics.MetricsService; +import org.apache.hadoop.gateway.services.security.AliasService; +import org.apache.hadoop.gateway.services.security.AliasServiceException; +import org.apache.hadoop.gateway.services.security.KeystoreService; +import org.apache.hadoop.gateway.services.security.MasterService; import org.apache.http.HttpRequest; import org.apache.http.HttpResponse; import org.apache.http.ProtocolException; @@ -36,6 +50,8 @@ import org.apache.http.client.config.RequestConfig; import org.apache.http.client.methods.HttpUriRequest; import org.apache.http.config.Registry; import org.apache.http.config.RegistryBuilder; +import org.apache.http.conn.ssl.SSLConnectionSocketFactory; +import org.apache.http.conn.ssl.TrustSelfSignedStrategy; import org.apache.http.cookie.Cookie; import org.apache.http.impl.DefaultConnectionReuseStrategy; import org.apache.http.impl.client.BasicCredentialsProvider; @@ -43,31 +59,52 @@ import org.apache.http.impl.client.DefaultConnectionKeepAliveStrategy; import org.apache.http.impl.client.HttpClientBuilder; import org.apache.http.impl.client.HttpClients; import org.apache.http.protocol.HttpContext; +import org.apache.http.ssl.SSLContexts; import org.joda.time.Period; import org.joda.time.format.PeriodFormatter; import org.joda.time.format.PeriodFormatterBuilder; -import javax.servlet.FilterConfig; -import java.io.IOException; -import java.security.Principal; -import java.util.Collections; -import java.util.Date; -import java.util.List; - public class DefaultHttpClientFactory implements HttpClientFactory { @Override public HttpClient createHttpClient(FilterConfig filterConfig) { HttpClientBuilder builder = null; GatewayConfig gatewayConfig = (GatewayConfig) filterConfig.getServletContext().getAttribute(GatewayConfig.GATEWAY_CONFIG_ATTRIBUTE); + GatewayServices services = (GatewayServices) filterConfig.getServletContext() + .getAttribute(GatewayServices.GATEWAY_SERVICES_ATTRIBUTE); if (gatewayConfig != null && gatewayConfig.isMetricsEnabled()) { - GatewayServices services = (GatewayServices) filterConfig.getServletContext() - .getAttribute(GatewayServices.GATEWAY_SERVICES_ATTRIBUTE); MetricsService metricsService = services.getService(GatewayServices.METRICS_SERVICE); builder = metricsService.getInstrumented(HttpClientBuilder.class); } else { builder = HttpClients.custom(); } + if (Boolean.parseBoolean(filterConfig.getInitParameter("useTwoWaySsl"))) { + char[] keypass = null; + MasterService ms = services.getService("MasterService"); + AliasService as = services.getService(GatewayServices.ALIAS_SERVICE); + try { + keypass = as.getGatewayIdentityPassphrase(); + } catch (AliasServiceException e) { + // nop - default passphrase will be used + } + if (keypass == null) { + // there has been no alias created for the key - let's assume it is the same as the keystore password + keypass = ms.getMasterSecret(); + } + + KeystoreService ks = services.getService(GatewayServices.KEYSTORE_SERVICE); + final SSLContext sslcontext; + try { + KeyStore keystoreForGateway = ks.getKeystoreForGateway(); + sslcontext = SSLContexts.custom() + .loadTrustMaterial(keystoreForGateway, new TrustSelfSignedStrategy()) + .loadKeyMaterial(keystoreForGateway, keypass) + .build(); + } catch (Exception e) { + throw new IllegalArgumentException("Unable to create SSLContext", e); + } + builder.setSSLSocketFactory(new SSLConnectionSocketFactory(sslcontext)); + } if ( "true".equals(System.getProperty(GatewayConfig.HADOOP_KERBEROS_SECURED)) ) { CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); credentialsProvider.setCredentials(AuthScope.ANY, new UseJaasCredentials()); http://git-wip-us.apache.org/repos/asf/knox/blob/89dd7788/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index d97548b..69c8115 100644 --- a/pom.xml +++ b/pom.xml @@ -97,6 +97,7 @@ <module>gateway-shell-samples</module> <module>gateway-shell-release</module> <module>gateway-test-release-utils</module> + <module>gateway-service-nifi</module> </modules> <properties> @@ -668,6 +669,11 @@ </dependency> <dependency> <groupId>${gateway-group}</groupId> + <artifactId>gateway-service-nifi</artifactId> + <version>${gateway-version}</version> + </dependency> + <dependency> + <groupId>${gateway-group}</groupId> <artifactId>gateway-server</artifactId> <version>${gateway-version}</version> </dependency>