// Origin: incubator-nifi revision 4d998c12,
// nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/node/Node.java
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.nifi.cluster.node;

import java.util.concurrent.atomic.AtomicLong;

import org.apache.nifi.cluster.HeartbeatPayload;
import org.apache.nifi.cluster.protocol.Heartbeat;
import org.apache.nifi.cluster.protocol.NodeIdentifier;
import org.apache.nifi.cluster.protocol.ProtocolException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Represents a connected flow controller. Nodes always have an immutable
 * identifier and a status. The status may be changed, but never null.
 *
 * A Node may be cloned. The clone shares the identifier, heartbeat and
 * heartbeat payload references with the original, but carries its own copy of
 * the connection-requested timestamp so that later timestamp updates on one
 * instance do not leak into the other.
 *
 * This class overrides hashCode and equals and considers two instances to be
 * equal if they have equal NodeIdentifiers.
 *
 * @author unattributed
 */
public class Node implements Cloneable, Comparable<Node> {

    private static final Logger lockLogger = LoggerFactory.getLogger("cluster.lock");

    /**
     * The semantics of a Node status are as follows:
     * <ul>
     * <li>CONNECTED -- a flow controller that is connected to the cluster. A
     * connecting node transitions to connected after the cluster receives the
     * flow controller's first heartbeat. A connected node can transition to
     * disconnecting.</li>
     * <li>CONNECTING -- a flow controller has issued a connection request to
     * the cluster, but has not yet sent a heartbeat. A connecting node can
     * transition to disconnecting or connected. The cluster will not accept any
     * external requests to change the flow while any node is connecting.</li>
     * <li>DISCONNECTED -- a flow controller that is not connected to the
     * cluster. A disconnected node can transition to connecting.</li>
     * <li>DISCONNECTING -- a flow controller that is in the process of
     * disconnecting from the cluster. A disconnecting node will always
     * transition to disconnected.</li>
     * </ul>
     */
    public enum Status {

        CONNECTED,
        CONNECTING,
        DISCONNECTED,
        DISCONNECTING
    }

    /**
     * the node's unique identifier
     */
    private final NodeIdentifier nodeId;

    /**
     * the node status
     */
    private Status status;

    /**
     * the last heartbeat received from the node
     */
    private Heartbeat lastHeartbeat;

    /**
     * the payload of the last heartbeat received from the node
     */
    private HeartbeatPayload lastHeartbeatPayload;

    /**
     * the last time the connection for this node was requested; the holder is
     * never replaced, only its value is updated
     */
    private final AtomicLong connectionRequestedTimestamp = new AtomicLong(0L);

    /**
     * a flag to indicate this node was disconnected because of a lack of
     * heartbeat
     */
    private boolean heartbeatDisconnection;

    /**
     * Creates a node with the given identifier and initial status.
     *
     * @param id the node's identifier; may not be null
     * @param status the node's initial status; may not be null
     *
     * @throws IllegalArgumentException if either argument is null
     */
    public Node(final NodeIdentifier id, final Status status) {
        if (id == null) {
            throw new IllegalArgumentException("ID may not be null.");
        } else if (status == null) {
            throw new IllegalArgumentException("Status may not be null.");
        }
        this.nodeId = id;
        this.status = status;
    }

    /**
     * @return the node's identifier; never null
     */
    public NodeIdentifier getNodeId() {
        return nodeId;
    }

    /**
     * Returns the last received heartbeat or null if no heartbeat has been set.
     *
     * @return a heartbeat or null
     */
    public Heartbeat getHeartbeat() {
        return lastHeartbeat;
    }

    /**
     * Returns the payload of the last received heartbeat.
     *
     * @return the payload, or null if no heartbeat has been set or the last
     * heartbeat carried a null or empty payload
     */
    public HeartbeatPayload getHeartbeatPayload() {
        return lastHeartbeatPayload;
    }

    /**
     * Sets the last heartbeat received.
     *
     * The payload is unmarshalled before any field is written, so if
     * unmarshalling fails the previously stored heartbeat and payload are both
     * left untouched and stay consistent with each other.
     *
     * @param heartbeat a heartbeat; null clears both the heartbeat and its
     * payload
     *
     * @throws ProtocolException if the heartbeat's payload failed unmarshalling
     */
    public void setHeartbeat(final Heartbeat heartbeat) throws ProtocolException {
        HeartbeatPayload unmarshalledPayload = null;
        if (heartbeat != null) {
            final byte[] payload = heartbeat.getPayload();
            if (payload != null && payload.length > 0) {
                unmarshalledPayload = HeartbeatPayload.unmarshal(payload);
            }
        }

        // assign only after unmarshalling succeeded so the pair never diverges
        this.lastHeartbeat = heartbeat;
        this.lastHeartbeatPayload = unmarshalledPayload;
    }

    /**
     * Returns the time of the last received connection request for this node.
     *
     * @return the time when the connection request for this node was received.
     */
    public long getConnectionRequestedTimestamp() {
        return connectionRequestedTimestamp.get();
    }

    /**
     * Sets the time when the connection request for this node was last
     * received.
     *
     * This method is thread-safe and may be called without obtaining any lock.
     *
     * @param connectionRequestedTimestamp the time of the connection request
     */
    public void setConnectionRequestedTimestamp(long connectionRequestedTimestamp) {
        this.connectionRequestedTimestamp.set(connectionRequestedTimestamp);
    }

    /**
     * Returns true if the node was disconnected due to lack of heartbeat; false
     * otherwise.
     *
     * @return true if the node was disconnected due to lack of heartbeat; false
     * otherwise.
     */
    public boolean isHeartbeatDisconnection() {
        return heartbeatDisconnection;
    }

    /**
     * Sets the status to disconnected and flags the node as being disconnected
     * by lack of heartbeat.
     */
    public void setHeartbeatDisconnection() {
        // setStatus clears the flag, so it must be raised afterwards
        setStatus(Status.DISCONNECTED);
        heartbeatDisconnection = true;
    }

    /**
     * @return the status
     */
    public Status getStatus() {
        return status;
    }

    /**
     * Sets the status and clears the heartbeat-disconnection flag.
     *
     * @param status a status; may not be null
     *
     * @throws IllegalArgumentException if the status is null
     */
    public void setStatus(final Status status) {
        if (status == null) {
            throw new IllegalArgumentException("Status may not be null.");
        }
        this.status = status;
        heartbeatDisconnection = false;
    }

    /**
     * Creates a copy of this node. The identifier, heartbeat and payload
     * references are shared with this instance; the connection-requested
     * timestamp is copied by value.
     *
     * @return the copy
     */
    @Override
    public Node clone() {
        final Node clone = new Node(nodeId, status);
        clone.lastHeartbeat = lastHeartbeat;
        clone.lastHeartbeatPayload = lastHeartbeatPayload;
        clone.heartbeatDisconnection = heartbeatDisconnection;
        // copy the value rather than the holder so the two nodes do not alias
        clone.connectionRequestedTimestamp.set(connectionRequestedTimestamp.get());
        return clone;
    }

    /**
     * Two nodes are equal when they are of the same class and have equal
     * identifiers.
     */
    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (obj == null || getClass() != obj.getClass()) {
            return false;
        }
        final Node other = (Node) obj;
        // nodeId is guaranteed non-null by the constructor
        return this.nodeId.equals(other.nodeId);
    }

    @Override
    public int hashCode() {
        int hash = 7;
        hash = 53 * hash + this.nodeId.hashCode();
        return hash;
    }

    @Override
    public String toString() {
        return nodeId.toString();
    }

    /**
     * Orders nodes by the id of their identifiers.
     *
     * NOTE: a null argument yields -1 instead of the NullPointerException that
     * the Comparable contract calls for; this is kept for existing callers.
     */
    @Override
    public int compareTo(final Node o) {
        if (o == null) {
            return -1;
        }
        return getNodeId().getId().compareTo(o.getNodeId().getId());
    }
}
// Origin: incubator-nifi revision 4d998c12,
// nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/spring/ClusterManagerProtocolServiceLocatorFactoryBean.java
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.nifi.cluster.spring;

import java.net.InetSocketAddress;
import java.util.concurrent.TimeUnit;

import org.apache.nifi.cluster.protocol.impl.ClusterServiceDiscovery;
import org.apache.nifi.cluster.protocol.impl.ClusterServiceLocator;
import org.apache.nifi.io.socket.multicast.DiscoverableService;
import org.apache.nifi.io.socket.multicast.DiscoverableServiceImpl;
import org.apache.nifi.util.FormatUtils;
import org.apache.nifi.util.NiFiProperties;

import org.springframework.beans.BeansException;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.FactoryBean;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;

/**
 * Factory bean for creating a singleton ClusterManagerProtocolServiceLocator
 * instance. If the application is configured to act as the cluster manager,
 * then null is always returned as the created instance.
 *
 * The cluster manager protocol service represents the socket endpoint for
 * sending internal socket messages to the cluster manager.
 */
public class ClusterManagerProtocolServiceLocatorFactoryBean implements FactoryBean, ApplicationContextAware, DisposableBean {

