Repository: nifi Updated Branches: refs/heads/master 4f2643f66 -> fd5327e1b
NIFI-2059: Ensure that we properly pass along proxied entities in HTTP headers when secure and ensure that we don't keep creating new Root Group IDs once we've created one, even after restart. This closes #572. Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/fd5327e1 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/fd5327e1 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/fd5327e1 Branch: refs/heads/master Commit: fd5327e1b94e665a048a3127471c730a1f564da8 Parents: 4f2643f Author: Mark Payne <[email protected]> Authored: Thu Jun 23 14:25:11 2016 -0400 Committer: Matt Gilman <[email protected]> Committed: Thu Jun 23 16:32:25 2016 -0400 ---------------------------------------------------------------------- .../replication/ThreadPoolRequestReplicator.java | 15 +++++++++++++-- .../nifi/controller/StandardFlowService.java | 17 ++++++++--------- .../nifi/persistence/FlowConfigurationDAO.java | 5 +++++ .../StandardXMLFlowConfigurationDAO.java | 10 ++++++++-- 4 files changed, 34 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/fd5327e1/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java index ed83159..3d90b6f 100644 --- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java @@ -43,6 +43,8 @@ import javax.ws.rs.HttpMethod; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.MultivaluedMap; +import org.apache.nifi.authorization.user.NiFiUser; +import org.apache.nifi.authorization.user.NiFiUserUtils; import org.apache.nifi.cluster.coordination.ClusterCoordinator; import org.apache.nifi.cluster.coordination.http.HttpResponseMerger; import org.apache.nifi.cluster.coordination.http.StandardHttpResponseMerger; @@ -57,9 +59,9 @@ import org.apache.nifi.cluster.manager.exception.UnknownNodeException; import org.apache.nifi.cluster.manager.exception.UriConstructionException; import org.apache.nifi.cluster.protocol.NodeIdentifier; import org.apache.nifi.events.EventReporter; -import org.apache.nifi.logging.NiFiLog; import org.apache.nifi.reporting.Severity; import org.apache.nifi.util.FormatUtils; +import org.apache.nifi.web.security.ProxiedEntitiesUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -72,7 +74,7 @@ import com.sun.jersey.core.util.MultivaluedMapImpl; public class ThreadPoolRequestReplicator implements RequestReplicator { - private static final Logger logger = new NiFiLog(LoggerFactory.getLogger(ThreadPoolRequestReplicator.class)); + private static final Logger logger = LoggerFactory.getLogger(ThreadPoolRequestReplicator.class); private static final int MAX_CONCURRENT_REQUESTS = 100; private final Client client; // the client to use for issuing requests @@ -210,6 +212,15 @@ public class ThreadPoolRequestReplicator implements RequestReplicator { final Map<String, String> updatedHeaders = new HashMap<>(headers); 
updatedHeaders.put(RequestReplicator.CLUSTER_ID_GENERATION_SEED_HEADER, UUID.randomUUID().toString()); updatedHeaders.put(RequestReplicator.REPLICATION_INDICATOR_HEADER, "true"); + + // If the user is authenticated, add them as a proxied entity so that when the receiving NiFi receives the request, + // it knows that we are acting as a proxy on behalf of the current user. + final NiFiUser user = NiFiUserUtils.getNiFiUser(); + if (user != null && !user.isAnonymous()) { + final String proxiedEntitiesChain = ProxiedEntitiesUtils.buildProxiedEntitiesChainString(user); + updatedHeaders.put(ProxiedEntitiesUtils.PROXY_ENTITIES_CHAIN, proxiedEntitiesChain); + } + return replicate(nodeIds, method, uri, entity, updatedHeaders, true, null); } http://git-wip-us.apache.org/repos/asf/nifi/blob/fd5327e1/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java index 6fd9204..e2bdcf0 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java @@ -558,25 +558,24 @@ public class StandardFlowService implements FlowService, ProtocolHandler { @Override public StandardDataFlow createDataFlow() throws IOException { - // Load the flow from disk - final ByteArrayOutputStream baos = new ByteArrayOutputStream(); - dao.load(baos); - final byte[] bytes = baos.toByteArray(); final byte[] snippetBytes = 
controller.getSnippetManager().export(); final byte[] authorizerFingerprint = getAuthorizerFingerprint(); - final StandardDataFlow fromDisk = new StandardDataFlow(bytes, snippetBytes, authorizerFingerprint); - // Check if the flow from disk is empty. If not, use it. - if (!StandardFlowSynchronizer.isEmpty(fromDisk, encryptor)) { + // Load the flow from disk if the file exists. + if (dao.isFlowPresent()) { + final ByteArrayOutputStream baos = new ByteArrayOutputStream(); + dao.load(baos); + final byte[] bytes = baos.toByteArray(); + final StandardDataFlow fromDisk = new StandardDataFlow(bytes, snippetBytes, authorizerFingerprint); return fromDisk; } - // Flow from disk is empty, so serialize the Flow Controller and use that. + // Flow from disk does not exist, so serialize the Flow Controller and use that. // This is done because on startup, if there is no flow, the Flow Controller // will automatically create a Root Process Group, and we need to ensure that // we replicate that Process Group to all nodes in the cluster, so that they all // end up with the same ID for the root Process Group. 
- baos.reset(); + final ByteArrayOutputStream baos = new ByteArrayOutputStream(); dao.save(controller, baos); final byte[] flowBytes = baos.toByteArray(); baos.reset(); http://git-wip-us.apache.org/repos/asf/nifi/blob/fd5327e1/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/persistence/FlowConfigurationDAO.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/persistence/FlowConfigurationDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/persistence/FlowConfigurationDAO.java index c5c1b3b..4593b2f 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/persistence/FlowConfigurationDAO.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/persistence/FlowConfigurationDAO.java @@ -33,6 +33,11 @@ import org.apache.nifi.controller.serialization.FlowSynchronizationException; public interface FlowConfigurationDAO { /** + * @return <code>true</code> if a file containing the flow is present, <code>false</code> otherwise + */ + boolean isFlowPresent(); + + /** * Loads the given controller with the values from the given proposed flow. If loading the proposed flow configuration would cause the controller to orphan flow files, then an * UninheritableFlowException is thrown. 
* http://git-wip-us.apache.org/repos/asf/nifi/blob/fd5327e1/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/persistence/StandardXMLFlowConfigurationDAO.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/persistence/StandardXMLFlowConfigurationDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/persistence/StandardXMLFlowConfigurationDAO.java index 2d408e8..ffe212d 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/persistence/StandardXMLFlowConfigurationDAO.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/persistence/StandardXMLFlowConfigurationDAO.java @@ -67,6 +67,13 @@ public final class StandardXMLFlowConfigurationDAO implements FlowConfigurationD this.encryptor = encryptor; } + + @Override + public boolean isFlowPresent() { + final File flowXmlFile = flowXmlPath.toFile(); + return flowXmlFile.exists() && flowXmlFile.length() > 0; + } + @Override public synchronized void load(final FlowController controller, final DataFlow dataFlow) throws IOException, FlowSerializationException, FlowSynchronizationException, UninheritableFlowException { @@ -78,8 +85,7 @@ public final class StandardXMLFlowConfigurationDAO implements FlowConfigurationD @Override public synchronized void load(final OutputStream os) throws IOException { - final File file = flowXmlPath.toFile(); - if (!file.exists() || file.length() == 0) { + if (!isFlowPresent()) { return; }
