http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/761e64a4/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/node/Node.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/node/Node.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/node/Node.java deleted file mode 100644 index 84565da..0000000 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/node/Node.java +++ /dev/null @@ -1,252 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.cluster.node; - -import java.util.concurrent.atomic.AtomicLong; - -import org.apache.nifi.cluster.HeartbeatPayload; -import org.apache.nifi.cluster.protocol.Heartbeat; -import org.apache.nifi.cluster.protocol.NodeIdentifier; -import org.apache.nifi.cluster.protocol.ProtocolException; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Represents a connected flow controller. Nodes always have an immutable - * identifier and a status. The status may be changed, but never null. - * - * A Node may be cloned, but the cloning is a shallow copy of the instance. - * - * This class overrides hashCode and equals and considers two instances to be - * equal if they have the equal NodeIdentifiers. - * - * @author unattributed - */ -public class Node implements Cloneable, Comparable<Node> { - - private static final Logger lockLogger = LoggerFactory.getLogger("cluster.lock"); - - /** - * The semantics of a Node status are as follows: - * <ul> - * <li>CONNECTED -- a flow controller that is connected to the cluster. A - * connecting node transitions to connected after the cluster receives the - * flow controller's first heartbeat. A connected node can transition to - * disconnecting.</li> - * <li>CONNECTING -- a flow controller has issued a connection request to - * the cluster, but has not yet sent a heartbeat. A connecting node can - * transition to disconnecting or connected. The cluster will not accept any - * external requests to change the flow while any node is connecting.</li> - * <li>DISCONNECTED -- a flow controller that is not connected to the - * cluster. A disconnected node can transition to connecting.</li> - * <li>DISCONNECTING -- a flow controller that is in the process of - * disconnecting from the cluster. A disconnecting node will always - * transition to disconnected.</li> - * </ul> - */ - public static enum Status { - - CONNECTED, - CONNECTING, - DISCONNECTED, - DISCONNECTING - } - - /** - * the node's unique identifier - */ - private final NodeIdentifier nodeId; - - /** - * the node statue - */ - private Status status; - - /** - * the last heartbeat received by from the node - */ - private Heartbeat lastHeartbeat; - - /** - * the payload of the last heartbeat received from the node - */ - private HeartbeatPayload lastHeartbeatPayload; - - /** - * the last time the connection for this node was requested - */ - private AtomicLong connectionRequestedTimestamp = new AtomicLong(0L); - - /** - * a flag to indicate this node was disconnected because of a lack of - * heartbeat - */ - private boolean heartbeatDisconnection; - - public Node(final NodeIdentifier id, final Status status) { - if (id == null) { - throw new IllegalArgumentException("ID may not be null."); - } else if (status == null) { - throw new IllegalArgumentException("Status may not be null."); - } - this.nodeId = id; - this.status = status; - } - - public NodeIdentifier getNodeId() { - return nodeId; - } - - /** - * Returns the last received heartbeat or null if no heartbeat has been set. - * - * @return a heartbeat or null - */ - public Heartbeat getHeartbeat() { - return lastHeartbeat; - } - - public HeartbeatPayload getHeartbeatPayload() { - return lastHeartbeatPayload; - } - - /** - * Sets the last heartbeat received. - * - * @param heartbeat a heartbeat - * - * @throws ProtocolException if the heartbeat's payload failed unmarshalling - */ - public void setHeartbeat(final Heartbeat heartbeat) throws ProtocolException { - this.lastHeartbeat = heartbeat; - if (this.lastHeartbeat == null) { - this.lastHeartbeatPayload = null; - } else { - final byte[] payload = lastHeartbeat.getPayload(); - if (payload == null || payload.length == 0) { - this.lastHeartbeatPayload = null; - } else { - this.lastHeartbeatPayload = HeartbeatPayload.unmarshal(payload); - } - } - } - - /** - * Returns the time of the last received connection request for this node. - * - * @return the time when the connection request for this node was received. - */ - public long getConnectionRequestedTimestamp() { - return connectionRequestedTimestamp.get(); - } - - /** - * Sets the time when the connection request for this node was last - * received. - * - * This method is thread-safe and may be called without obtaining any lock. - * - * @param connectionRequestedTimestamp - */ - public void setConnectionRequestedTimestamp(long connectionRequestedTimestamp) { - this.connectionRequestedTimestamp.set(connectionRequestedTimestamp); - } - - /** - * Returns true if the node was disconnected due to lack of heartbeat; false - * otherwise. - * - * @return true if the node was disconnected due to lack of heartbeat; false - * otherwise. - */ - public boolean isHeartbeatDisconnection() { - return heartbeatDisconnection; - } - - /** - * Sets the status to disconnected and flags the node as being disconnected - * by lack of heartbeat. - */ - public void setHeartbeatDisconnection() { - setStatus(Status.DISCONNECTED); - heartbeatDisconnection = true; - } - - /** - * @return the status - */ - public Status getStatus() { - return status; - } - - /** - * @param status a status - */ - public void setStatus(final Status status) { - if (status == null) { - throw new IllegalArgumentException("Status may not be null."); - } - this.status = status; - heartbeatDisconnection = false; - } - - @Override - public Node clone() { - final Node clone = new Node(nodeId, status); - clone.lastHeartbeat = lastHeartbeat; - clone.lastHeartbeatPayload = lastHeartbeatPayload; - clone.heartbeatDisconnection = heartbeatDisconnection; - clone.connectionRequestedTimestamp = connectionRequestedTimestamp; - return clone; - } - - @Override - public boolean equals(Object obj) { - if (obj == null) { - return false; - } - if (getClass() != obj.getClass()) { - return false; - } - final Node other = (Node) obj; - if (this.nodeId != other.nodeId && (this.nodeId == null || !this.nodeId.equals(other.nodeId))) { - return false; - } - return true; - } - - @Override - public int hashCode() { - int hash = 7; - hash = 53 * hash + (this.nodeId != null ? this.nodeId.hashCode() : 0); - return hash; - } - - @Override - public String toString() { - return nodeId.toString(); - } - - @Override - public int compareTo(final Node o) { - if (o == null) { - return -1; - } - return getNodeId().getId().compareTo(o.getNodeId().getId()); - } -}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/761e64a4/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/spring/ClusterManagerProtocolServiceLocatorFactoryBean.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/spring/ClusterManagerProtocolServiceLocatorFactoryBean.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/spring/ClusterManagerProtocolServiceLocatorFactoryBean.java deleted file mode 100644 index e26d196..0000000 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/spring/ClusterManagerProtocolServiceLocatorFactoryBean.java +++ /dev/null @@ -1,116 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.cluster.spring; - -import java.net.InetSocketAddress; -import java.util.concurrent.TimeUnit; - -import org.apache.nifi.cluster.protocol.impl.ClusterServiceDiscovery; -import org.apache.nifi.cluster.protocol.impl.ClusterServiceLocator; -import org.apache.nifi.io.socket.multicast.DiscoverableService; -import org.apache.nifi.io.socket.multicast.DiscoverableServiceImpl; -import org.apache.nifi.util.FormatUtils; -import org.apache.nifi.util.NiFiProperties; - -import org.springframework.beans.BeansException; -import org.springframework.beans.factory.DisposableBean; -import org.springframework.beans.factory.FactoryBean; -import org.springframework.context.ApplicationContext; -import org.springframework.context.ApplicationContextAware; - -/** - * Factory bean for creating a singleton ClusterManagerProtocolServiceLocator - * instance. If the application is configured to act as the cluster manager, - * then null is always returned as the created instance. - * - * The cluster manager protocol service represents the socket endpoint for - * sending internal socket messages to the cluster manager. - */ -public class ClusterManagerProtocolServiceLocatorFactoryBean implements FactoryBean, ApplicationContextAware, DisposableBean { - - private ApplicationContext applicationContext; - - private ClusterServiceLocator locator; - - private NiFiProperties properties; - - @Override - public Object getObject() throws Exception { - /* - * If configured for the cluster manager, then the service locator is never used. - */ - if (properties.isClusterManager()) { - return null; - } else if (locator == null) { - - if (properties.getClusterProtocolUseMulticast()) { - - // get the service discovery instance - final ClusterServiceDiscovery serviceDiscovery = applicationContext.getBean("clusterManagerProtocolServiceDiscovery", ClusterServiceDiscovery.class); - - // create service location configuration - final ClusterServiceLocator.AttemptsConfig config = new ClusterServiceLocator.AttemptsConfig(); - config.setNumAttempts(properties.getClusterProtocolMulticastServiceLocatorAttempts()); - - final int delay = (int) FormatUtils.getTimeDuration(properties.getClusterProtocolMulticastServiceLocatorAttemptsDelay(), TimeUnit.SECONDS); - config.setTimeBetweenAttempts(delay); - config.setTimeBetweenAttempsUnit(TimeUnit.SECONDS); - - locator = new ClusterServiceLocator(serviceDiscovery); - locator.setAttemptsConfig(config); - - } else { - final String serviceName = applicationContext.getBean("clusterManagerProtocolServiceName", String.class); - final InetSocketAddress serviceAddress = properties.getClusterNodeUnicastManagerProtocolAddress(); - final DiscoverableService service = new DiscoverableServiceImpl(serviceName, serviceAddress); - locator = new ClusterServiceLocator(service); - } - - // start the locator - locator.start(); - - } - return locator; - - } - - @Override - public Class getObjectType() { - return ClusterServiceLocator.class; - } - - @Override - public boolean isSingleton() { - return true; - } - - @Override - public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { - this.applicationContext = applicationContext; - } - - @Override - public void destroy() throws Exception { - if (locator != null && locator.isRunning()) { - locator.stop(); - } - } - - public void setProperties(NiFiProperties properties) { - this.properties = properties; - } -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/761e64a4/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/spring/FileBasedClusterNodeFirewallFactoryBean.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/spring/FileBasedClusterNodeFirewallFactoryBean.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/spring/FileBasedClusterNodeFirewallFactoryBean.java deleted file mode 100644 index ef72298..0000000 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/spring/FileBasedClusterNodeFirewallFactoryBean.java +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.cluster.spring; - -import java.io.File; -import org.apache.nifi.cluster.firewall.impl.FileBasedClusterNodeFirewall; -import org.apache.nifi.util.NiFiProperties; -import org.springframework.beans.factory.FactoryBean; - -/** - * Factory bean for creating a singleton FileBasedClusterNodeFirewall instance. - */ -public class FileBasedClusterNodeFirewallFactoryBean implements FactoryBean { - - private FileBasedClusterNodeFirewall firewall; - - private NiFiProperties properties; - - @Override - public Object getObject() throws Exception { - if (firewall == null) { - final File config = properties.getClusterManagerNodeFirewallFile(); - final File restoreDirectory = properties.getRestoreDirectory(); - if (config != null) { - firewall = new FileBasedClusterNodeFirewall(config, restoreDirectory); - } - } - return firewall; - } - - @Override - public Class getObjectType() { - return FileBasedClusterNodeFirewall.class; - } - - @Override - public boolean isSingleton() { - return true; - } - - public void setProperties(NiFiProperties properties) { - this.properties = properties; - } -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/761e64a4/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/spring/WebClusterManagerFactoryBean.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/spring/WebClusterManagerFactoryBean.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/spring/WebClusterManagerFactoryBean.java deleted file mode 100644 index 7169730..0000000 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/spring/WebClusterManagerFactoryBean.java +++ /dev/null @@ -1,139 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.cluster.spring; - -import java.nio.file.Paths; -import org.apache.nifi.admin.service.AuditService; -import org.apache.nifi.cluster.event.EventManager; -import org.apache.nifi.cluster.firewall.ClusterNodeFirewall; -import org.apache.nifi.cluster.flow.DataFlowManagementService; -import org.apache.nifi.cluster.manager.HttpRequestReplicator; -import org.apache.nifi.cluster.manager.HttpResponseMapper; -import org.apache.nifi.cluster.manager.impl.WebClusterManager; -import org.apache.nifi.cluster.protocol.impl.ClusterManagerProtocolSenderListener; -import org.apache.nifi.cluster.protocol.impl.ClusterServicesBroadcaster; -import org.apache.nifi.controller.service.ControllerServiceLoader; -import org.apache.nifi.encrypt.StringEncryptor; -import org.apache.nifi.io.socket.multicast.DiscoverableService; -import org.apache.nifi.io.socket.multicast.DiscoverableServiceImpl; -import org.apache.nifi.util.NiFiProperties; -import org.springframework.beans.BeansException; -import org.springframework.beans.factory.FactoryBean; -import org.springframework.context.ApplicationContext; -import org.springframework.context.ApplicationContextAware; - -/** - * Factory bean for creating a singleton WebClusterManager instance. If the - * application is not configured to act as the cluster manager, then null is - * always returned as the created instance. - */ -public class WebClusterManagerFactoryBean implements FactoryBean, ApplicationContextAware { - - private ApplicationContext applicationContext; - - private WebClusterManager clusterManager; - - private NiFiProperties properties; - - private StringEncryptor encryptor; - - @Override - public Object getObject() throws Exception { - if (properties.isClusterManager() && properties.isNode()) { - throw new IllegalStateException("Application may be configured as a cluster manager or a node, but not both."); - } else if (!properties.isClusterManager()) { - /* - * If not configured for the cluster manager, then the cluster manager is never used. - * null is returned so that we don't instantiate a thread pool or other resources. - */ - return null; - } else if (clusterManager == null) { - - // get the service configuration path (fail early) - final String serviceConfigurationFile = properties.getProperty(NiFiProperties.SERVICE_CONFIGURATION_FILE); - if (serviceConfigurationFile == null) { - throw new NullPointerException("The service configuration file has not been specified."); - } - - final HttpRequestReplicator requestReplicator = applicationContext.getBean("httpRequestReplicator", HttpRequestReplicator.class); - final HttpResponseMapper responseMapper = applicationContext.getBean("httpResponseMapper", HttpResponseMapper.class); - final DataFlowManagementService dataFlowService = applicationContext.getBean("dataFlowManagementService", DataFlowManagementService.class); - final ClusterManagerProtocolSenderListener senderListener = applicationContext.getBean("clusterManagerProtocolSenderListener", ClusterManagerProtocolSenderListener.class); - - // create the manager - clusterManager = new WebClusterManager( - requestReplicator, - responseMapper, - dataFlowService, - senderListener, - properties, - encryptor - ); - - // set the service broadcaster - if (properties.getClusterProtocolUseMulticast()) { - - // create broadcaster - final ClusterServicesBroadcaster broadcaster = applicationContext.getBean("clusterServicesBroadcaster", ClusterServicesBroadcaster.class); - - // register the cluster manager protocol service - final String clusterManagerProtocolServiceName = applicationContext.getBean("clusterManagerProtocolServiceName", String.class); - final DiscoverableService clusterManagerProtocolService = new DiscoverableServiceImpl(clusterManagerProtocolServiceName, properties.getClusterManagerProtocolAddress()); - broadcaster.addService(clusterManagerProtocolService); - - clusterManager.setServicesBroadcaster(broadcaster); - } - - // set the event manager - clusterManager.setEventManager(applicationContext.getBean("nodeEventHistoryManager", EventManager.class)); - - // set the cluster firewall - clusterManager.setClusterFirewall(applicationContext.getBean("clusterFirewall", ClusterNodeFirewall.class)); - - // set the audit service - clusterManager.setAuditService(applicationContext.getBean("auditService", AuditService.class)); - - // load the controller services - final ControllerServiceLoader serviceLoader = new ControllerServiceLoader(Paths.get(serviceConfigurationFile)); - serviceLoader.loadControllerServices(clusterManager); - } - return clusterManager; - } - - @Override - public Class getObjectType() { - return WebClusterManager.class; - } - - @Override - public boolean isSingleton() { - return true; - } - - @Override - public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { - this.applicationContext = applicationContext; - } - - public void setProperties(NiFiProperties properties) { - this.properties = properties; - } - - public void setEncryptor(final StringEncryptor encryptor) { - this.encryptor = encryptor; - } -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/761e64a4/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/controller/reporting/ClusteredReportingTaskNode.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/controller/reporting/ClusteredReportingTaskNode.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/controller/reporting/ClusteredReportingTaskNode.java deleted file mode 100644 index 1ed5b30..0000000 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/controller/reporting/ClusteredReportingTaskNode.java +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.controller.reporting; - -import org.apache.nifi.cluster.manager.impl.ClusteredReportingContext; -import org.apache.nifi.controller.ProcessScheduler; -import org.apache.nifi.controller.ValidationContextFactory; -import org.apache.nifi.controller.service.ControllerServiceProvider; -import org.apache.nifi.reporting.BulletinRepository; -import org.apache.nifi.reporting.EventAccess; -import org.apache.nifi.reporting.ReportingContext; -import org.apache.nifi.reporting.ReportingTask; - -public class ClusteredReportingTaskNode extends AbstractReportingTaskNode { - - private final EventAccess eventAccess; - private final BulletinRepository bulletinRepository; - private final ControllerServiceProvider serviceProvider; - - public ClusteredReportingTaskNode(final ReportingTask reportingTask, final String id, final ProcessScheduler scheduler, - final EventAccess eventAccess, final BulletinRepository bulletinRepository, final ControllerServiceProvider serviceProvider, - final ValidationContextFactory validationContextFactory) { - super(reportingTask, id, serviceProvider, scheduler, validationContextFactory); - - this.eventAccess = eventAccess; - this.bulletinRepository = bulletinRepository; - this.serviceProvider = serviceProvider; - } - - @Override - public ReportingContext getReportingContext() { - return new ClusteredReportingContext(eventAccess, bulletinRepository, getProperties(), serviceProvider); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/761e64a4/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/resources/nifi-cluster-manager-context.xml ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/resources/nifi-cluster-manager-context.xml b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/resources/nifi-cluster-manager-context.xml deleted file mode 100644 index 68c29bc..0000000 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/resources/nifi-cluster-manager-context.xml +++ /dev/null @@ -1,124 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- - Licensed to the Apache Software Foundation (ASF) under one or more - contributor license agreements. See the NOTICE file distributed with - this work for additional information regarding copyright ownership. - The ASF licenses this file to You under the Apache License, Version 2.0 - (the "License"); you may not use this file except in compliance with - the License. You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. ---> -<!-- marked as lazy so that clustering beans are not created when applications runs in non-clustered mode --> -<beans default-lazy-init="true" - xmlns="http://www.springframework.org/schema/beans" - xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xmlns:util="http://www.springframework.org/schema/util" - xmlns:context="http://www.springframework.org/schema/context" - xmlns:aop="http://www.springframework.org/schema/aop" - xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.1.xsd - http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-3.1.xsd - http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.1.xsd - http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.1.xsd"> - - <!-- jersey client --> - <bean id="jersey-client" class="org.apache.nifi.web.util.WebUtils" factory-method="createClient"> - <constructor-arg> - <bean class="com.sun.jersey.api.client.config.DefaultClientConfig"/> - </constructor-arg> - <constructor-arg> - <bean class="org.apache.nifi.framework.security.util.SslContextFactory" factory-method="createSslContext"> - <constructor-arg ref="nifiProperties"/> - </bean> - </constructor-arg> - </bean> - - <!-- http request replicator --> - <bean id="httpRequestReplicator" class="org.apache.nifi.cluster.manager.impl.HttpRequestReplicatorImpl"> - <constructor-arg index="0"> - <bean factory-bean="nifiProperties" factory-method="getClusterManagerNodeApiRequestThreads"/> - </constructor-arg> - <constructor-arg ref="jersey-client" index="1"/> - <constructor-arg index="2"> - <bean factory-bean="nifiProperties" factory-method="getClusterManagerNodeApiConnectionTimeout"/> - </constructor-arg> - <constructor-arg index="3"> - <bean factory-bean="nifiProperties" factory-method="getClusterManagerNodeApiReadTimeout"/> - </constructor-arg> - <property name="nodeProtocolScheme"> - <bean factory-bean="nifiProperties" factory-method="getClusterProtocolManagerToNodeApiScheme"/> - </property> - </bean> - - <!-- http response mapper --> - <bean id="httpResponseMapper" class="org.apache.nifi.cluster.manager.impl.HttpResponseMapperImpl"/> - - <!-- cluster flow DAO --> - <bean id="dataFlowDao" class="org.apache.nifi.cluster.flow.impl.DataFlowDaoImpl"> - <constructor-arg index="0"> - <bean factory-bean="nifiProperties" factory-method="getFlowConfigurationFileDir"/> - </constructor-arg> - <constructor-arg index="1"> - <bean factory-bean="nifiProperties" factory-method="getRestoreDirectory"/> - </constructor-arg> - <constructor-arg index="2"> - <bean factory-bean="nifiProperties" factory-method="getAutoResumeState"/> - </constructor-arg> - </bean> - - <!-- dataflow management service --> - <bean id="dataFlowManagementService" class="org.apache.nifi.cluster.flow.impl.DataFlowManagementServiceImpl"> - <constructor-arg ref="dataFlowDao"/> - <constructor-arg ref="clusterManagerProtocolSender"/> - <property name="retrievalDelay"> - <bean factory-bean="nifiProperties" factory-method="getClusterManagerFlowRetrievalDelay"/> - </property> - </bean> - - <!-- node event history manager --> - <bean id="nodeEventHistoryManager" class="org.apache.nifi.cluster.event.impl.EventManagerImpl"> - <constructor-arg> - <bean factory-bean="nifiProperties" factory-method="getClusterManagerNodeEventHistorySize"/> - </constructor-arg> - </bean> - - <!-- cluster firewall --> - <bean id="clusterFirewall" class="org.apache.nifi.cluster.spring.FileBasedClusterNodeFirewallFactoryBean"> - <property name="properties" ref="nifiProperties"/> - </bean> - - <!-- cluster manager --> - <bean id="clusterManager" class="org.apache.nifi.cluster.spring.WebClusterManagerFactoryBean"> - <property name="properties" ref="nifiProperties"/> - <property name="encryptor" ref="stringEncryptor"/> - </bean> - - <!-- discoverable services --> - - <!-- cluster manager protocol discoverable service --> - - <!-- service name for communicating with the cluster manager using sockets --> - <bean id="clusterManagerProtocolServiceName" class="java.lang.String"> - <constructor-arg value="cluster-manager-protocol" /> - </bean> - - <!-- cluster manager protocol service discovery --> - <bean id="clusterManagerProtocolServiceDiscovery" class="org.apache.nifi.cluster.protocol.impl.ClusterServiceDiscovery"> - <constructor-arg ref="clusterManagerProtocolServiceName" index="0"/> - <constructor-arg index="1"> - <bean factory-bean="nifiProperties" factory-method="getClusterProtocolMulticastAddress"/> - </constructor-arg> - <constructor-arg ref="protocolMulticastConfiguration" index="2"/> - <constructor-arg ref="protocolContext" index="3"/> - </bean> - - <!-- cluster manager protocol service locator --> - <bean id="clusterManagerProtocolServiceLocator" class="org.apache.nifi.cluster.spring.ClusterManagerProtocolServiceLocatorFactoryBean"> - <property name="properties" ref="nifiProperties"/> - </bean> - -</beans> http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/761e64a4/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/test/java/org/apache/nifi/cluster/event/impl/EventManagerImplTest.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/test/java/org/apache/nifi/cluster/event/impl/EventManagerImplTest.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/test/java/org/apache/nifi/cluster/event/impl/EventManagerImplTest.java deleted file mode 100644 index 09ea44b..0000000 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/test/java/org/apache/nifi/cluster/event/impl/EventManagerImplTest.java +++ /dev/null @@ -1,119 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.cluster.event.impl; - -import org.apache.nifi.cluster.event.impl.EventManagerImpl; -import java.util.Arrays; -import java.util.Collections; -import java.util.List; -import org.apache.nifi.cluster.event.Event; -import org.apache.nifi.cluster.event.Event.Category; -import org.apache.nifi.cluster.event.EventManager; -import org.junit.Test; -import static org.junit.Assert.*; - -/** - * @author unattributed - */ -public class EventManagerImplTest { - - @Test(expected = IllegalArgumentException.class) - public void testNonPositiveHistorySize() { - new EventManagerImpl(0); - } - - @Test - public void testGetEventsUnknownSource() { - EventManager manager = new EventManagerImpl(1); - assertEquals(Collections.EMPTY_LIST, manager.getEvents("unknown value")); - } - - @Test - public void testGetEvents() { - - EventManager manager = new EventManagerImpl(2); - - Event e1 = new Event("1", "Event1", Category.INFO, 0); - Event e2 = new Event("1", "Event2", Category.INFO, 1); - - manager.addEvent(e1); - manager.addEvent(e2); - - List<Event> events = manager.getEvents("1"); - - // assert newest to oldest - assertEquals(Arrays.asList(e2, e1), events); - } - - @Test - public void testGetMostRecentEventUnknownSource() { - EventManager manager = new EventManagerImpl(1); - assertNull(manager.getMostRecentEvent("unknown value")); - } - - @Test - public void testGetMostRecentEvent() { - - EventManager manager = new EventManagerImpl(2); - - Event e1 = new Event("1", "Event1", Category.INFO, 0); - Event e2 = new Event("1", "Event2", Category.INFO, 1); - - manager.addEvent(e1); - manager.addEvent(e2); - - // assert newest to oldest - assertEquals(e2, manager.getMostRecentEvent("1")); - } - - @Test - public void testAddEventExceedsHistorySize() { - - EventManager manager = new EventManagerImpl(1); - - Event e1 = new Event("1", "Event1", Category.INFO, 0); - Event e2 = new Event("1", "Event2", Category.INFO, 1); - - manager.addEvent(e1); - manager.addEvent(e2); - - List<Event> events = manager.getEvents("1"); - - // assert oldest evicted - assertEquals(Arrays.asList(e2), events); - - } - - @Test - public void testClearHistory() { - - EventManager manager = new EventManagerImpl(1); - - Event e1 = new Event("1", "Event1", Category.INFO, 0); - Event e2 = new Event("1", "Event2", Category.INFO, 1); - - manager.addEvent(e1); - manager.addEvent(e2); - - manager.clearEventHistory("1"); - - // assert oldest evicted - assertTrue(manager.getEvents("1").isEmpty()); - - } - -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/761e64a4/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/test/java/org/apache/nifi/cluster/firewall/impl/FileBasedClusterNodeFirewallTest.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/test/java/org/apache/nifi/cluster/firewall/impl/FileBasedClusterNodeFirewallTest.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/test/java/org/apache/nifi/cluster/firewall/impl/FileBasedClusterNodeFirewallTest.java deleted file mode 100644 index e5db7ca..0000000 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/test/java/org/apache/nifi/cluster/firewall/impl/FileBasedClusterNodeFirewallTest.java +++ /dev/null @@ -1,98 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.cluster.firewall.impl; - -import java.io.File; -import java.io.IOException; -import org.apache.nifi.util.file.FileUtils; -import org.junit.After; -import org.junit.Before; -import static org.junit.Assert.*; -import org.junit.Test; - -public class FileBasedClusterNodeFirewallTest { - - private FileBasedClusterNodeFirewall ipsFirewall; - - private FileBasedClusterNodeFirewall acceptAllFirewall; - - private File ipsConfig; - - private File emptyConfig; - - private File restoreDirectory; - - @Before - public void setup() throws Exception { - - ipsConfig = new File("src/test/resources/org/apache/nifi/cluster/firewall/impl/ips.txt"); - emptyConfig = new File("src/test/resources/org/apache/nifi/cluster/firewall/impl/empty.txt"); - - restoreDirectory = new File(System.getProperty("java.io.tmpdir") + "/firewall_restore"); - - ipsFirewall = new FileBasedClusterNodeFirewall(ipsConfig, restoreDirectory); - acceptAllFirewall = new FileBasedClusterNodeFirewall(emptyConfig); - } - - @After - public void teardown() throws IOException { - deleteFile(restoreDirectory); - } - - @Test - public void testSyncWithRestore() { - assertEquals(ipsConfig.length(), new File(restoreDirectory, ipsConfig.getName()).length()); - } - - @Test - public void testIsPermissibleWithExactMatch() { - assertTrue(ipsFirewall.isPermissible("2.2.2.2")); - } - - @Test - public void testIsPermissibleWithSubnetMatch() { - assertTrue(ipsFirewall.isPermissible("3.3.3.255")); - } - - @Test - public void testIsPermissibleWithNoMatch() { - assertFalse(ipsFirewall.isPermissible("255.255.255.255")); - } - - @Test - public void testIsPermissibleWithMalformedData() { - assertFalse(ipsFirewall.isPermissible("abc")); - } - - @Test - public void testIsPermissibleWithEmptyConfig() { - assertTrue(acceptAllFirewall.isPermissible("1.1.1.1")); - } - - @Test - public void testIsPermissibleWithEmptyConfigWithMalformedData() { - assertTrue(acceptAllFirewall.isPermissible("abc")); - } - - private boolean deleteFile(final File file) { - if (file.isDirectory()) { - FileUtils.deleteFilesInDir(file, null, null, true, true); - } - return FileUtils.deleteFile(file, null, 10); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/761e64a4/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/test/java/org/apache/nifi/cluster/flow/impl/DataFlowManagementServiceImplTest.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/test/java/org/apache/nifi/cluster/flow/impl/DataFlowManagementServiceImplTest.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/test/java/org/apache/nifi/cluster/flow/impl/DataFlowManagementServiceImplTest.java deleted file mode 100644 index f9ba016..0000000 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/test/java/org/apache/nifi/cluster/flow/impl/DataFlowManagementServiceImplTest.java +++ /dev/null @@ -1,343 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.cluster.flow.impl; - -import static org.junit.Assert.assertArrayEquals; -import static org.junit.Assert.assertEquals; - -import java.io.ByteArrayInputStream; -import java.io.File; -import java.io.IOException; -import java.util.Arrays; -import java.util.HashSet; -import java.util.Set; - -import javax.xml.parsers.DocumentBuilder; -import javax.xml.parsers.DocumentBuilderFactory; -import javax.xml.parsers.ParserConfigurationException; - -import org.apache.nifi.cluster.flow.DataFlowDao; -import org.apache.nifi.cluster.flow.PersistedFlowState; -import org.apache.nifi.cluster.protocol.ClusterManagerProtocolSender; -import org.apache.nifi.cluster.protocol.NodeIdentifier; -import org.apache.nifi.cluster.protocol.ProtocolContext; -import org.apache.nifi.cluster.protocol.ProtocolException; -import org.apache.nifi.cluster.protocol.ProtocolHandler; -import org.apache.nifi.cluster.protocol.StandardDataFlow; -import org.apache.nifi.cluster.protocol.impl.ClusterManagerProtocolSenderImpl; -import org.apache.nifi.cluster.protocol.impl.SocketProtocolListener; -import org.apache.nifi.cluster.protocol.jaxb.JaxbProtocolContext; -import org.apache.nifi.cluster.protocol.jaxb.message.JaxbProtocolUtils; -import org.apache.nifi.cluster.protocol.message.FlowResponseMessage; -import org.apache.nifi.cluster.protocol.message.ProtocolMessage; -import org.apache.nifi.io.socket.ServerSocketConfiguration; -import org.apache.nifi.io.socket.SocketConfiguration; - -import org.apache.commons.io.FileUtils; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; -import org.w3c.dom.Document; -import org.w3c.dom.Element; -import org.xml.sax.SAXException; - -/** - * @author unattributed - */ -public class DataFlowManagementServiceImplTest { - - private DataFlowManagementServiceImpl service; - private File restoreLocation; - private File primaryLocation; - private DataFlowDao dao; - private int apiDummyPort; - private int socketPort; - private SocketConfiguration socketConfig; - private ClusterManagerProtocolSender sender; - private ServerSocketConfiguration serverSocketConfig; - private SocketProtocolListener listener; - - @Before - public void setup() throws IOException { - - primaryLocation = new File(System.getProperty("java.io.tmpdir") + "/primary" + this.getClass().getSimpleName()); - restoreLocation = new File(System.getProperty("java.io.tmpdir") + "/restore" + this.getClass().getSimpleName()); - - FileUtils.deleteDirectory(primaryLocation); - FileUtils.deleteDirectory(restoreLocation); - - ProtocolContext protocolContext = new JaxbProtocolContext(JaxbProtocolUtils.JAXB_CONTEXT); - - socketConfig = new SocketConfiguration(); - socketConfig.setSocketTimeout(1000); - serverSocketConfig = new ServerSocketConfiguration(); - - dao = new DataFlowDaoImpl(primaryLocation, restoreLocation, false); - - sender = new ClusterManagerProtocolSenderImpl(socketConfig, protocolContext); - - service = new DataFlowManagementServiceImpl(dao, sender); - service.start(); - - listener = new SocketProtocolListener(1, 0, serverSocketConfig, protocolContext); - listener.start(); - - apiDummyPort = 7777; - socketPort = listener.getPort(); - } - - @After - public void teardown() throws IOException { - - if (service != null && service.isRunning()) { - service.stop(); - } - - if (listener != null && listener.isRunning()) { - try { - listener.stop(); - } catch (final Exception ex) { - ex.printStackTrace(System.out); - } - } - FileUtils.deleteDirectory(primaryLocation); - FileUtils.deleteDirectory(restoreLocation); - - } - - @Test - public void testLoadFlowWithNonExistentFlow() throws ParserConfigurationException, SAXException, IOException { - verifyFlow(); - } - - @Test - public void testLoadFlowWithNonExistentFlowWhenServiceStopped() throws IOException, SAXException, ParserConfigurationException { - service.stop(); - verifyFlow(); - } - - private void verifyFlow() throws ParserConfigurationException, SAXException, IOException { - final byte[] flowBytes = service.loadDataFlow().getDataFlow().getFlow(); - final DocumentBuilder docBuilder = DocumentBuilderFactory.newInstance().newDocumentBuilder(); - final Document doc = docBuilder.parse(new ByteArrayInputStream(flowBytes)); - final Element controller = (Element) doc.getElementsByTagName("flowController").item(0); - final Element rootGroup = (Element) controller.getElementsByTagName("rootGroup").item(0); - final String rootGroupName = rootGroup.getElementsByTagName("name").item(0).getTextContent(); - assertEquals("NiFi Flow", rootGroupName); - } - - @Test - public void testLoadFlowSingleNode() throws Exception { - String flowStr = "<rootGroup />"; - byte[] flowBytes = flowStr.getBytes(); - listener.addHandler(new FlowRequestProtocolHandler(new StandardDataFlow(flowStr.getBytes("UTF-8"), new byte[0], new byte[0]))); - - NodeIdentifier nodeId = new NodeIdentifier("1", "localhost", apiDummyPort, "localhost", socketPort); - service.setNodeIds(new HashSet<>(Arrays.asList(nodeId))); - service.setPersistedFlowState(PersistedFlowState.STALE); - - assertEquals(PersistedFlowState.STALE, service.getPersistedFlowState()); - - // sleep long enough for the flow retriever to run - waitForState(PersistedFlowState.CURRENT); - - assertEquals(PersistedFlowState.CURRENT, service.getPersistedFlowState()); - assertArrayEquals(flowBytes, service.loadDataFlow().getDataFlow().getFlow()); - - } - - @Test - public void testLoadFlowWithSameNodeIds() throws Exception { - - String flowStr = "<rootGroup />"; - listener.addHandler(new FlowRequestProtocolHandler(new StandardDataFlow(flowStr.getBytes("UTF-8"), new byte[0], new byte[0]))); - - NodeIdentifier nodeId1 = new NodeIdentifier("1", "localhost", apiDummyPort, "localhost", socketPort); - NodeIdentifier nodeId2 = new NodeIdentifier("2", "localhost", apiDummyPort, "localhost", socketPort); - service.setNodeIds(new HashSet<>(Arrays.asList(nodeId1, nodeId2))); - service.setPersistedFlowState(PersistedFlowState.STALE); - - assertEquals(PersistedFlowState.STALE, service.getPersistedFlowState()); - - // sleep long enough for the flow retriever to run - waitForState(PersistedFlowState.CURRENT); - - // verify that flow is current - assertEquals(PersistedFlowState.CURRENT, service.getPersistedFlowState()); - - // add same ids in different order - service.setNodeIds(new HashSet<>(Arrays.asList(nodeId2, nodeId1))); - - // verify flow is still current - assertEquals(PersistedFlowState.CURRENT, service.getPersistedFlowState()); - - } - - @Test - public void testLoadFlowWithABadNode() throws Exception { - - String flowStr = "<rootGroup />"; - byte[] flowBytes = flowStr.getBytes(); - listener.addHandler(new FlowRequestProtocolHandler(new StandardDataFlow(flowStr.getBytes("UTF-8"), new byte[0], new byte[0]))); - - NodeIdentifier nodeId1 = new NodeIdentifier("1", "localhost", apiDummyPort, "localhost", socketPort + 1); - NodeIdentifier nodeId2 = new NodeIdentifier("2", "localhost", apiDummyPort, "localhost", socketPort); - service.setNodeIds(new HashSet<>(Arrays.asList(nodeId1, nodeId2))); - service.setPersistedFlowState(PersistedFlowState.STALE); - - assertEquals(PersistedFlowState.STALE, service.getPersistedFlowState()); - - // sleep long enough for the flow retriever to run - waitForState(PersistedFlowState.CURRENT); - - assertEquals(PersistedFlowState.CURRENT, service.getPersistedFlowState()); - assertArrayEquals(flowBytes, service.loadDataFlow().getDataFlow().getFlow()); - - } - - @Test - public void testLoadFlowWithConstantNodeIdChanging() throws Exception { - String flowStr = "<rootGroup />"; - byte[] flowBytes = flowStr.getBytes(); - listener.addHandler(new FlowRequestProtocolHandler(new StandardDataFlow(flowStr.getBytes("UTF-8"), new byte[0], new byte[0]))); - - NodeIdentifier nodeId1 = new NodeIdentifier("1", "localhost", apiDummyPort, "localhost", socketPort + 1); - NodeIdentifier nodeId2 = new NodeIdentifier("2", "localhost", apiDummyPort, "localhost", socketPort); - - for (int i = 0; i < 1000; i++) { - service.setNodeIds(new HashSet<>(Arrays.asList(nodeId1, nodeId2))); - service.setPersistedFlowState(PersistedFlowState.STALE); - assertEquals(PersistedFlowState.STALE, service.getPersistedFlowState()); - } - - // sleep long enough for the flow retriever to run - waitForState(PersistedFlowState.CURRENT); - - assertEquals(PersistedFlowState.CURRENT, service.getPersistedFlowState()); - assertArrayEquals(flowBytes, service.loadDataFlow().getDataFlow().getFlow()); - } - - @Test - public void testLoadFlowWithConstantNodeIdChangingWithRetrievalDelay() throws Exception { - - String flowStr = "<rootGroup />"; - listener.addHandler(new FlowRequestProtocolHandler(new StandardDataFlow(flowStr.getBytes("UTF-8"), new byte[0], new byte[0]))); - - NodeIdentifier nodeId1 = new NodeIdentifier("1", "localhost", apiDummyPort, "localhost", socketPort + 1); - NodeIdentifier nodeId2 = new NodeIdentifier("2", "localhost", apiDummyPort, "localhost", socketPort); - - service.setRetrievalDelay("5 sec"); - for (int i = 0; i < 1000; i++) { - service.setNodeIds(new HashSet<>(Arrays.asList(nodeId1, nodeId2))); - service.setPersistedFlowState(PersistedFlowState.STALE); - assertEquals(PersistedFlowState.STALE, service.getPersistedFlowState()); - } - - // sleep long enough for the flow retriever to run - waitForState(PersistedFlowState.STALE); - - assertEquals(PersistedFlowState.STALE, service.getPersistedFlowState()); - - } - - @Test - public void testStopRequestedWhileRetrieving() throws Exception { - - String flowStr = "<rootGroup />"; - listener.addHandler(new FlowRequestProtocolHandler(new StandardDataFlow(flowStr.getBytes("UTF-8"), new byte[0], new byte[0]))); - Set<NodeIdentifier> nodeIds = new HashSet<>(); - for (int i = 0; i < 1000; i++) { - nodeIds.add(new NodeIdentifier("1", "localhost", apiDummyPort, "localhost", socketPort + 1)); - } - nodeIds.add(new NodeIdentifier("1", "localhost", apiDummyPort, "localhost", socketPort)); - - long lastRetrievalTime = service.getLastRetrievalTime(); - - service.setNodeIds(nodeIds); - service.setPersistedFlowState(PersistedFlowState.STALE); - assertEquals(PersistedFlowState.STALE, service.getPersistedFlowState()); - - // sleep long enough for the flow retriever to run - waitForState(PersistedFlowState.STALE); - - service.stop(); - - service.setPersistedFlowState(PersistedFlowState.STALE); - assertEquals(PersistedFlowState.STALE, service.getPersistedFlowState()); - - assertEquals(lastRetrievalTime, service.getLastRetrievalTime()); - - } - - @Test - public void testLoadFlowUnknownState() throws Exception { - - String flowStr = "<rootGroup />"; - byte[] flowBytes = flowStr.getBytes(); - listener.addHandler(new FlowRequestProtocolHandler(new StandardDataFlow(flowStr.getBytes("UTF-8"), new byte[0], new byte[0]))); - NodeIdentifier nodeId = new NodeIdentifier("1", "localhost", apiDummyPort, "localhost", socketPort); - - service.setNodeIds(new HashSet<>(Arrays.asList(nodeId))); - service.setPersistedFlowState(PersistedFlowState.STALE); - assertEquals(PersistedFlowState.STALE, service.getPersistedFlowState()); - - service.setPersistedFlowState(PersistedFlowState.UNKNOWN); - - assertEquals(PersistedFlowState.UNKNOWN, service.getPersistedFlowState()); - - service.setPersistedFlowState(PersistedFlowState.STALE); - assertEquals(PersistedFlowState.STALE, service.getPersistedFlowState()); - - // sleep long enough for the flow retriever to run - waitForState(PersistedFlowState.CURRENT); - - assertArrayEquals(flowBytes, service.loadDataFlow().getDataFlow().getFlow()); - - } - - private class FlowRequestProtocolHandler implements ProtocolHandler { - - private StandardDataFlow dataFlow; - - public FlowRequestProtocolHandler(final StandardDataFlow dataFlow) { - this.dataFlow = dataFlow; - } - - @Override - public boolean canHandle(ProtocolMessage msg) { - return true; - } - - @Override - public ProtocolMessage handle(ProtocolMessage msg) throws ProtocolException { - FlowResponseMessage response = new FlowResponseMessage(); - response.setDataFlow(dataFlow); - return response; - } - - } - - private void waitForState(PersistedFlowState state) throws InterruptedException { - for (int i = 0; i < 30; i++) { - if (service.getPersistedFlowState() == state) { - break; - } else { - Thread.sleep(1000); - } - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/761e64a4/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/test/java/org/apache/nifi/cluster/manager/impl/HttpRequestReplicatorImplTest.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/test/java/org/apache/nifi/cluster/manager/impl/HttpRequestReplicatorImplTest.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/test/java/org/apache/nifi/cluster/manager/impl/HttpRequestReplicatorImplTest.java deleted file mode 100644 index 0c65aba..0000000 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/test/java/org/apache/nifi/cluster/manager/impl/HttpRequestReplicatorImplTest.java +++ /dev/null @@ -1,368 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.cluster.manager.impl; - -import org.apache.nifi.cluster.manager.impl.HttpRequestReplicatorImpl; -import javax.ws.rs.core.Response; -import javax.xml.bind.annotation.XmlRootElement; -import javax.ws.rs.HttpMethod; -import javax.ws.rs.core.MultivaluedMap; -import java.net.URISyntaxException; -import java.util.Arrays; -import java.util.Iterator; -import javax.ws.rs.core.StreamingOutput; -import org.apache.nifi.cluster.manager.testutils.HttpResponse; -import org.apache.nifi.cluster.manager.testutils.HttpServer; -import com.sun.jersey.api.client.Client; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.net.URI; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import javax.ws.rs.core.Response.Status; -import org.apache.nifi.cluster.manager.NodeResponse; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; -import org.apache.nifi.cluster.manager.testutils.HttpResponseAction; -import org.apache.nifi.cluster.protocol.NodeIdentifier; -import static org.junit.Assert.*; - -/** - * @author unattributed - */ -public class HttpRequestReplicatorImplTest { - - private Client client; - private HttpRequestReplicatorImpl replicator; - private int executorThreadCount; - private int serverThreadCount; - private int serverPort; - private HttpServer server; - private Map<String, List<String>> expectedRequestParameters; - private Map<String, String> expectedRequestHeaders; - private Map<String, String> expectedResponseHeaders; - private Object expectedEntity; - private String expectedBody; - private URI prototypeUri; - - @Before - public void setUp() throws IOException, URISyntaxException { - - executorThreadCount = 5; - serverThreadCount = 3; - - client = Client.create(); - - replicator = new HttpRequestReplicatorImpl(executorThreadCount, client, "1 sec", "1 sec"); - replicator.start(); - - expectedRequestHeaders = new HashMap<>(); - expectedRequestHeaders.put("header1", "header value1"); - expectedRequestHeaders.put("header2", "header value2"); - - expectedRequestParameters = new HashMap<>(); - expectedRequestParameters.put("param1", Arrays.asList("p value1")); - expectedRequestParameters.put("param2", Arrays.asList("p value2")); - - expectedResponseHeaders = new HashMap<>(); - expectedResponseHeaders.put("header1", "header value1"); - expectedResponseHeaders.put("header2", "header value2"); - - expectedEntity = new Entity(); - - expectedBody = "some text"; - - prototypeUri = new URI("http://prototype.host/path/to/resource"); - - server = new HttpServer(serverThreadCount, 0); - server.start(); - serverPort = server.getPort(); - } - - @After - public void teardown() { - if (server.isRunning()) { - server.stop(); - } - if (replicator.isRunning()) { - replicator.stop(); - } - } - - @Test - public void testReplicateGetLessNodesThanReplicatorThreads() throws Throwable { - testReplicateXXX(executorThreadCount - 1, HttpMethod.GET); - } - - @Test - public void testReplicateGetMoreNodesThanReplicatorThreads() throws Throwable { - testReplicateXXX(executorThreadCount + 1, HttpMethod.GET); - } - - @Test - public void testReplicateGetWithUnresponsiveNode() throws Throwable { - - // nodes - Set<NodeIdentifier> nodeIds = createNodes(2, "localhost", serverPort); - - // response - HttpResponse expectedResponse = new HttpResponse(Status.OK, expectedBody); - - // first response normal, second response slow - server.addResponseAction(new HttpResponseAction(expectedResponse)); - server.addResponseAction(new HttpResponseAction(expectedResponse, 3500)); - - Set<NodeResponse> responses = replicator.replicate( - nodeIds, - HttpMethod.GET, - prototypeUri, - expectedRequestParameters, - expectedRequestHeaders); - - assertEquals(nodeIds.size(), responses.size()); - - Iterator<NodeResponse> nodeResponseItr = responses.iterator(); - - NodeResponse firstResponse = nodeResponseItr.next(); - NodeResponse secondResponse = nodeResponseItr.next(); - NodeResponse goodResponse; - NodeResponse badResponse; - if (firstResponse.hasThrowable()) { - goodResponse = secondResponse; - badResponse = firstResponse; - } else { - goodResponse = firstResponse; - badResponse = secondResponse; - } - - // good response - // check status - assertEquals(Status.OK.getStatusCode(), goodResponse.getStatus()); - - // check entity stream - final ByteArrayOutputStream baos = new ByteArrayOutputStream(); - ((StreamingOutput) goodResponse.getResponse().getEntity()).write(baos); - assertEquals("some text", new String(baos.toByteArray())); - - // bad response - assertTrue(badResponse.hasThrowable()); - assertEquals(Status.INTERNAL_SERVER_ERROR.getStatusCode(), badResponse.getStatus()); - - } - - @Test(expected = IllegalArgumentException.class) - public void testReplicateGetWithEntity() throws Throwable { - testReplicateXXXEntity(HttpMethod.GET); - } - - @Test - public void testReplicatePost() throws Throwable { - testReplicateXXX(HttpMethod.POST); - } - - @Test - public void testReplicatePostWithEntity() throws Throwable { - testReplicateXXXEntity(HttpMethod.POST); - } - - @Test - public void testReplicatePut() throws Throwable { - testReplicateXXX(HttpMethod.PUT); - } - - @Test - public void testReplicatePutWithEntity() throws Throwable { - testReplicateXXXEntity(HttpMethod.PUT); - } - - @Test - public void testReplicateDelete() throws Throwable { - testReplicateXXX(HttpMethod.DELETE); - } - - @Test(expected = IllegalArgumentException.class) - public void testReplicateDeleteWithEntity() throws Throwable { - testReplicateXXXEntity(HttpMethod.DELETE); - } - - @Test - public void testReplicateHead() throws Throwable { - testReplicateXXX(HttpMethod.HEAD); - } - - @Test(expected = IllegalArgumentException.class) - public void testReplicateHeadWithEntity() throws Throwable { - testReplicateXXXEntity(HttpMethod.HEAD); - } - - @Test - public void testReplicateOptions() throws Throwable { - testReplicateXXX(HttpMethod.OPTIONS); - } - - @Test(expected = IllegalArgumentException.class) - public void testReplicateOptionsWithEntity() throws Throwable { - testReplicateXXXEntity(HttpMethod.OPTIONS); - } - - private void testReplicateXXX(final String method) throws Throwable { - testReplicateXXX(executorThreadCount, method); - } - - private void testReplicateXXX(final int numNodes, final String method) throws Throwable { - - // nodes - Set<NodeIdentifier> nodeIds = createNodes(numNodes, "localhost", serverPort); - - // set up responses - for (int i = 0; i < nodeIds.size(); i++) { - HttpResponse response = new HttpResponse(Status.OK, expectedBody); - response.addHeaders(expectedResponseHeaders); - server.addResponseAction(new HttpResponseAction(response)); - } - - // setup request parameters - server.addCheckedParameters(expectedRequestParameters); - - // request headers - server.addCheckedHeaders(expectedRequestHeaders); - - Set<NodeResponse> responses = replicator.replicate( - nodeIds, - method, - prototypeUri, - expectedRequestParameters, - expectedRequestHeaders); - - Set<NodeIdentifier> returnedNodeIds = new HashSet<>(); - for (NodeResponse response : responses) { - - // check if we received an exception - if (response.hasThrowable()) { - throw response.getThrowable(); - } - - // gather ids to verify later - returnedNodeIds.add(response.getNodeId()); - - // check status - assertEquals(Status.OK.getStatusCode(), response.getStatus()); - - Response serverResponse = response.getResponse(); - - // check response headers are copied - assertTrue(containsHeaders(expectedResponseHeaders, serverResponse.getMetadata())); - - // check entity stream - if (HttpMethod.HEAD.equalsIgnoreCase(method)) { - assertNull(serverResponse.getEntity()); - } else { - assertTrue(isEquals((StreamingOutput) serverResponse.getEntity(), expectedBody)); - } - - } - - // check node Ids - assertEquals(nodeIds, returnedNodeIds); - } - - private void testReplicateXXXEntity(final String method) throws Throwable { - testReplicateXXXEntity(executorThreadCount, method); - } - - private void testReplicateXXXEntity(final int numNodes, final String method) throws Throwable { - - // nodes - Set<NodeIdentifier> nodeIds = createNodes(numNodes, "localhost", serverPort); - - // set up responses - for (int i = 0; i < nodeIds.size(); i++) { - HttpResponse response = new HttpResponse(Status.OK, expectedBody); - response.addHeaders(expectedResponseHeaders); - server.addResponseAction(new HttpResponseAction(response)); - } - - // headers - expectedRequestHeaders.put("Content-Type", "application/xml"); - - Set<NodeResponse> responses = replicator.replicate( - nodeIds, - method, - prototypeUri, - expectedEntity, - expectedRequestHeaders); - - Set<NodeIdentifier> returnedNodeIds = new HashSet<>(); - for (NodeResponse response : responses) { - - // check if we received an exception - if (response.hasThrowable()) { - throw response.getThrowable(); - } - - // gather ids to verify later - returnedNodeIds.add(response.getNodeId()); - - // check status - assertEquals(Status.OK.getStatusCode(), response.getStatus()); - - Response serverResponse = response.getResponse(); - - // check response headers are copied - assertTrue(containsHeaders(expectedResponseHeaders, serverResponse.getMetadata())); - - // check entity stream - assertTrue(isEquals((StreamingOutput) serverResponse.getEntity(), expectedBody)); - - } - - // check node Ids - assertEquals(nodeIds, returnedNodeIds); - } - - private Set<NodeIdentifier> createNodes(int num, String host, int apiPort) { - Set<NodeIdentifier> result = new HashSet<>(); - for (int i = 0; i < num; i++) { - result.add(new NodeIdentifier(String.valueOf(i), host, apiPort, host, 1)); - } - return result; - } - - private boolean isEquals(StreamingOutput so, String expectedText) throws IOException { - final ByteArrayOutputStream baos = new ByteArrayOutputStream(); - so.write(baos); - return expectedText.equals(new String(baos.toByteArray())); - } - - private boolean containsHeaders(Map<String, String> expectedHeaders, MultivaluedMap<String, Object> metadata) { - for (Map.Entry<String, String> expectedEntry : expectedHeaders.entrySet()) { - if (expectedEntry.getValue().equals(metadata.getFirst(expectedEntry.getKey())) == false) { - return false; - } - } - return true; - } - -} - -@XmlRootElement -class Entity { -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/761e64a4/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/test/java/org/apache/nifi/cluster/manager/impl/HttpResponseMapperImplTest.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/test/java/org/apache/nifi/cluster/manager/impl/HttpResponseMapperImplTest.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/test/java/org/apache/nifi/cluster/manager/impl/HttpResponseMapperImplTest.java deleted file mode 100644 index d45a4d1..0000000 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/test/java/org/apache/nifi/cluster/manager/impl/HttpResponseMapperImplTest.java +++ /dev/null @@ -1,126 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.cluster.manager.impl; - -import org.apache.nifi.cluster.manager.impl.HttpResponseMapperImpl; -import com.sun.jersey.api.client.ClientResponse; -import com.sun.jersey.core.util.MultivaluedMapImpl; -import java.io.ByteArrayInputStream; -import java.util.Map; -import java.util.HashSet; -import java.util.Set; -import java.net.URI; -import java.net.URISyntaxException; -import org.apache.nifi.cluster.manager.NodeResponse; -import org.apache.nifi.cluster.node.Node; -import org.apache.nifi.cluster.node.Node.Status; -import org.apache.nifi.cluster.protocol.NodeIdentifier; -import org.junit.Before; -import org.junit.Test; -import static org.junit.Assert.*; -import static org.mockito.Mockito.*; - -/** - * @author unattributed - */ -public class HttpResponseMapperImplTest { - - private HttpResponseMapperImpl mapper; - - private URI dummyUri; - - @Before - public void setup() throws URISyntaxException { - mapper = new HttpResponseMapperImpl(); - dummyUri = new URI("http://dummy.com"); - } - - @Test - public void testToNodeStatusWithNo2xxResponses() { - - Set<NodeResponse> nodeResponses = new HashSet<>(); - nodeResponses.add(createNodeResourceResponse("1", 400)); - nodeResponses.add(createNodeResourceResponse("2", 100)); - nodeResponses.add(createNodeResourceResponse("3", 300)); - nodeResponses.add(createNodeResourceResponse("4", 500)); - - Map<NodeResponse, Status> map = mapper.map(dummyUri, nodeResponses); - - // since no 2xx responses, any 5xx is disconnected - for (Map.Entry<NodeResponse, Status> entry : map.entrySet()) { - NodeResponse response = entry.getKey(); - Status status = entry.getValue(); - switch (response.getNodeId().getId()) { - case "1": - assertTrue(status == Node.Status.CONNECTED); - break; - case "2": - assertTrue(status == Node.Status.CONNECTED); - break; - case "3": - assertTrue(status == Node.Status.CONNECTED); - break; - case "4": - assertTrue(status == Node.Status.DISCONNECTED); - break; - } - } - } - - @Test - public void testToNodeStatusWith2xxResponses() { - - Set<NodeResponse> nodeResponses = new HashSet<>(); - nodeResponses.add(createNodeResourceResponse("1", 200)); - nodeResponses.add(createNodeResourceResponse("2", 100)); - nodeResponses.add(createNodeResourceResponse("3", 300)); - nodeResponses.add(createNodeResourceResponse("4", 500)); - - Map<NodeResponse, Status> map = mapper.map(dummyUri, nodeResponses); - - // since there were 2xx responses, any non-2xx is disconnected - for (Map.Entry<NodeResponse, Status> entry : map.entrySet()) { - NodeResponse response = entry.getKey(); - Status status = entry.getValue(); - switch (response.getNodeId().getId()) { - case "1": - assertTrue(status == Node.Status.CONNECTED); - break; - case "2": - assertTrue(status == Node.Status.DISCONNECTED); - break; - case "3": - assertTrue(status == Node.Status.DISCONNECTED); - break; - case "4": - assertTrue(status == Node.Status.DISCONNECTED); - break; - } - } - } - - private NodeResponse createNodeResourceResponse(String nodeId, int statusCode) { - - ClientResponse clientResponse = mock(ClientResponse.class); - when(clientResponse.getStatus()).thenReturn(statusCode); - when(clientResponse.getHeaders()).thenReturn(new MultivaluedMapImpl()); - when(clientResponse.getEntityInputStream()).thenReturn(new ByteArrayInputStream(new byte[0])); - - NodeIdentifier nodeIdentifier = new NodeIdentifier(nodeId, "localhost", 1, "localhost", 1); - return new NodeResponse(nodeIdentifier, "GET", dummyUri, clientResponse, 1L, "111"); - } -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/761e64a4/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/test/java/org/apache/nifi/cluster/manager/impl/TestWebClusterManager.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/test/java/org/apache/nifi/cluster/manager/impl/TestWebClusterManager.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/test/java/org/apache/nifi/cluster/manager/impl/TestWebClusterManager.java deleted file mode 100644 index 13a192f..0000000 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/test/java/org/apache/nifi/cluster/manager/impl/TestWebClusterManager.java +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.cluster.manager.impl; - -import org.apache.nifi.cluster.manager.impl.WebClusterManager; - -import static org.junit.Assert.assertEquals; - -import java.text.DateFormat; -import java.text.ParseException; -import java.text.SimpleDateFormat; -import java.util.Date; -import java.util.Locale; - -import org.junit.Test; - -public class TestWebClusterManager { - - @Test - public void testNormalizedStatusSnapshotDate() throws ParseException { - final DateFormat df = new SimpleDateFormat("yyyy/MM/dd HH:mm:SS.SSS", Locale.US); - final Date date1 = df.parse("2014/01/01 00:00:00.000"); - final Date date2 = df.parse("2014/01/01 00:04:59.999"); - final Date date3 = df.parse("2014/01/01 00:05:00.000"); - final Date date4 = df.parse("2014/01/01 00:05:00.001"); - - final Date normalized1 = WebClusterManager.normalizeStatusSnapshotDate(date1, 300000); - assertEquals(date1, normalized1); - - final Date normalized2 = WebClusterManager.normalizeStatusSnapshotDate(date2, 300000); - assertEquals(date1, normalized2); - - final Date normalized3 = WebClusterManager.normalizeStatusSnapshotDate(date3, 300000); - assertEquals(date3, normalized3); - - final Date normalized4 = WebClusterManager.normalizeStatusSnapshotDate(date4, 300000); - assertEquals(date3, normalized4); - } - -}