    private ApplicationContext applicationContext;

    private ClusterServiceLocator locator;

    private NiFiProperties properties;

    /**
     * Returns the started service locator, creating it on first use.
     *
     * The locator is stored in this factory only after it has been started
     * successfully. If creation or start-up throws, nothing is cached and the
     * next call attempts creation again instead of handing out a locator that
     * was never started.
     *
     * @return the locator, or null when the application is configured as the
     * cluster manager
     *
     * @throws Exception if the locator could not be created or started
     */
    @Override
    public Object getObject() throws Exception {
        /*
         * If configured for the cluster manager, then the service locator is never used.
         */
        if (properties.isClusterManager()) {
            return null;
        }

        if (locator == null) {
            final ClusterServiceLocator created = properties.getClusterProtocolUseMulticast()
                    ? createMulticastLocator()
                    : createUnicastLocator();

            // start before publishing so a failed start is not cached
            created.start();
            locator = created;
        }
        return locator;
    }

    /**
     * Builds a locator that finds the cluster manager through the
     * "clusterManagerProtocolServiceDiscovery" bean, using the attempt count
     * and delay taken from the application properties.
     */
    private ClusterServiceLocator createMulticastLocator() {
        // get the service discovery instance
        final ClusterServiceDiscovery serviceDiscovery = applicationContext.getBean("clusterManagerProtocolServiceDiscovery", ClusterServiceDiscovery.class);

        // create service location configuration
        final ClusterServiceLocator.AttemptsConfig config = new ClusterServiceLocator.AttemptsConfig();
        config.setNumAttempts(properties.getClusterProtocolMulticastServiceLocatorAttempts());

        final int delay = (int) FormatUtils.getTimeDuration(properties.getClusterProtocolMulticastServiceLocatorAttemptsDelay(), TimeUnit.SECONDS);
        config.setTimeBetweenAttempts(delay);
        config.setTimeBetweenAttempsUnit(TimeUnit.SECONDS);

        final ClusterServiceLocator multicastLocator = new ClusterServiceLocator(serviceDiscovery);
        multicastLocator.setAttemptsConfig(config);
        return multicastLocator;
    }

    /**
     * Builds a locator bound to the fixed manager protocol address taken from
     * the application properties.
     */
    private ClusterServiceLocator createUnicastLocator() {
        final String serviceName = applicationContext.getBean("clusterManagerProtocolServiceName", String.class);
        final InetSocketAddress serviceAddress = properties.getClusterNodeUnicastManagerProtocolAddress();
        final DiscoverableService service = new DiscoverableServiceImpl(serviceName, serviceAddress);
        return new ClusterServiceLocator(service);
    }

    @Override
    public Class getObjectType() {
        return ClusterServiceLocator.class;
    }

    @Override
    public boolean isSingleton() {
        return true;
    }

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = applicationContext;
    }

    /**
     * Stops the locator if one was created and is still running.
     */
    @Override
    public void destroy() throws Exception {
        if (locator != null && locator.isRunning()) {
            locator.stop();
        }
    }

    public void setProperties(NiFiProperties properties) {
        this.properties = properties;
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/spring/FileBasedClusterNodeFirewallFactoryBean.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/spring/FileBasedClusterNodeFirewallFactoryBean.java b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/spring/FileBasedClusterNodeFirewallFactoryBean.java new file mode 100644 index 0000000..ef72298 --- /dev/null +++ b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/spring/FileBasedClusterNodeFirewallFactoryBean.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.cluster.spring; + +import java.io.File; +import org.apache.nifi.cluster.firewall.impl.FileBasedClusterNodeFirewall; +import org.apache.nifi.util.NiFiProperties; +import org.springframework.beans.factory.FactoryBean; + +/** + * Factory bean for creating a singleton FileBasedClusterNodeFirewall instance. 
+ */ +public class FileBasedClusterNodeFirewallFactoryBean implements FactoryBean { + + private FileBasedClusterNodeFirewall firewall; + + private NiFiProperties properties; + + @Override + public Object getObject() throws Exception { + if (firewall == null) { + final File config = properties.getClusterManagerNodeFirewallFile(); + final File restoreDirectory = properties.getRestoreDirectory(); + if (config != null) { + firewall = new FileBasedClusterNodeFirewall(config, restoreDirectory); + } + } + return firewall; + } + + @Override + public Class getObjectType() { + return FileBasedClusterNodeFirewall.class; + } + + @Override + public boolean isSingleton() { + return true; + } + + public void setProperties(NiFiProperties properties) { + this.properties = properties; + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/spring/WebClusterManagerFactoryBean.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/spring/WebClusterManagerFactoryBean.java b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/spring/WebClusterManagerFactoryBean.java new file mode 100644 index 0000000..7169730 --- /dev/null +++ b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/spring/WebClusterManagerFactoryBean.java @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.cluster.spring; + +import java.nio.file.Paths; +import org.apache.nifi.admin.service.AuditService; +import org.apache.nifi.cluster.event.EventManager; +import org.apache.nifi.cluster.firewall.ClusterNodeFirewall; +import org.apache.nifi.cluster.flow.DataFlowManagementService; +import org.apache.nifi.cluster.manager.HttpRequestReplicator; +import org.apache.nifi.cluster.manager.HttpResponseMapper; +import org.apache.nifi.cluster.manager.impl.WebClusterManager; +import org.apache.nifi.cluster.protocol.impl.ClusterManagerProtocolSenderListener; +import org.apache.nifi.cluster.protocol.impl.ClusterServicesBroadcaster; +import org.apache.nifi.controller.service.ControllerServiceLoader; +import org.apache.nifi.encrypt.StringEncryptor; +import org.apache.nifi.io.socket.multicast.DiscoverableService; +import org.apache.nifi.io.socket.multicast.DiscoverableServiceImpl; +import org.apache.nifi.util.NiFiProperties; +import org.springframework.beans.BeansException; +import org.springframework.beans.factory.FactoryBean; +import org.springframework.context.ApplicationContext; +import org.springframework.context.ApplicationContextAware; + +/** + * Factory bean for creating a singleton WebClusterManager instance. If the + * application is not configured to act as the cluster manager, then null is + * always returned as the created instance. 
+ */ +public class WebClusterManagerFactoryBean implements FactoryBean, ApplicationContextAware { + + private ApplicationContext applicationContext; + + private WebClusterManager clusterManager; + + private NiFiProperties properties; + + private StringEncryptor encryptor; + + @Override + public Object getObject() throws Exception { + if (properties.isClusterManager() && properties.isNode()) { + throw new IllegalStateException("Application may be configured as a cluster manager or a node, but not both."); + } else if (!properties.isClusterManager()) { + /* + * If not configured for the cluster manager, then the cluster manager is never used. + * null is returned so that we don't instantiate a thread pool or other resources. + */ + return null; + } else if (clusterManager == null) { + + // get the service configuration path (fail early) + final String serviceConfigurationFile = properties.getProperty(NiFiProperties.SERVICE_CONFIGURATION_FILE); + if (serviceConfigurationFile == null) { + throw new NullPointerException("The service configuration file has not been specified."); + } + + final HttpRequestReplicator requestReplicator = applicationContext.getBean("httpRequestReplicator", HttpRequestReplicator.class); + final HttpResponseMapper responseMapper = applicationContext.getBean("httpResponseMapper", HttpResponseMapper.class); + final DataFlowManagementService dataFlowService = applicationContext.getBean("dataFlowManagementService", DataFlowManagementService.class); + final ClusterManagerProtocolSenderListener senderListener = applicationContext.getBean("clusterManagerProtocolSenderListener", ClusterManagerProtocolSenderListener.class); + + // create the manager + clusterManager = new WebClusterManager( + requestReplicator, + responseMapper, + dataFlowService, + senderListener, + properties, + encryptor + ); + + // set the service broadcaster + if (properties.getClusterProtocolUseMulticast()) { + + // create broadcaster + final ClusterServicesBroadcaster broadcaster 
= applicationContext.getBean("clusterServicesBroadcaster", ClusterServicesBroadcaster.class); + + // register the cluster manager protocol service + final String clusterManagerProtocolServiceName = applicationContext.getBean("clusterManagerProtocolServiceName", String.class); + final DiscoverableService clusterManagerProtocolService = new DiscoverableServiceImpl(clusterManagerProtocolServiceName, properties.getClusterManagerProtocolAddress()); + broadcaster.addService(clusterManagerProtocolService); + + clusterManager.setServicesBroadcaster(broadcaster); + } + + // set the event manager + clusterManager.setEventManager(applicationContext.getBean("nodeEventHistoryManager", EventManager.class)); + + // set the cluster firewall + clusterManager.setClusterFirewall(applicationContext.getBean("clusterFirewall", ClusterNodeFirewall.class)); + + // set the audit service + clusterManager.setAuditService(applicationContext.getBean("auditService", AuditService.class)); + + // load the controller services + final ControllerServiceLoader serviceLoader = new ControllerServiceLoader(Paths.get(serviceConfigurationFile)); + serviceLoader.loadControllerServices(clusterManager); + } + return clusterManager; + } + + @Override + public Class getObjectType() { + return WebClusterManager.class; + } + + @Override + public boolean isSingleton() { + return true; + } + + @Override + public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { + this.applicationContext = applicationContext; + } + + public void setProperties(NiFiProperties properties) { + this.properties = properties; + } + + public void setEncryptor(final StringEncryptor encryptor) { + this.encryptor = encryptor; + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/controller/reporting/ClusteredReportingTaskNode.java ---------------------------------------------------------------------- diff 
--git a/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/controller/reporting/ClusteredReportingTaskNode.java b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/controller/reporting/ClusteredReportingTaskNode.java new file mode 100644 index 0000000..1ed5b30 --- /dev/null +++ b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/controller/reporting/ClusteredReportingTaskNode.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.controller.reporting; + +import org.apache.nifi.cluster.manager.impl.ClusteredReportingContext; +import org.apache.nifi.controller.ProcessScheduler; +import org.apache.nifi.controller.ValidationContextFactory; +import org.apache.nifi.controller.service.ControllerServiceProvider; +import org.apache.nifi.reporting.BulletinRepository; +import org.apache.nifi.reporting.EventAccess; +import org.apache.nifi.reporting.ReportingContext; +import org.apache.nifi.reporting.ReportingTask; + +public class ClusteredReportingTaskNode extends AbstractReportingTaskNode { + + private final EventAccess eventAccess; + private final BulletinRepository bulletinRepository; + private final ControllerServiceProvider serviceProvider; + + public ClusteredReportingTaskNode(final ReportingTask reportingTask, final String id, final ProcessScheduler scheduler, + final EventAccess eventAccess, final BulletinRepository bulletinRepository, final ControllerServiceProvider serviceProvider, + final ValidationContextFactory validationContextFactory) { + super(reportingTask, id, serviceProvider, scheduler, validationContextFactory); + + this.eventAccess = eventAccess; + this.bulletinRepository = bulletinRepository; + this.serviceProvider = serviceProvider; + } + + @Override + public ReportingContext getReportingContext() { + return new ClusteredReportingContext(eventAccess, bulletinRepository, getProperties(), serviceProvider); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster/src/main/resources/nifi-cluster-manager-context.xml ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/cluster/src/main/resources/nifi-cluster-manager-context.xml b/nar-bundles/framework-bundle/framework/cluster/src/main/resources/nifi-cluster-manager-context.xml new file mode 100644 index 0000000..68c29bc --- /dev/null +++ 
b/nar-bundles/framework-bundle/framework/cluster/src/main/resources/nifi-cluster-manager-context.xml @@ -0,0 +1,124 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<!-- marked as lazy so that clustering beans are not created when applications runs in non-clustered mode --> +<beans default-lazy-init="true" + xmlns="http://www.springframework.org/schema/beans" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xmlns:util="http://www.springframework.org/schema/util" + xmlns:context="http://www.springframework.org/schema/context" + xmlns:aop="http://www.springframework.org/schema/aop" + xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.1.xsd + http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-3.1.xsd + http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.1.xsd + http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.1.xsd"> + + <!-- jersey client --> + <bean id="jersey-client" class="org.apache.nifi.web.util.WebUtils" factory-method="createClient"> + <constructor-arg> + <bean 
class="com.sun.jersey.api.client.config.DefaultClientConfig"/> + </constructor-arg> + <constructor-arg> + <bean class="org.apache.nifi.framework.security.util.SslContextFactory" factory-method="createSslContext"> + <constructor-arg ref="nifiProperties"/> + </bean> + </constructor-arg> + </bean> + + <!-- http request replicator --> + <bean id="httpRequestReplicator" class="org.apache.nifi.cluster.manager.impl.HttpRequestReplicatorImpl"> + <constructor-arg index="0"> + <bean factory-bean="nifiProperties" factory-method="getClusterManagerNodeApiRequestThreads"/> + </constructor-arg> + <constructor-arg ref="jersey-client" index="1"/> + <constructor-arg index="2"> + <bean factory-bean="nifiProperties" factory-method="getClusterManagerNodeApiConnectionTimeout"/> + </constructor-arg> + <constructor-arg index="3"> + <bean factory-bean="nifiProperties" factory-method="getClusterManagerNodeApiReadTimeout"/> + </constructor-arg> + <property name="nodeProtocolScheme"> + <bean factory-bean="nifiProperties" factory-method="getClusterProtocolManagerToNodeApiScheme"/> + </property> + </bean> + + <!-- http response mapper --> + <bean id="httpResponseMapper" class="org.apache.nifi.cluster.manager.impl.HttpResponseMapperImpl"/> + + <!-- cluster flow DAO --> + <bean id="dataFlowDao" class="org.apache.nifi.cluster.flow.impl.DataFlowDaoImpl"> + <constructor-arg index="0"> + <bean factory-bean="nifiProperties" factory-method="getFlowConfigurationFileDir"/> + </constructor-arg> + <constructor-arg index="1"> + <bean factory-bean="nifiProperties" factory-method="getRestoreDirectory"/> + </constructor-arg> + <constructor-arg index="2"> + <bean factory-bean="nifiProperties" factory-method="getAutoResumeState"/> + </constructor-arg> + </bean> + + <!-- dataflow management service --> + <bean id="dataFlowManagementService" class="org.apache.nifi.cluster.flow.impl.DataFlowManagementServiceImpl"> + <constructor-arg ref="dataFlowDao"/> + <constructor-arg ref="clusterManagerProtocolSender"/> + 
<property name="retrievalDelay"> + <bean factory-bean="nifiProperties" factory-method="getClusterManagerFlowRetrievalDelay"/> + </property> + </bean> + + <!-- node event history manager --> + <bean id="nodeEventHistoryManager" class="org.apache.nifi.cluster.event.impl.EventManagerImpl"> + <constructor-arg> + <bean factory-bean="nifiProperties" factory-method="getClusterManagerNodeEventHistorySize"/> + </constructor-arg> + </bean> + + <!-- cluster firewall --> + <bean id="clusterFirewall" class="org.apache.nifi.cluster.spring.FileBasedClusterNodeFirewallFactoryBean"> + <property name="properties" ref="nifiProperties"/> + </bean> + + <!-- cluster manager --> + <bean id="clusterManager" class="org.apache.nifi.cluster.spring.WebClusterManagerFactoryBean"> + <property name="properties" ref="nifiProperties"/> + <property name="encryptor" ref="stringEncryptor"/> + </bean> + + <!-- discoverable services --> + + <!-- cluster manager protocol discoverable service --> + + <!-- service name for communicating with the cluster manager using sockets --> + <bean id="clusterManagerProtocolServiceName" class="java.lang.String"> + <constructor-arg value="cluster-manager-protocol" /> + </bean> + + <!-- cluster manager protocol service discovery --> + <bean id="clusterManagerProtocolServiceDiscovery" class="org.apache.nifi.cluster.protocol.impl.ClusterServiceDiscovery"> + <constructor-arg ref="clusterManagerProtocolServiceName" index="0"/> + <constructor-arg index="1"> + <bean factory-bean="nifiProperties" factory-method="getClusterProtocolMulticastAddress"/> + </constructor-arg> + <constructor-arg ref="protocolMulticastConfiguration" index="2"/> + <constructor-arg ref="protocolContext" index="3"/> + </bean> + + <!-- cluster manager protocol service locator --> + <bean id="clusterManagerProtocolServiceLocator" class="org.apache.nifi.cluster.spring.ClusterManagerProtocolServiceLocatorFactoryBean"> + <property name="properties" ref="nifiProperties"/> + </bean> + +</beans> 
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/event/impl/EventManagerImplTest.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/event/impl/EventManagerImplTest.java b/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/event/impl/EventManagerImplTest.java new file mode 100644 index 0000000..09ea44b --- /dev/null +++ b/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/event/impl/EventManagerImplTest.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.cluster.event.impl; + +import org.apache.nifi.cluster.event.impl.EventManagerImpl; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import org.apache.nifi.cluster.event.Event; +import org.apache.nifi.cluster.event.Event.Category; +import org.apache.nifi.cluster.event.EventManager; +import org.junit.Test; +import static org.junit.Assert.*; + +/** + * @author unattributed + */ +public class EventManagerImplTest { + + @Test(expected = IllegalArgumentException.class) + public void testNonPositiveHistorySize() { + new EventManagerImpl(0); + } + + @Test + public void testGetEventsUnknownSource() { + EventManager manager = new EventManagerImpl(1); + assertEquals(Collections.EMPTY_LIST, manager.getEvents("unknown value")); + } + + @Test + public void testGetEvents() { + + EventManager manager = new EventManagerImpl(2); + + Event e1 = new Event("1", "Event1", Category.INFO, 0); + Event e2 = new Event("1", "Event2", Category.INFO, 1); + + manager.addEvent(e1); + manager.addEvent(e2); + + List<Event> events = manager.getEvents("1"); + + // assert newest to oldest + assertEquals(Arrays.asList(e2, e1), events); + } + + @Test + public void testGetMostRecentEventUnknownSource() { + EventManager manager = new EventManagerImpl(1); + assertNull(manager.getMostRecentEvent("unknown value")); + } + + @Test + public void testGetMostRecentEvent() { + + EventManager manager = new EventManagerImpl(2); + + Event e1 = new Event("1", "Event1", Category.INFO, 0); + Event e2 = new Event("1", "Event2", Category.INFO, 1); + + manager.addEvent(e1); + manager.addEvent(e2); + + // assert newest to oldest + assertEquals(e2, manager.getMostRecentEvent("1")); + } + + @Test + public void testAddEventExceedsHistorySize() { + + EventManager manager = new EventManagerImpl(1); + + Event e1 = new Event("1", "Event1", Category.INFO, 0); + Event e2 = new Event("1", "Event2", Category.INFO, 1); + + manager.addEvent(e1); + manager.addEvent(e2); + + 
List<Event> events = manager.getEvents("1"); + + // assert oldest evicted + assertEquals(Arrays.asList(e2), events); + + } + + @Test + public void testClearHistory() { + + EventManager manager = new EventManagerImpl(1); + + Event e1 = new Event("1", "Event1", Category.INFO, 0); + Event e2 = new Event("1", "Event2", Category.INFO, 1); + + manager.addEvent(e1); + manager.addEvent(e2); + + manager.clearEventHistory("1"); + + // assert oldest evicted + assertTrue(manager.getEvents("1").isEmpty()); + + } + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/firewall/impl/FileBasedClusterNodeFirewallTest.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/firewall/impl/FileBasedClusterNodeFirewallTest.java b/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/firewall/impl/FileBasedClusterNodeFirewallTest.java new file mode 100644 index 0000000..2fcf7ef --- /dev/null +++ b/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/firewall/impl/FileBasedClusterNodeFirewallTest.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.cluster.firewall.impl; + +import org.apache.nifi.cluster.firewall.impl.FileBasedClusterNodeFirewall; +import java.io.File; +import java.io.IOException; +import org.apache.nifi.file.FileUtils; +import org.junit.After; +import org.junit.Before; +import static org.junit.Assert.*; +import org.junit.Test; + +public class FileBasedClusterNodeFirewallTest { + + private FileBasedClusterNodeFirewall ipsFirewall; + + private FileBasedClusterNodeFirewall acceptAllFirewall; + + private File ipsConfig; + + private File emptyConfig; + + private File restoreDirectory; + + @Before + public void setup() throws Exception { + + ipsConfig = new File("src/test/resources/org/apache/nifi/cluster/firewall/impl/ips.txt"); + emptyConfig = new File("src/test/resources/org/apache/nifi/cluster/firewall/impl/empty.txt"); + + restoreDirectory = new File(System.getProperty("java.io.tmpdir") + "/firewall_restore"); + + ipsFirewall = new FileBasedClusterNodeFirewall(ipsConfig, restoreDirectory); + acceptAllFirewall = new FileBasedClusterNodeFirewall(emptyConfig); + } + + @After + public void teardown() throws IOException { + deleteFile(restoreDirectory); + } + + @Test + public void testSyncWithRestore() { + assertEquals(ipsConfig.length(), new File(restoreDirectory, ipsConfig.getName()).length()); + } + + @Test + public void testIsPermissibleWithExactMatch() { + assertTrue(ipsFirewall.isPermissible("2.2.2.2")); + } + + @Test + public void testIsPermissibleWithSubnetMatch() { + assertTrue(ipsFirewall.isPermissible("3.3.3.255")); + } + + @Test + public void testIsPermissibleWithNoMatch() { + assertFalse(ipsFirewall.isPermissible("255.255.255.255")); + } + + @Test + public void testIsPermissibleWithMalformedData() { + assertFalse(ipsFirewall.isPermissible("abc")); + } + + @Test + public void testIsPermissibleWithEmptyConfig() { + 
assertTrue(acceptAllFirewall.isPermissible("1.1.1.1")); + } + + @Test + public void testIsPermissibleWithEmptyConfigWithMalformedData() { + assertTrue(acceptAllFirewall.isPermissible("abc")); + } + + private boolean deleteFile(final File file) { + if (file.isDirectory()) { + FileUtils.deleteFilesInDir(file, null, null, true, true); + } + return FileUtils.deleteFile(file, null, 10); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/flow/impl/DataFlowManagementServiceImplTest.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/flow/impl/DataFlowManagementServiceImplTest.java b/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/flow/impl/DataFlowManagementServiceImplTest.java new file mode 100644 index 0000000..6294dfc --- /dev/null +++ b/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/flow/impl/DataFlowManagementServiceImplTest.java @@ -0,0 +1,341 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.cluster.flow.impl; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; + +import java.io.ByteArrayInputStream; +import java.io.File; +import java.io.IOException; +import java.util.Arrays; +import java.util.HashSet; +import java.util.Set; + +import javax.xml.parsers.DocumentBuilder; +import javax.xml.parsers.DocumentBuilderFactory; +import javax.xml.parsers.ParserConfigurationException; + +import org.apache.nifi.cluster.flow.DataFlowDao; +import org.apache.nifi.cluster.flow.PersistedFlowState; +import org.apache.nifi.cluster.protocol.ClusterManagerProtocolSender; +import org.apache.nifi.cluster.protocol.NodeIdentifier; +import org.apache.nifi.cluster.protocol.ProtocolContext; +import org.apache.nifi.cluster.protocol.ProtocolException; +import org.apache.nifi.cluster.protocol.ProtocolHandler; +import org.apache.nifi.cluster.protocol.StandardDataFlow; +import org.apache.nifi.cluster.protocol.impl.ClusterManagerProtocolSenderImpl; +import org.apache.nifi.cluster.protocol.impl.SocketProtocolListener; +import org.apache.nifi.cluster.protocol.jaxb.JaxbProtocolContext; +import org.apache.nifi.cluster.protocol.jaxb.message.JaxbProtocolUtils; +import org.apache.nifi.cluster.protocol.message.FlowResponseMessage; +import org.apache.nifi.cluster.protocol.message.ProtocolMessage; +import org.apache.nifi.io.socket.ServerSocketConfiguration; +import org.apache.nifi.io.socket.SocketConfiguration; + +import org.apache.commons.io.FileUtils; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.w3c.dom.Document; +import org.w3c.dom.Element; +import org.xml.sax.SAXException; + +/** + * @author unattributed + */ +public class DataFlowManagementServiceImplTest { + + private DataFlowManagementServiceImpl service; + private File restoreLocation; + private File primaryLocation; + private DataFlowDao dao; + private int apiDummyPort; + private int socketPort; + private 
SocketConfiguration socketConfig; + private ClusterManagerProtocolSender sender; + private ServerSocketConfiguration serverSocketConfig; + private SocketProtocolListener listener; + + @Before + public void setup() throws IOException { + + primaryLocation = new File(System.getProperty("java.io.tmpdir") + "/primary"); + restoreLocation = new File(System.getProperty("java.io.tmpdir") + "/restore"); + + FileUtils.deleteDirectory(primaryLocation); + FileUtils.deleteDirectory(restoreLocation); + + ProtocolContext protocolContext = new JaxbProtocolContext(JaxbProtocolUtils.JAXB_CONTEXT); + + socketConfig = new SocketConfiguration(); + socketConfig.setSocketTimeout(1000); + serverSocketConfig = new ServerSocketConfiguration(); + + dao = new DataFlowDaoImpl(primaryLocation, restoreLocation, false); + + sender = new ClusterManagerProtocolSenderImpl(socketConfig, protocolContext); + + service = new DataFlowManagementServiceImpl(dao, sender); + service.start(); + + listener = new SocketProtocolListener(1, 0, serverSocketConfig, protocolContext); + listener.start(); + + apiDummyPort = 7777; + socketPort = listener.getPort(); + } + + @After + public void teardown() throws IOException { + + if (service != null && service.isRunning()) { + service.stop(); + } + + if (listener != null && listener.isRunning()) { + try { + listener.stop(); + } catch (final Exception ex) { + ex.printStackTrace(System.out); + } + } + + } + + @Test + public void testLoadFlowWithNonExistentFlow() throws ParserConfigurationException, SAXException, IOException { + verifyFlow(); + } + + @Test + public void testLoadFlowWithNonExistentFlowWhenServiceStopped() throws IOException, SAXException, ParserConfigurationException { + service.stop(); + verifyFlow(); + } + + private void verifyFlow() throws ParserConfigurationException, SAXException, IOException { + final byte[] flowBytes = service.loadDataFlow().getDataFlow().getFlow(); + final DocumentBuilder docBuilder = 
DocumentBuilderFactory.newInstance().newDocumentBuilder(); + final Document doc = docBuilder.parse(new ByteArrayInputStream(flowBytes)); + final Element controller = (Element) doc.getElementsByTagName("flowController").item(0); + final Element rootGroup = (Element) controller.getElementsByTagName("rootGroup").item(0); + final String rootGroupName = rootGroup.getElementsByTagName("name").item(0).getTextContent(); + assertEquals("NiFi Flow", rootGroupName); + } + + @Test + public void testLoadFlowSingleNode() throws Exception { + String flowStr = "<rootGroup />"; + byte[] flowBytes = flowStr.getBytes(); + listener.addHandler(new FlowRequestProtocolHandler(new StandardDataFlow(flowStr.getBytes("UTF-8"), new byte[0], new byte[0]))); + + NodeIdentifier nodeId = new NodeIdentifier("1", "localhost", apiDummyPort, "localhost", socketPort); + service.setNodeIds(new HashSet<>(Arrays.asList(nodeId))); + service.setPersistedFlowState(PersistedFlowState.STALE); + + assertEquals(PersistedFlowState.STALE, service.getPersistedFlowState()); + + // sleep long enough for the flow retriever to run + waitForState(PersistedFlowState.CURRENT); + + assertEquals(PersistedFlowState.CURRENT, service.getPersistedFlowState()); + assertArrayEquals(flowBytes, service.loadDataFlow().getDataFlow().getFlow()); + + } + + @Test + public void testLoadFlowWithSameNodeIds() throws Exception { + + String flowStr = "<rootGroup />"; + listener.addHandler(new FlowRequestProtocolHandler(new StandardDataFlow(flowStr.getBytes("UTF-8"), new byte[0], new byte[0]))); + + NodeIdentifier nodeId1 = new NodeIdentifier("1", "localhost", apiDummyPort, "localhost", socketPort); + NodeIdentifier nodeId2 = new NodeIdentifier("2", "localhost", apiDummyPort, "localhost", socketPort); + service.setNodeIds(new HashSet<>(Arrays.asList(nodeId1, nodeId2))); + service.setPersistedFlowState(PersistedFlowState.STALE); + + assertEquals(PersistedFlowState.STALE, service.getPersistedFlowState()); + + // sleep long enough for the flow 
retriever to run + waitForState(PersistedFlowState.CURRENT); + + // verify that flow is current + assertEquals(PersistedFlowState.CURRENT, service.getPersistedFlowState()); + + // add same ids in different order + service.setNodeIds(new HashSet<>(Arrays.asList(nodeId2, nodeId1))); + + // verify flow is still current + assertEquals(PersistedFlowState.CURRENT, service.getPersistedFlowState()); + + } + + @Test + public void testLoadFlowWithABadNode() throws Exception { + + String flowStr = "<rootGroup />"; + byte[] flowBytes = flowStr.getBytes(); + listener.addHandler(new FlowRequestProtocolHandler(new StandardDataFlow(flowStr.getBytes("UTF-8"), new byte[0], new byte[0]))); + + NodeIdentifier nodeId1 = new NodeIdentifier("1", "localhost", apiDummyPort, "localhost", socketPort + 1); + NodeIdentifier nodeId2 = new NodeIdentifier("2", "localhost", apiDummyPort, "localhost", socketPort); + service.setNodeIds(new HashSet<>(Arrays.asList(nodeId1, nodeId2))); + service.setPersistedFlowState(PersistedFlowState.STALE); + + assertEquals(PersistedFlowState.STALE, service.getPersistedFlowState()); + + // sleep long enough for the flow retriever to run + waitForState(PersistedFlowState.CURRENT); + + assertEquals(PersistedFlowState.CURRENT, service.getPersistedFlowState()); + assertArrayEquals(flowBytes, service.loadDataFlow().getDataFlow().getFlow()); + + } + + @Test + public void testLoadFlowWithConstantNodeIdChanging() throws Exception { + String flowStr = "<rootGroup />"; + byte[] flowBytes = flowStr.getBytes(); + listener.addHandler(new FlowRequestProtocolHandler(new StandardDataFlow(flowStr.getBytes("UTF-8"), new byte[0], new byte[0]))); + + NodeIdentifier nodeId1 = new NodeIdentifier("1", "localhost", apiDummyPort, "localhost", socketPort + 1); + NodeIdentifier nodeId2 = new NodeIdentifier("2", "localhost", apiDummyPort, "localhost", socketPort); + + for (int i = 0; i < 1000; i++) { + service.setNodeIds(new HashSet<>(Arrays.asList(nodeId1, nodeId2))); + 
service.setPersistedFlowState(PersistedFlowState.STALE); + assertEquals(PersistedFlowState.STALE, service.getPersistedFlowState()); + } + + // sleep long enough for the flow retriever to run + waitForState(PersistedFlowState.CURRENT); + + assertEquals(PersistedFlowState.CURRENT, service.getPersistedFlowState()); + assertArrayEquals(flowBytes, service.loadDataFlow().getDataFlow().getFlow()); + } + + @Test + public void testLoadFlowWithConstantNodeIdChangingWithRetrievalDelay() throws Exception { + + String flowStr = "<rootGroup />"; + listener.addHandler(new FlowRequestProtocolHandler(new StandardDataFlow(flowStr.getBytes("UTF-8"), new byte[0], new byte[0]))); + + NodeIdentifier nodeId1 = new NodeIdentifier("1", "localhost", apiDummyPort, "localhost", socketPort + 1); + NodeIdentifier nodeId2 = new NodeIdentifier("2", "localhost", apiDummyPort, "localhost", socketPort); + + service.setRetrievalDelay("5 sec"); + for (int i = 0; i < 1000; i++) { + service.setNodeIds(new HashSet<>(Arrays.asList(nodeId1, nodeId2))); + service.setPersistedFlowState(PersistedFlowState.STALE); + assertEquals(PersistedFlowState.STALE, service.getPersistedFlowState()); + } + + // sleep long enough for the flow retriever to run + waitForState(PersistedFlowState.STALE); + + assertEquals(PersistedFlowState.STALE, service.getPersistedFlowState()); + + } + + @Test + public void testStopRequestedWhileRetrieving() throws Exception { + + String flowStr = "<rootGroup />"; + listener.addHandler(new FlowRequestProtocolHandler(new StandardDataFlow(flowStr.getBytes("UTF-8"), new byte[0], new byte[0]))); + Set<NodeIdentifier> nodeIds = new HashSet<>(); + for (int i = 0; i < 1000; i++) { + nodeIds.add(new NodeIdentifier("1", "localhost", apiDummyPort, "localhost", socketPort + 1)); + } + nodeIds.add(new NodeIdentifier("1", "localhost", apiDummyPort, "localhost", socketPort)); + + long lastRetrievalTime = service.getLastRetrievalTime(); + + service.setNodeIds(nodeIds); + 
service.setPersistedFlowState(PersistedFlowState.STALE); + assertEquals(PersistedFlowState.STALE, service.getPersistedFlowState()); + + // sleep long enough for the flow retriever to run + waitForState(PersistedFlowState.STALE); + + service.stop(); + + service.setPersistedFlowState(PersistedFlowState.STALE); + assertEquals(PersistedFlowState.STALE, service.getPersistedFlowState()); + + assertEquals(lastRetrievalTime, service.getLastRetrievalTime()); + + } + + @Test + public void testLoadFlowUnknownState() throws Exception { + + String flowStr = "<rootGroup />"; + byte[] flowBytes = flowStr.getBytes(); + listener.addHandler(new FlowRequestProtocolHandler(new StandardDataFlow(flowStr.getBytes("UTF-8"), new byte[0], new byte[0]))); + NodeIdentifier nodeId = new NodeIdentifier("1", "localhost", apiDummyPort, "localhost", socketPort); + + service.setNodeIds(new HashSet<>(Arrays.asList(nodeId))); + service.setPersistedFlowState(PersistedFlowState.STALE); + assertEquals(PersistedFlowState.STALE, service.getPersistedFlowState()); + + service.setPersistedFlowState(PersistedFlowState.UNKNOWN); + + assertEquals(PersistedFlowState.UNKNOWN, service.getPersistedFlowState()); + + service.setPersistedFlowState(PersistedFlowState.STALE); + assertEquals(PersistedFlowState.STALE, service.getPersistedFlowState()); + + // sleep long enough for the flow retriever to run + waitForState(PersistedFlowState.CURRENT); + + assertArrayEquals(flowBytes, service.loadDataFlow().getDataFlow().getFlow()); + + } + + private class FlowRequestProtocolHandler implements ProtocolHandler { + + private StandardDataFlow dataFlow; + + public FlowRequestProtocolHandler(final StandardDataFlow dataFlow) { + this.dataFlow = dataFlow; + } + + @Override + public boolean canHandle(ProtocolMessage msg) { + return true; + } + + @Override + public ProtocolMessage handle(ProtocolMessage msg) throws ProtocolException { + FlowResponseMessage response = new FlowResponseMessage(); + response.setDataFlow(dataFlow); + 
return response; + } + + } + + private void waitForState(PersistedFlowState state) throws InterruptedException { + for (int i = 0; i < 30; i++) { + if (service.getPersistedFlowState() == state) { + break; + } else { + Thread.sleep(1000); + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/manager/impl/HttpRequestReplicatorImplTest.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/manager/impl/HttpRequestReplicatorImplTest.java b/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/manager/impl/HttpRequestReplicatorImplTest.java new file mode 100644 index 0000000..0c65aba --- /dev/null +++ b/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/manager/impl/HttpRequestReplicatorImplTest.java @@ -0,0 +1,368 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.cluster.manager.impl; + +import org.apache.nifi.cluster.manager.impl.HttpRequestReplicatorImpl; +import javax.ws.rs.core.Response; +import javax.xml.bind.annotation.XmlRootElement; +import javax.ws.rs.HttpMethod; +import javax.ws.rs.core.MultivaluedMap; +import java.net.URISyntaxException; +import java.util.Arrays; +import java.util.Iterator; +import javax.ws.rs.core.StreamingOutput; +import org.apache.nifi.cluster.manager.testutils.HttpResponse; +import org.apache.nifi.cluster.manager.testutils.HttpServer; +import com.sun.jersey.api.client.Client; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.net.URI; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import javax.ws.rs.core.Response.Status; +import org.apache.nifi.cluster.manager.NodeResponse; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.apache.nifi.cluster.manager.testutils.HttpResponseAction; +import org.apache.nifi.cluster.protocol.NodeIdentifier; +import static org.junit.Assert.*; + +/** + * @author unattributed + */ +public class HttpRequestReplicatorImplTest { + + private Client client; + private HttpRequestReplicatorImpl replicator; + private int executorThreadCount; + private int serverThreadCount; + private int serverPort; + private HttpServer server; + private Map<String, List<String>> expectedRequestParameters; + private Map<String, String> expectedRequestHeaders; + private Map<String, String> expectedResponseHeaders; + private Object expectedEntity; + private String expectedBody; + private URI prototypeUri; + + @Before + public void setUp() throws IOException, URISyntaxException { + + executorThreadCount = 5; + serverThreadCount = 3; + + client = Client.create(); + + replicator = new HttpRequestReplicatorImpl(executorThreadCount, client, "1 sec", "1 sec"); + replicator.start(); + + expectedRequestHeaders = new HashMap<>(); 
+ expectedRequestHeaders.put("header1", "header value1"); + expectedRequestHeaders.put("header2", "header value2"); + + expectedRequestParameters = new HashMap<>(); + expectedRequestParameters.put("param1", Arrays.asList("p value1")); + expectedRequestParameters.put("param2", Arrays.asList("p value2")); + + expectedResponseHeaders = new HashMap<>(); + expectedResponseHeaders.put("header1", "header value1"); + expectedResponseHeaders.put("header2", "header value2"); + + expectedEntity = new Entity(); + + expectedBody = "some text"; + + prototypeUri = new URI("http://prototype.host/path/to/resource"); + + server = new HttpServer(serverThreadCount, 0); + server.start(); + serverPort = server.getPort(); + } + + @After + public void teardown() { + if (server.isRunning()) { + server.stop(); + } + if (replicator.isRunning()) { + replicator.stop(); + } + } + + @Test + public void testReplicateGetLessNodesThanReplicatorThreads() throws Throwable { + testReplicateXXX(executorThreadCount - 1, HttpMethod.GET); + } + + @Test + public void testReplicateGetMoreNodesThanReplicatorThreads() throws Throwable { + testReplicateXXX(executorThreadCount + 1, HttpMethod.GET); + } + + @Test + public void testReplicateGetWithUnresponsiveNode() throws Throwable { + + // nodes + Set<NodeIdentifier> nodeIds = createNodes(2, "localhost", serverPort); + + // response + HttpResponse expectedResponse = new HttpResponse(Status.OK, expectedBody); + + // first response normal, second response slow + server.addResponseAction(new HttpResponseAction(expectedResponse)); + server.addResponseAction(new HttpResponseAction(expectedResponse, 3500)); + + Set<NodeResponse> responses = replicator.replicate( + nodeIds, + HttpMethod.GET, + prototypeUri, + expectedRequestParameters, + expectedRequestHeaders); + + assertEquals(nodeIds.size(), responses.size()); + + Iterator<NodeResponse> nodeResponseItr = responses.iterator(); + + NodeResponse firstResponse = nodeResponseItr.next(); + NodeResponse secondResponse = 
nodeResponseItr.next(); + NodeResponse goodResponse; + NodeResponse badResponse; + if (firstResponse.hasThrowable()) { + goodResponse = secondResponse; + badResponse = firstResponse; + } else { + goodResponse = firstResponse; + badResponse = secondResponse; + } + + // good response + // check status + assertEquals(Status.OK.getStatusCode(), goodResponse.getStatus()); + + // check entity stream + final ByteArrayOutputStream baos = new ByteArrayOutputStream(); + ((StreamingOutput) goodResponse.getResponse().getEntity()).write(baos); + assertEquals("some text", new String(baos.toByteArray())); + + // bad response + assertTrue(badResponse.hasThrowable()); + assertEquals(Status.INTERNAL_SERVER_ERROR.getStatusCode(), badResponse.getStatus()); + + } + + @Test(expected = IllegalArgumentException.class) + public void testReplicateGetWithEntity() throws Throwable { + testReplicateXXXEntity(HttpMethod.GET); + } + + @Test + public void testReplicatePost() throws Throwable { + testReplicateXXX(HttpMethod.POST); + } + + @Test + public void testReplicatePostWithEntity() throws Throwable { + testReplicateXXXEntity(HttpMethod.POST); + } + + @Test + public void testReplicatePut() throws Throwable { + testReplicateXXX(HttpMethod.PUT); + } + + @Test + public void testReplicatePutWithEntity() throws Throwable { + testReplicateXXXEntity(HttpMethod.PUT); + } + + @Test + public void testReplicateDelete() throws Throwable { + testReplicateXXX(HttpMethod.DELETE); + } + + @Test(expected = IllegalArgumentException.class) + public void testReplicateDeleteWithEntity() throws Throwable { + testReplicateXXXEntity(HttpMethod.DELETE); + } + + @Test + public void testReplicateHead() throws Throwable { + testReplicateXXX(HttpMethod.HEAD); + } + + @Test(expected = IllegalArgumentException.class) + public void testReplicateHeadWithEntity() throws Throwable { + testReplicateXXXEntity(HttpMethod.HEAD); + } + + @Test + public void testReplicateOptions() throws Throwable { + 
testReplicateXXX(HttpMethod.OPTIONS); + } + + @Test(expected = IllegalArgumentException.class) + public void testReplicateOptionsWithEntity() throws Throwable { + testReplicateXXXEntity(HttpMethod.OPTIONS); + } + + private void testReplicateXXX(final String method) throws Throwable { + testReplicateXXX(executorThreadCount, method); + } + + private void testReplicateXXX(final int numNodes, final String method) throws Throwable { + + // nodes + Set<NodeIdentifier> nodeIds = createNodes(numNodes, "localhost", serverPort); + + // set up responses + for (int i = 0; i < nodeIds.size(); i++) { + HttpResponse response = new HttpResponse(Status.OK, expectedBody); + response.addHeaders(expectedResponseHeaders); + server.addResponseAction(new HttpResponseAction(response)); + } + + // setup request parameters + server.addCheckedParameters(expectedRequestParameters); + + // request headers + server.addCheckedHeaders(expectedRequestHeaders); + + Set<NodeResponse> responses = replicator.replicate( + nodeIds, + method, + prototypeUri, + expectedRequestParameters, + expectedRequestHeaders); + + Set<NodeIdentifier> returnedNodeIds = new HashSet<>(); + for (NodeResponse response : responses) { + + // check if we received an exception + if (response.hasThrowable()) { + throw response.getThrowable(); + } + + // gather ids to verify later + returnedNodeIds.add(response.getNodeId()); + + // check status + assertEquals(Status.OK.getStatusCode(), response.getStatus()); + + Response serverResponse = response.getResponse(); + + // check response headers are copied + assertTrue(containsHeaders(expectedResponseHeaders, serverResponse.getMetadata())); + + // check entity stream + if (HttpMethod.HEAD.equalsIgnoreCase(method)) { + assertNull(serverResponse.getEntity()); + } else { + assertTrue(isEquals((StreamingOutput) serverResponse.getEntity(), expectedBody)); + } + + } + + // check node Ids + assertEquals(nodeIds, returnedNodeIds); + } + + private void testReplicateXXXEntity(final String 
method) throws Throwable { + testReplicateXXXEntity(executorThreadCount, method); + } + + private void testReplicateXXXEntity(final int numNodes, final String method) throws Throwable { + + // nodes + Set<NodeIdentifier> nodeIds = createNodes(numNodes, "localhost", serverPort); + + // set up responses + for (int i = 0; i < nodeIds.size(); i++) { + HttpResponse response = new HttpResponse(Status.OK, expectedBody); + response.addHeaders(expectedResponseHeaders); + server.addResponseAction(new HttpResponseAction(response)); + } + + // headers + expectedRequestHeaders.put("Content-Type", "application/xml"); + + Set<NodeResponse> responses = replicator.replicate( + nodeIds, + method, + prototypeUri, + expectedEntity, + expectedRequestHeaders); + + Set<NodeIdentifier> returnedNodeIds = new HashSet<>(); + for (NodeResponse response : responses) { + + // check if we received an exception + if (response.hasThrowable()) { + throw response.getThrowable(); + } + + // gather ids to verify later + returnedNodeIds.add(response.getNodeId()); + + // check status + assertEquals(Status.OK.getStatusCode(), response.getStatus()); + + Response serverResponse = response.getResponse(); + + // check response headers are copied + assertTrue(containsHeaders(expectedResponseHeaders, serverResponse.getMetadata())); + + // check entity stream + assertTrue(isEquals((StreamingOutput) serverResponse.getEntity(), expectedBody)); + + } + + // check node Ids + assertEquals(nodeIds, returnedNodeIds); + } + + private Set<NodeIdentifier> createNodes(int num, String host, int apiPort) { + Set<NodeIdentifier> result = new HashSet<>(); + for (int i = 0; i < num; i++) { + result.add(new NodeIdentifier(String.valueOf(i), host, apiPort, host, 1)); + } + return result; + } + + private boolean isEquals(StreamingOutput so, String expectedText) throws IOException { + final ByteArrayOutputStream baos = new ByteArrayOutputStream(); + so.write(baos); + return expectedText.equals(new String(baos.toByteArray())); + } 
+ + private boolean containsHeaders(Map<String, String> expectedHeaders, MultivaluedMap<String, Object> metadata) { + for (Map.Entry<String, String> expectedEntry : expectedHeaders.entrySet()) { + if (expectedEntry.getValue().equals(metadata.getFirst(expectedEntry.getKey())) == false) { + return false; + } + } + return true; + } + +} + +@XmlRootElement +class Entity { +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/manager/impl/HttpResponseMapperImplTest.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/manager/impl/HttpResponseMapperImplTest.java b/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/manager/impl/HttpResponseMapperImplTest.java new file mode 100644 index 0000000..d45a4d1 --- /dev/null +++ b/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/manager/impl/HttpResponseMapperImplTest.java @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
package org.apache.nifi.cluster.manager.impl;

import org.apache.nifi.cluster.manager.impl.HttpResponseMapperImpl;
import com.sun.jersey.api.client.ClientResponse;
import com.sun.jersey.core.util.MultivaluedMapImpl;
import java.io.ByteArrayInputStream;
import java.util.Map;
import java.util.HashSet;
import java.util.Set;
import java.net.URI;
import java.net.URISyntaxException;
import org.apache.nifi.cluster.manager.NodeResponse;
import org.apache.nifi.cluster.node.Node;
import org.apache.nifi.cluster.node.Node.Status;
import org.apache.nifi.cluster.protocol.NodeIdentifier;
import org.junit.Before;
import org.junit.Test;
import static org.junit.Assert.*;
import static org.mockito.Mockito.*;

/**
 * Tests how {@link HttpResponseMapperImpl} maps a set of node responses to
 * node statuses, with and without a 2xx response in the set.
 *
 * @author unattributed
 */
public class HttpResponseMapperImplTest {

    private HttpResponseMapperImpl mapper;

    private URI dummyUri;

    @Before
    public void setup() throws URISyntaxException {
        mapper = new HttpResponseMapperImpl();
        dummyUri = new URI("http://dummy.com");
    }

    /**
     * With status codes 400, 100, 300 and 500 (no 2xx), only the 5xx node is
     * expected to be mapped to DISCONNECTED.
     */
    @Test
    public void testToNodeStatusWithNo2xxResponses() {

        Set<NodeResponse> nodeResponses = new HashSet<>();
        nodeResponses.add(createNodeResourceResponse("1", 400));
        nodeResponses.add(createNodeResourceResponse("2", 100));
        nodeResponses.add(createNodeResourceResponse("3", 300));
        nodeResponses.add(createNodeResourceResponse("4", 500));

        Map<NodeResponse, Status> map = mapper.map(dummyUri, nodeResponses);

        // since no 2xx responses, any 5xx is disconnected
        for (Map.Entry<NodeResponse, Status> entry : map.entrySet()) {
            final String nodeId = entry.getKey().getNodeId().getId();
            final Status status = entry.getValue();
            switch (nodeId) {
                case "1":
                case "2":
                case "3":
                    // assertEquals reports expected vs. actual on failure
                    assertEquals("status of node " + nodeId, Node.Status.CONNECTED, status);
                    break;
                case "4":
                    assertEquals("status of node " + nodeId, Node.Status.DISCONNECTED, status);
                    break;
                default:
                    // an id that was never submitted must not pass silently
                    fail("Unexpected node id in mapping: " + nodeId);
            }
        }
    }

    /**
     * With status codes 200, 100, 300 and 500 (one 2xx), every non-2xx node is
     * expected to be mapped to DISCONNECTED.
     */
    @Test
    public void testToNodeStatusWith2xxResponses() {

        Set<NodeResponse> nodeResponses = new HashSet<>();
        nodeResponses.add(createNodeResourceResponse("1", 200));
        nodeResponses.add(createNodeResourceResponse("2", 100));
        nodeResponses.add(createNodeResourceResponse("3", 300));
        nodeResponses.add(createNodeResourceResponse("4", 500));

        Map<NodeResponse, Status> map = mapper.map(dummyUri, nodeResponses);

        // since there were 2xx responses, any non-2xx is disconnected
        for (Map.Entry<NodeResponse, Status> entry : map.entrySet()) {
            final String nodeId = entry.getKey().getNodeId().getId();
            final Status status = entry.getValue();
            switch (nodeId) {
                case "1":
                    assertEquals("status of node " + nodeId, Node.Status.CONNECTED, status);
                    break;
                case "2":
                case "3":
                case "4":
                    assertEquals("status of node " + nodeId, Node.Status.DISCONNECTED, status);
                    break;
                default:
                    // an id that was never submitted must not pass silently
                    fail("Unexpected node id in mapping: " + nodeId);
            }
        }
    }

    /**
     * Creates a node response for the given node id, backed by a mocked
     * client response that reports {@code statusCode}, empty headers and an
     * empty entity stream.
     *
     * @param nodeId the id given to the node identifier
     * @param statusCode the HTTP status the mocked client response returns
     * @return a GET node response against {@code dummyUri}
     */
    private NodeResponse createNodeResourceResponse(String nodeId, int statusCode) {

        ClientResponse clientResponse = mock(ClientResponse.class);
        when(clientResponse.getStatus()).thenReturn(statusCode);
        when(clientResponse.getHeaders()).thenReturn(new MultivaluedMapImpl());
        when(clientResponse.getEntityInputStream()).thenReturn(new ByteArrayInputStream(new byte[0]));

        NodeIdentifier nodeIdentifier = new NodeIdentifier(nodeId, "localhost", 1, "localhost", 1);
        return new NodeResponse(nodeIdentifier, "GET", dummyUri, clientResponse, 1L, "111");
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/manager/impl/TestWebClusterManager.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/manager/impl/TestWebClusterManager.java
b/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/manager/impl/TestWebClusterManager.java new file mode 100644 index 0000000..7347a94 --- /dev/null +++ b/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/manager/impl/TestWebClusterManager.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
package org.apache.nifi.cluster.manager.impl;

import org.apache.nifi.cluster.manager.impl.WebClusterManager;
import static org.junit.Assert.assertEquals;

import java.text.DateFormat;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;

import org.junit.Test;

/**
 * Tests {@link WebClusterManager#normalizeStatusSnapshotDate}.
 */
public class TestWebClusterManager {

    /**
     * Checks dates on and around a five-minute boundary: each is expected to
     * normalize to the start of the 300000 ms interval it falls in.
     */
    @Test
    public void testNormalizedStatusSnapshotDate() throws ParseException {
        // 'ss' is second-in-minute; 'SS' would be read as milliseconds and
        // "00:04:59.999" would not land one millisecond before the boundary
        final DateFormat df = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss.SSS");
        final int intervalMillis = 300000;

        final Date date1 = df.parse("2014/01/01 00:00:00.000");
        final Date date2 = df.parse("2014/01/01 00:04:59.999");
        final Date date3 = df.parse("2014/01/01 00:05:00.000");
        final Date date4 = df.parse("2014/01/01 00:05:00.001");

        // exactly on a boundary: unchanged
        final Date normalized1 = WebClusterManager.normalizeStatusSnapshotDate(date1, intervalMillis);
        assertEquals(date1, normalized1);

        // last millisecond of the first interval: back to its start
        final Date normalized2 = WebClusterManager.normalizeStatusSnapshotDate(date2, intervalMillis);
        assertEquals(date1, normalized2);

        // exactly on the next boundary: unchanged
        final Date normalized3 = WebClusterManager.normalizeStatusSnapshotDate(date3, intervalMillis);
        assertEquals(date3, normalized3);

        // first millisecond past the boundary: back to that boundary
        final Date normalized4 = WebClusterManager.normalizeStatusSnapshotDate(date4, intervalMillis);
        assertEquals(date3, normalized4);
    }

}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/manager/testutils/HttpRequest.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/manager/testutils/HttpRequest.java b/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/manager/testutils/HttpRequest.java new file mode 100644 index 0000000..35380dd --- /dev/null +++ b/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/manager/testutils/HttpRequest.java @@ -0,0 +1,239 @@ +/* + * Licensed to
the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.cluster.manager.testutils; + +import java.io.IOException; +import java.io.Reader; +import java.io.UnsupportedEncodingException; +import java.net.URLDecoder; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import javax.ws.rs.HttpMethod; +import javax.ws.rs.core.MediaType; +import org.apache.commons.lang3.StringUtils; + +/** + * Encapsulates an HTTP request. The toString method returns the + * specification-compliant request. 
+ * + * @author unattributed + */
public class HttpRequest {

    private String method;
    private String uri;
    private String rawUri;
    private String version;
    private String body;
    private String rawRequest;
    private Map<String, String> headers = new HashMap<>();
    private Map<String, List<String>> parameters = new HashMap<>();

    /**
     * Starts building a request from its request line.
     *
     * @param requestLine the request line; must split on single spaces into
     * exactly three tokens (method, URI, version)
     * @return a builder seeded with the parsed request line
     * @throws IllegalArgumentException if the line does not have three tokens
     */
    public static HttpRequestBuilder createFromRequestLine(final String requestLine) {
        return new HttpRequestBuilder(requestLine);
    }

    /**
     * @return the request body; the empty string if no body was added
     */
    public String getBody() {
        return body;
    }

    /**
     * @return an unmodifiable view of the headers, keyed as they were added
     */
    public Map<String, String> getHeaders() {
        return Collections.unmodifiableMap(headers);
    }

    /**
     * Looks up a header value, ignoring the case of the header name.
     *
     * @param header the header name
     * @return the stored value, or null if no such header exists
     */
    public String getHeaderValue(final String header) {
        for (final Map.Entry<String, String> entry : headers.entrySet()) {
            if (entry.getKey().equalsIgnoreCase(header)) {
                return entry.getValue();
            }
        }
        return null;
    }

    public String getMethod() {
        return method;
    }

    /**
     * @return an unmodifiable copy of the parameters whose value lists are
     * unmodifiable as well
     */
    public Map<String, List<String>> getParameters() {
        final Map<String, List<String>> result = new HashMap<>();
        for (final Map.Entry<String, List<String>> entry : parameters.entrySet()) {
            result.put(entry.getKey(), Collections.unmodifiableList(entry.getValue()));
        }
        return Collections.unmodifiableMap(result);
    }

    /**
     * @return the URI without its query string. NOTE(review): the builder only
     * assigns this for GET, HEAD, DELETE and OPTIONS, so it is null for other
     * methods -- confirm whether that is intended.
     */
    public String getUri() {
        return uri;
    }

    /**
     * @return the URI token exactly as it appeared on the request line
     */
    public String getRawUri() {
        return rawUri;
    }

    public String getVersion() {
        return version;
    }

    /**
     * @return the raw request text accumulated by the builder
     */
    @Override
    public String toString() {
        return rawRequest;
    }

    /**
     * A builder for constructing basic HTTP requests. It handles only enough of
     * the HTTP specification to support basic unit testing, and it should not
     * be used otherwise.
     */
    public static class HttpRequestBuilder {

        private String method;
        private String uri;
        private String rawUri;
        private String version;
        private Map<String, String> headers = new HashMap<>();
        private Map<String, List<String>> parameters = new HashMap<>();
        private int contentLength = 0;
        private String contentType;
        private String body = "";
        private StringBuilder rawRequest = new StringBuilder();

        /**
         * Parses the request line. For GET, HEAD, DELETE and OPTIONS the query
         * string is split off the URI and parsed into parameters.
         */
        private HttpRequestBuilder(final String requestLine) {

            final String[] tokens = requestLine.split(" ");
            if (tokens.length != 3) {
                throw new IllegalArgumentException("Invalid HTTP Request Line: " + requestLine);
            }

            method = tokens[0];
            if (HttpMethod.GET.equalsIgnoreCase(method) || HttpMethod.HEAD.equalsIgnoreCase(method) || HttpMethod.DELETE.equalsIgnoreCase(method) || HttpMethod.OPTIONS.equalsIgnoreCase(method)) {
                final int queryIndex = tokens[1].indexOf("?");
                if (queryIndex > -1) {
                    uri = tokens[1].substring(0, queryIndex);
                    addParameters(tokens[1].substring(queryIndex + 1));
                } else {
                    uri = tokens[1];
                }
            }
            rawUri = tokens[1];
            version = tokens[2];
            rawRequest.append(requestLine).append("\n");
        }

        /**
         * Stores a header and captures Content-Length and Content-Type, which
         * later drive body reading and form-parameter parsing.
         */
        private void addHeader(final String key, final String value) {
            if (key.contains(" ")) {
                throw new IllegalArgumentException("Header key may not contain spaces.");
            } else if ("content-length".equalsIgnoreCase(key)) {
                contentLength = (StringUtils.isBlank(value.trim())) ? 0 : Integer.parseInt(value.trim());
            } else if ("content-type".equalsIgnoreCase(key)) {
                contentType = value.trim();
            }
            headers.put(key, value);
        }

        /**
         * Adds a raw header line of the form {@code name:value}.
         *
         * @param header the header line; split at its first colon
         * @throws IllegalArgumentException if the line contains no colon or
         * the name contains a space
         */
        public void addHeader(final String header) {
            final int firstColonIndex = header.indexOf(":");
            if (firstColonIndex < 0) {
                throw new IllegalArgumentException("Invalid HTTP Header line: " + header);
            }
            addHeader(header.substring(0, firstColonIndex), header.substring(firstColonIndex + 1));
            rawRequest.append(header).append("\n");
        }

        /**
         * Parses a URL-encoded query string ({@code a=1&b=2}, optionally with a
         * leading '?') and adds each pair as a parameter. A key without a value
         * ({@code a} or {@code a=}) is added with the empty string, and empty
         * pairs are skipped.
         *
         * @param queryString the query string; blank input is ignored
         */
        // final because constructor calls it
        public final void addParameters(final String queryString) {

            if (StringUtils.isBlank(queryString)) {
                return;
            }

            final String normQueryString = queryString.startsWith("?") ? queryString.substring(1) : queryString;

            for (final String keyValuePair : normQueryString.split("&")) {
                if (keyValuePair.isEmpty()) {
                    // e.g. the middle of "a=1&&b=2"
                    continue;
                }
                // a limit of 2 keeps any further '=' inside the value and
                // yields a single token for a key that has no value
                final String[] keyValueTokens = keyValuePair.split("=", 2);
                final String rawValue = (keyValueTokens.length > 1) ? keyValueTokens[1] : "";
                try {
                    addParameter(
                            URLDecoder.decode(keyValueTokens[0], "utf-8"),
                            URLDecoder.decode(rawValue, "utf-8")
                    );
                } catch (UnsupportedEncodingException use) {
                    throw new RuntimeException(use);
                }
            }
        }

        /**
         * Adds one value for a parameter; repeated keys accumulate values in
         * insertion order.
         *
         * @throws IllegalArgumentException if the key contains a space
         */
        public void addParameter(final String key, final String value) {

            if (key.contains(" ")) {
                throw new IllegalArgumentException("Parameter key may not contain spaces: " + key);
            }

            List<String> values = parameters.get(key);
            if (values == null) {
                values = new ArrayList<>();
                parameters.put(key, values);
            }
            values.add(value);
        }

        /**
         * Reads up to Content-Length characters from the reader as the body.
         * Does nothing when no positive Content-Length header has been added.
         * If the reader ends early, the body holds only what was read.
         *
         * @param reader the source of the body characters
         * @throws IOException if reading fails
         */
        public void addBody(final Reader reader) throws IOException {

            if (contentLength <= 0) {
                return;
            }

            final char[] buf = new char[contentLength];
            int offset = 0;
            int numRead;
            while (offset < buf.length && (numRead = reader.read(buf, offset, buf.length - offset)) >= 0) {
                offset += numRead;
            }
            // keep only the characters actually read; the unread tail of the
            // buffer would otherwise show up as NUL characters in the body
            body = new String(buf, 0, offset);
            rawRequest.append("\n");
            rawRequest.append(body);
        }

        /**
         * Builds the request. For methods other than GET and HEAD whose
         * Content-Type is form-urlencoded, the body is parsed into parameters
         * first.
         *
         * @return the assembled request
         */
        public HttpRequest build() throws UnsupportedEncodingException {

            final boolean bodyMayCarryParameters = !HttpMethod.GET.equalsIgnoreCase(method) && !HttpMethod.HEAD.equalsIgnoreCase(method);
            // constant-first comparison: contentType stays null when no
            // Content-Type header was added
            if (bodyMayCarryParameters && MediaType.APPLICATION_FORM_URLENCODED.equalsIgnoreCase(contentType)) {
                addParameters(body);
            }

            final HttpRequest request = new HttpRequest();
            request.method = this.method;
            request.uri = this.uri;
            request.rawUri = this.rawUri;
            request.version = this.version;
            request.headers.putAll(this.headers);
            request.parameters.putAll(this.parameters);
            request.body = this.body;
            request.rawRequest = this.rawRequest.toString();

            return request;
        }

    }
}