http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/flow/impl/DataFlowDaoImpl.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/flow/impl/DataFlowDaoImpl.java b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/flow/impl/DataFlowDaoImpl.java new file mode 100644 index 0000000..702f081 --- /dev/null +++ b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/flow/impl/DataFlowDaoImpl.java @@ -0,0 +1,551 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.cluster.flow.impl; + +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.FilenameFilter; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.UUID; + +import javax.xml.bind.JAXBContext; +import javax.xml.bind.JAXBException; +import javax.xml.bind.Marshaller; +import javax.xml.bind.Unmarshaller; +import javax.xml.bind.annotation.XmlRootElement; +import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter; +import javax.xml.parsers.DocumentBuilder; +import javax.xml.parsers.DocumentBuilderFactory; +import javax.xml.transform.OutputKeys; +import javax.xml.transform.Transformer; +import javax.xml.transform.TransformerFactory; +import javax.xml.transform.dom.DOMSource; +import javax.xml.transform.stream.StreamResult; + +import org.apache.nifi.cluster.flow.ClusterDataFlow; +import org.apache.nifi.cluster.flow.DaoException; +import org.apache.nifi.cluster.flow.DataFlowDao; +import org.apache.nifi.cluster.flow.PersistedFlowState; +import org.apache.nifi.cluster.protocol.DataFlow; +import org.apache.nifi.cluster.protocol.NodeIdentifier; +import org.apache.nifi.cluster.protocol.StandardDataFlow; +import org.apache.nifi.cluster.protocol.jaxb.message.NodeIdentifierAdapter; +import org.apache.nifi.file.FileUtils; +import org.apache.nifi.io.BufferedInputStream; +import org.apache.nifi.io.BufferedOutputStream; +import org.apache.nifi.io.ByteArrayInputStream; +import org.apache.nifi.io.StreamUtils; +import org.apache.nifi.logging.NiFiLog; + +import org.apache.commons.compress.archivers.tar.TarArchiveEntry; +import org.apache.commons.compress.archivers.tar.TarArchiveInputStream; +import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.w3c.dom.Document; +import org.w3c.dom.Element; + +/** + * Implements the FlowDao interface. The implementation tracks the state of the + * dataflow by annotating the filename of the flow state file. Specifically, the + * implementation correlates PersistedFlowState states to filename extensions. + * The correlation is as follows: + * <ul> + * <li> CURRENT maps to flow.xml </li> + * <li> STALE maps to flow.xml.stale </li> + * <li> UNKNOWN maps to flow.xml.unknown </li> + * </ul> + * Whenever the flow state changes, the flow state file's name is updated to + * denote its state. + * + * The implementation also provides for a restore directory that may be + * configured for higher availability. At instance creation, if the primary or + * restore directories have multiple flow state files, an exception is thrown. + * If the primary directory has a current flow state file, but the restore + * directory does not, then the primary flow state file is copied to the restore + * directory. If the restore directory has a current flow state file, but the + * primary directory does not, then the restore flow state file is copied to the + * primary directory. If both the primary and restore directories have a current + * flow state file and the files are different, then an exception is thrown. + * + * When the flow state file is saved, it is always saved first to the restore + * directory followed by a save to the primary directory. When the flow state + * file is loaded, a check is made to verify that the primary and restore flow + * state files are both current. If either is not current, then an exception is + * thrown. The primary flow state file is always read when the load method is + * called. + * + * @author unattributed + */ +public class DataFlowDaoImpl implements DataFlowDao { + + private final File primaryDirectory; + private final File restoreDirectory; + private final boolean autoStart; + private final String generatedRootGroupId = UUID.randomUUID().toString(); + + public static final String STALE_EXT = ".stale"; + public static final String UNKNOWN_EXT = ".unknown"; + public static final String FLOW_PACKAGE = "flow.tar"; + public static final String FLOW_XML_FILENAME = "flow.xml"; + public static final String TEMPLATES_FILENAME = "templates.xml"; + public static final String SNIPPETS_FILENAME = "snippets.xml"; + public static final String CLUSTER_INFO_FILENAME = "cluster-info.xml"; + + private static final Logger logger = new NiFiLog(LoggerFactory.getLogger(DataFlowDaoImpl.class)); + + public DataFlowDaoImpl(final File primaryDirectory) throws DaoException { + this(primaryDirectory, null, false); + } + + public DataFlowDaoImpl(final File primaryDirectory, final File restoreDirectory, final boolean autoStart) throws DaoException { + + // sanity check that primary directory is a directory, creating it if necessary + if (primaryDirectory == null) { + throw new IllegalArgumentException("Primary directory may not be null."); + } else if (!primaryDirectory.exists()) { + if (!primaryDirectory.mkdir()) { + throw new DaoException(String.format("Failed to create primary directory '%s'", primaryDirectory.getAbsolutePath())); + } + } else if (!primaryDirectory.isDirectory()) { + throw new IllegalArgumentException("Primary directory must be a directory."); + } + + this.autoStart = autoStart; + + try { + this.primaryDirectory = primaryDirectory; + this.restoreDirectory = restoreDirectory; + + if (restoreDirectory == null) { + // check that we have exactly one current flow state file + ensureSingleCurrentStateFile(primaryDirectory); + } else { + + // check that restore directory is a directory, creating it if necessary + FileUtils.ensureDirectoryExistAndCanAccess(restoreDirectory); + + // check that restore directory is not the same as the primary directory + if (primaryDirectory.getAbsolutePath().equals(restoreDirectory.getAbsolutePath())) { + throw new IllegalArgumentException(String.format("Primary directory '%s' is the same as restore directory '%s' ", + primaryDirectory.getAbsolutePath(), restoreDirectory.getAbsolutePath())); + } + + final File[] primaryFlowStateFiles = getFlowStateFiles(primaryDirectory); + final File[] restoreFlowStateFiles = getFlowStateFiles(restoreDirectory); + + // if more than one state file in either primary or restore, then throw exception + if (primaryFlowStateFiles.length > 1) { + throw new IllegalStateException(String.format("Found multiple dataflow state files in primary directory '%s'", primaryDirectory)); + } else if (restoreFlowStateFiles.length > 1) { + throw new IllegalStateException(String.format("Found multiple dataflow state files in restore directory '%s'", restoreDirectory)); + } + + // check that the single primary state file we found is current or create a new one + final File primaryFlowStateFile = ensureSingleCurrentStateFile(primaryDirectory); + + // check that the single restore state file we found is current or create a new one + final File restoreFlowStateFile = ensureSingleCurrentStateFile(restoreDirectory); + + // if there was a difference in flow state file directories, then copy the appropriate files + if (restoreFlowStateFiles.length == 0 && primaryFlowStateFiles.length != 0) { + // copy primary state file to restore + FileUtils.copyFile(primaryFlowStateFile, restoreFlowStateFile, false, false, logger); + } else if (primaryFlowStateFiles.length == 0 && restoreFlowStateFiles.length != 0) { + // copy restore state file to primary + FileUtils.copyFile(restoreFlowStateFile, primaryFlowStateFile, false, false, logger); + } else { + // sync the primary copy with the restore copy + FileUtils.syncWithRestore(primaryFlowStateFile, restoreFlowStateFile, logger); + } + + } + } catch (final IOException | IllegalArgumentException | IllegalStateException | JAXBException ex) { + throw new DaoException(ex); + } + + } + + @Override + public ClusterDataFlow loadDataFlow() throws DaoException { + try { + return parseDataFlow(getExistingFlowStateFile(primaryDirectory)); + } catch (final IOException | JAXBException ex) { + throw new DaoException(ex); + } + } + + @Override + public void saveDataFlow(final ClusterDataFlow dataFlow) throws DaoException { + try { + + final File primaryStateFile = getFlowStateFile(primaryDirectory); + + // write to restore before writing to primary in case primary experiences problems + if (restoreDirectory != null) { + final File restoreStateFile = getFlowStateFile(restoreDirectory); + if (restoreStateFile == null) { + if (primaryStateFile == null) { + writeDataFlow(createNewFlowStateFile(restoreDirectory), dataFlow); + } else { + throw new DaoException(String.format("Unable to save dataflow because dataflow state file in primary directory '%s' exists, but it does not exist in the restore directory '%s'", + primaryDirectory.getAbsolutePath(), restoreDirectory.getAbsolutePath())); + } + } else { + if (primaryStateFile == null) { + throw new DaoException(String.format("Unable to save dataflow because dataflow state file in restore directory '%s' exists, but it does not exist in the primary directory '%s'", + restoreDirectory.getAbsolutePath(), primaryDirectory.getAbsolutePath())); + } else { + final PersistedFlowState primaryFlowState = getPersistedFlowState(primaryStateFile); + final PersistedFlowState restoreFlowState = getPersistedFlowState(restoreStateFile); + if (primaryFlowState == restoreFlowState) { + writeDataFlow(restoreStateFile, dataFlow); + } else { + throw new DaoException(String.format("Unable to save dataflow because state file in primary directory '%s' has state '%s', but the state file in the restore directory '%s' has state '%s'", + primaryDirectory.getAbsolutePath(), primaryFlowState, restoreDirectory.getAbsolutePath(), restoreFlowState)); + } + } + } + } + + // write dataflow to primary + if (primaryStateFile == null) { + writeDataFlow(createNewFlowStateFile(primaryDirectory), dataFlow); + } else { + writeDataFlow(primaryStateFile, dataFlow); + } + + } catch (final IOException | JAXBException ex) { + throw new DaoException(ex); + } + } + + @Override + public PersistedFlowState getPersistedFlowState() { + // trust restore over primary if configured for restore + if (restoreDirectory == null) { + return getPersistedFlowState(getExistingFlowStateFile(primaryDirectory)); + } else { + return getPersistedFlowState(getExistingFlowStateFile(restoreDirectory)); + } + } + + @Override + public void setPersistedFlowState(final PersistedFlowState flowState) throws DaoException { + // rename restore before primary if configured for restore + if (restoreDirectory != null) { + renameFlowStateFile(getExistingFlowStateFile(restoreDirectory), flowState); + } + renameFlowStateFile(getExistingFlowStateFile(primaryDirectory), flowState); + } + + private File ensureSingleCurrentStateFile(final File dir) throws IOException, JAXBException { + + // ensure that we have at most one state file and if we have one, it is current + final File[] directoryFlowStateFiles = getFlowStateFiles(dir); + if (directoryFlowStateFiles.length > 1) { + throw new DaoException(String.format("Found multiple dataflow state files in directory '%s'", dir)); + } else if (directoryFlowStateFiles.length == 0) { + // create a new file if none exist + return createNewFlowStateFile(dir); + } else { + // check that the single flow state file is current + final PersistedFlowState flowState = getPersistedFlowState(directoryFlowStateFiles[0]); + if (PersistedFlowState.CURRENT == flowState) { + return directoryFlowStateFiles[0]; + } else { + throw new DaoException(String.format("Dataflow state file '%s' must be current.", directoryFlowStateFiles[0].getAbsolutePath())); + } + } + + } + + private PersistedFlowState getPersistedFlowState(final File file) { + final String path = file.getAbsolutePath(); + if (path.endsWith(STALE_EXT)) { + return PersistedFlowState.STALE; + } else if (path.endsWith(UNKNOWN_EXT)) { + return PersistedFlowState.UNKNOWN; + } else { + return PersistedFlowState.CURRENT; + } + } + + private File getFlowStateFile(final File dir) { + final File[] files = getFlowStateFiles(dir); + if (files.length > 1) { + throw new IllegalStateException(String.format("Expected at most one dataflow state file, but found %s files.", files.length)); + } else if (files.length == 0) { + return null; + } else { + return files[0]; + } + } + + private File getExistingFlowStateFile(final File dir) { + final File file = getFlowStateFile(dir); + if (file == null) { + throw new IllegalStateException(String.format("Expected a dataflow state file, but none existed in directory '%s'", dir.getAbsolutePath())); + } + return file; + } + + private File[] getFlowStateFiles(final File dir) { + final File[] files = dir.listFiles(new FilenameFilter() { + @Override + public boolean accept(File dir, String name) { + return (name.equals(FLOW_PACKAGE) || name.endsWith(STALE_EXT) || name.endsWith(UNKNOWN_EXT)); + } + }); + + if (files == null) { + return new File[0]; + } else { + return files; + } + } + + private File removeStateFileExtension(final File file) { + + final String path = file.getAbsolutePath(); + final int stateFileExtIndex; + if (path.endsWith(STALE_EXT)) { + stateFileExtIndex = path.lastIndexOf(STALE_EXT); + } else if (path.endsWith(UNKNOWN_EXT)) { + stateFileExtIndex = path.lastIndexOf(UNKNOWN_EXT); + } else { + stateFileExtIndex = path.length(); + } + + return new File(path.substring(0, stateFileExtIndex)); + } + + private File addStateFileExtension(final File file, final PersistedFlowState state) { + switch (state) { + case CURRENT: { + return file; + } + case STALE: { + return new File(file.getAbsolutePath() + STALE_EXT); + } + case UNKNOWN: { + return new File(file.getAbsolutePath() + UNKNOWN_EXT); + } + default: { + throw new RuntimeException("Unsupported PersistedFlowState Enum value: " + state); + } + } + } + + private File createNewFlowStateFile(final File dir) throws IOException, JAXBException { + final File stateFile = new File(dir, FLOW_PACKAGE); + stateFile.createNewFile(); + + final byte[] flowBytes = getEmptyFlowBytes(); + final byte[] templateBytes = new byte[0]; + final byte[] snippetBytes = new byte[0]; + final DataFlow dataFlow = new StandardDataFlow(flowBytes, templateBytes, snippetBytes); + + final ClusterMetadata clusterMetadata = new ClusterMetadata(); + writeDataFlow(stateFile, dataFlow, clusterMetadata); + + return stateFile; + } + + private byte[] getEmptyFlowBytes() throws IOException { + try { + final DocumentBuilder docBuilder = DocumentBuilderFactory.newInstance().newDocumentBuilder(); + final Document document = docBuilder.newDocument(); + + final Element controller = document.createElement("flowController"); + document.appendChild(controller); + + controller.appendChild(createTextElement(document, "maxThreadCount", "15")); + + final Element rootGroup = document.createElement("rootGroup"); + rootGroup.appendChild(createTextElement(document, "id", generatedRootGroupId)); + rootGroup.appendChild(createTextElement(document, "name", "NiFi Flow")); + + // create the position element + final Element positionElement = createTextElement(document, "position", ""); + positionElement.setAttribute("x", "0.0"); + positionElement.setAttribute("y", "0.0"); + rootGroup.appendChild(positionElement); + + rootGroup.appendChild(createTextElement(document, "comment", "")); + controller.appendChild(rootGroup); + + final Transformer transformer = TransformerFactory.newInstance().newTransformer(); + transformer.setOutputProperty("{http://xml.apache.org/xslt}indent-amount", "2"); + transformer.setOutputProperty(OutputKeys.INDENT, "yes"); + + final DOMSource source = new DOMSource(document); + final ByteArrayOutputStream baos = new ByteArrayOutputStream(); + final StreamResult result = new StreamResult(baos); + transformer.transform(source, result); + + return baos.toByteArray(); + } catch (final Exception e) { + throw new IOException(e); + } + } + + private Element createTextElement(final Document document, final String elementName, final String value) { + final Element element = document.createElement(elementName); + element.setTextContent(value); + return element; + } + + private void renameFlowStateFile(final File flowStateFile, final PersistedFlowState newState) throws DaoException { + final PersistedFlowState existingState = getPersistedFlowState(flowStateFile); + if (existingState != newState) { + final File newFlowStateFile = addStateFileExtension(removeStateFileExtension(flowStateFile), newState); + if (flowStateFile.renameTo(newFlowStateFile) == false) { + throw new DaoException( + String.format("Failed to rename flow state file '%s' to new name '%s'", flowStateFile.getAbsolutePath(), newFlowStateFile.getAbsolutePath())); + } + } + } + + private ClusterDataFlow parseDataFlow(final File file) throws IOException, JAXBException, DaoException { + byte[] flowBytes = new byte[0]; + byte[] templateBytes = new byte[0]; + byte[] snippetBytes = new byte[0]; + byte[] clusterInfoBytes = new byte[0]; + + try (final InputStream inStream = new FileInputStream(file); + final TarArchiveInputStream tarIn = new TarArchiveInputStream(new BufferedInputStream(inStream))) { + TarArchiveEntry tarEntry; + while ((tarEntry = tarIn.getNextTarEntry()) != null) { + switch (tarEntry.getName()) { + case FLOW_XML_FILENAME: + flowBytes = new byte[(int) tarEntry.getSize()]; + StreamUtils.fillBuffer(tarIn, flowBytes, true); + break; + case TEMPLATES_FILENAME: + templateBytes = new byte[(int) tarEntry.getSize()]; + StreamUtils.fillBuffer(tarIn, templateBytes, true); + break; + case SNIPPETS_FILENAME: + snippetBytes = new byte[(int) tarEntry.getSize()]; + StreamUtils.fillBuffer(tarIn, snippetBytes, true); + break; + case CLUSTER_INFO_FILENAME: + clusterInfoBytes = new byte[(int) tarEntry.getSize()]; + StreamUtils.fillBuffer(tarIn, clusterInfoBytes, true); + break; + default: + throw new DaoException("Found Unexpected file in dataflow configuration: " + tarEntry.getName()); + } + } + } + + final ClusterMetadata clusterMetadata; + if (clusterInfoBytes.length == 0) { + clusterMetadata = null; + } else { + final Unmarshaller clusterMetadataUnmarshaller = ClusterMetadata.jaxbCtx.createUnmarshaller(); + clusterMetadata = (ClusterMetadata) clusterMetadataUnmarshaller.unmarshal(new ByteArrayInputStream(clusterInfoBytes)); + } + + final StandardDataFlow dataFlow = new StandardDataFlow(flowBytes, templateBytes, snippetBytes); + dataFlow.setAutoStartProcessors(autoStart); + + return new ClusterDataFlow(dataFlow, (clusterMetadata == null) ? null : clusterMetadata.getPrimaryNodeId()); + } + + private void writeDataFlow(final File file, final ClusterDataFlow clusterDataFlow) throws IOException, JAXBException { + + // get the data flow + DataFlow dataFlow = clusterDataFlow.getDataFlow(); + + // if no dataflow, then write a new dataflow + if (dataFlow == null) { + dataFlow = new StandardDataFlow(new byte[0], new byte[0], new byte[0]); + } + + // setup the cluster metadata + final ClusterMetadata clusterMetadata = new ClusterMetadata(); + clusterMetadata.setPrimaryNodeId(clusterDataFlow.getPrimaryNodeId()); + + // write to disk + writeDataFlow(file, dataFlow, clusterMetadata); + } + + private void writeTarEntry(final TarArchiveOutputStream tarOut, final String filename, final byte[] bytes) throws IOException { + final TarArchiveEntry flowEntry = new TarArchiveEntry(filename); + flowEntry.setSize(bytes.length); + tarOut.putArchiveEntry(flowEntry); + tarOut.write(bytes); + tarOut.closeArchiveEntry(); + } + + private void writeDataFlow(final File file, final DataFlow dataFlow, final ClusterMetadata clusterMetadata) throws IOException, JAXBException { + + try (final OutputStream fos = new FileOutputStream(file); + final TarArchiveOutputStream tarOut = new TarArchiveOutputStream(new BufferedOutputStream(fos))) { + + writeTarEntry(tarOut, FLOW_XML_FILENAME, dataFlow.getFlow()); + writeTarEntry(tarOut, TEMPLATES_FILENAME, dataFlow.getTemplates()); + writeTarEntry(tarOut, SNIPPETS_FILENAME, dataFlow.getSnippets()); + + final ByteArrayOutputStream baos = new ByteArrayOutputStream(256); + writeClusterMetadata(clusterMetadata, baos); + final byte[] clusterInfoBytes = baos.toByteArray(); + + writeTarEntry(tarOut, CLUSTER_INFO_FILENAME, clusterInfoBytes); + } + } + + private void writeClusterMetadata(final ClusterMetadata clusterMetadata, final OutputStream os) throws IOException, JAXBException { + // write cluster metadata to output stream + final Marshaller marshaller = ClusterMetadata.jaxbCtx.createMarshaller(); + marshaller.setProperty(Marshaller.JAXB_FORMATTED_OUTPUT, true); + marshaller.setProperty(Marshaller.JAXB_FRAGMENT, true); + marshaller.setProperty(Marshaller.JAXB_ENCODING, "UTF-8"); + marshaller.marshal(clusterMetadata, os); + } + + @XmlRootElement(name = "clusterMetadata") + private static class ClusterMetadata { + + private NodeIdentifier primaryNodeId; + + private static final JAXBContext jaxbCtx; + + static { + try { + jaxbCtx = JAXBContext.newInstance(ClusterMetadata.class); + } catch (final JAXBException je) { + throw new RuntimeException(je); + } + } + + @XmlJavaTypeAdapter(NodeIdentifierAdapter.class) + public NodeIdentifier getPrimaryNodeId() { + return primaryNodeId; + } + + public void setPrimaryNodeId(final NodeIdentifier primaryNodeId) { + this.primaryNodeId = primaryNodeId; + } + } +}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/flow/impl/DataFlowManagementServiceImpl.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/flow/impl/DataFlowManagementServiceImpl.java b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/flow/impl/DataFlowManagementServiceImpl.java new file mode 100644 index 0000000..e135af3 --- /dev/null +++ b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/flow/impl/DataFlowManagementServiceImpl.java @@ -0,0 +1,356 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.cluster.flow.impl; + +import java.util.Collections; +import java.util.Date; +import java.util.Set; +import java.util.Timer; +import java.util.TimerTask; +import java.util.concurrent.CopyOnWriteArraySet; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +import org.apache.nifi.cluster.flow.ClusterDataFlow; +import org.apache.nifi.cluster.flow.DaoException; +import org.apache.nifi.cluster.flow.DataFlowDao; +import org.apache.nifi.cluster.flow.DataFlowManagementService; +import org.apache.nifi.cluster.flow.PersistedFlowState; +import org.apache.nifi.cluster.protocol.ClusterManagerProtocolSender; +import org.apache.nifi.cluster.protocol.NodeIdentifier; +import org.apache.nifi.cluster.protocol.StandardDataFlow; +import org.apache.nifi.cluster.protocol.message.FlowRequestMessage; +import org.apache.nifi.cluster.protocol.message.FlowResponseMessage; +import org.apache.nifi.logging.NiFiLog; +import org.apache.nifi.util.FormatUtils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Implements FlowManagementService interface. The service tries to keep the + * cluster's flow current with regards to the available nodes. + * + * The instance may be configured with a retrieval delay, which will reduce the + * number of retrievals performed by the service at the expense of increasing + * the chances that the service will not be able to provide a current flow to + * the caller. + * + * By default, the service will try to update the flow as quickly as possible. + * Configuring a delay enables a less aggressive retrieval strategy. + * Specifically, the eligible retrieval time is reset every time the flow state + * is set to STALE. If the state is set to UNKNOWN or CURRENT, then the flow + * will not be retrieved. + * + * @author unattributed + */ +public class DataFlowManagementServiceImpl implements DataFlowManagementService { + + /* + * Developer Note: + * + * This class maintains an ExecutorService and a Runnable. + * Although the class is not externally threadsafe, its internals are protected to + * accommodate multithread access between the ExecutorServer and the Runnable. + * + */ + private static final Logger logger = new NiFiLog(LoggerFactory.getLogger(DataFlowManagementServiceImpl.class)); + + private final DataFlowDao flowDao; + + private final ClusterManagerProtocolSender sender; + + private final Set<NodeIdentifier> nodeIds = new CopyOnWriteArraySet<>(); + + private final AtomicBoolean stopRequested = new AtomicBoolean(false); + + private final AtomicLong lastRetrievalTime = new AtomicLong(-1); + + private Timer flowRetriever; + + private long retrievableAfterTime = 0L; + + private AtomicInteger retrievalDelaySeconds = new AtomicInteger(0); + + private final TimingReentrantLock resourceLock = new TimingReentrantLock(new ReentrantLock()); + + public DataFlowManagementServiceImpl(final DataFlowDao flowDao, final ClusterManagerProtocolSender sender) { + if (flowDao == null) { + throw new IllegalArgumentException("Flow DAO may not be null."); + } else if (sender == null) { + throw new IllegalArgumentException("Cluster Manager Protocol Sender may not be null."); + } + this.flowDao = flowDao; + this.sender = sender; + } + + @Override + public void start() { + + if (isRunning()) { + throw new IllegalArgumentException("Instance is already running."); + } + + // reset stop requested + stopRequested.set(false); + + // setup flow retreiver timer + flowRetriever = new Timer("Flow Management Service", /* is daemon */ true); + flowRetriever.schedule(new FlowRetrieverTimerTask(), 0, 500); + } + + @Override + public boolean isRunning() { + return (flowRetriever != null); + } + + @Override + public void stop() { + + if (isRunning() == false) { + throw new IllegalArgumentException("Instance is already stopped."); + } + + // record stop request + stopRequested.set(true); + + flowRetriever.cancel(); + flowRetriever = null; + + } + + @Override + public ClusterDataFlow loadDataFlow() throws DaoException { + resourceLock.lock(); + try { + return flowDao.loadDataFlow(); + } finally { + resourceLock.unlock("loadDataFlow"); + } + } + + @Override + public void updatePrimaryNode(final NodeIdentifier nodeId) { + resourceLock.lock(); + try { + final ClusterDataFlow existingClusterDataFlow = flowDao.loadDataFlow(); + + final StandardDataFlow dataFlow; + if (existingClusterDataFlow == null) { + dataFlow = null; + } else { + dataFlow = existingClusterDataFlow.getDataFlow(); + } + + flowDao.saveDataFlow(new ClusterDataFlow(dataFlow, nodeId)); + } finally { + resourceLock.unlock("updatePrimaryNode"); + } + } + + @Override + public PersistedFlowState getPersistedFlowState() { + resourceLock.lock(); + try { + return flowDao.getPersistedFlowState(); + } finally { + resourceLock.unlock("getPersistedFlowState"); + } + } + + @Override + public boolean isFlowCurrent() { + return PersistedFlowState.CURRENT == getPersistedFlowState(); + } + + @Override + public void setPersistedFlowState(final PersistedFlowState flowState) { + // lock to ensure state change and retrievable time update are atomic + resourceLock.lock(); + try { + flowDao.setPersistedFlowState(flowState); + if (PersistedFlowState.STALE == flowState) { + retrievableAfterTime = new Date().getTime() + (getRetrievalDelaySeconds() * 1000); + } else if (PersistedFlowState.UNKNOWN == flowState || PersistedFlowState.CURRENT == flowState) { + retrievableAfterTime = Long.MAX_VALUE; + } + } finally { + resourceLock.unlock("setPersistedFlowState"); + } + } + + @Override + public Set<NodeIdentifier> getNodeIds() { + return Collections.unmodifiableSet(nodeIds); + } + + @Override + public void setNodeIds(final Set<NodeIdentifier> nodeIds) { + + if (nodeIds == null) { + throw new IllegalArgumentException("Node IDs may not be null."); + } + + resourceLock.lock(); + try { + + if (this.nodeIds.equals(nodeIds)) { + return; + } + + this.nodeIds.clear(); + this.nodeIds.addAll(nodeIds); + + } finally { + resourceLock.unlock("setNodeIds"); + } + + } + + @Override + public int getRetrievalDelaySeconds() { + return retrievalDelaySeconds.get(); + } + + @Override + public void setRetrievalDelay(final String retrievalDelay) { + this.retrievalDelaySeconds.set((int) FormatUtils.getTimeDuration(retrievalDelay, TimeUnit.SECONDS)); + } + + public ClusterManagerProtocolSender getSender() { + return sender; + } + + public long getLastRetrievalTime() { + return lastRetrievalTime.get(); + } + + /** + * A timer task for issuing FlowRequestMessage messages to nodes to retrieve + * an updated flow. + */ + private class FlowRetrieverTimerTask extends TimerTask { + + @Override + public void run() { + + resourceLock.lock(); + try { + // if flow is current, then we're done + if (isFlowCurrent()) { + return; + } + } catch (final Exception ex) { + logger.info("Encountered exception checking if flow is current caused by " + ex, ex); + } finally { + resourceLock.unlock("FlowRetrieverTimerTask - isFlowCurrent"); + } + + final FlowRequestMessage request = new FlowRequestMessage(); + for (final NodeIdentifier nodeId : getNodeIds()) { + try { + // setup request + request.setNodeId(nodeId); + + // record request time + final long requestSentTime = new Date().getTime(); + + resourceLock.lock(); + try { + // sanity checks before making request + if (stopRequested.get()) { // did we receive a stop request + logger.debug("Stopping runnable prematurely because a request to stop was issued."); + return; + } else if (requestSentTime < retrievableAfterTime) { + /* + * Retrievable after time was updated while obtaining + * the lock, so try again later + */ + return; + } + } finally { + resourceLock.unlock("FlowRetrieverTimerTask - check stopRequested"); + } + + // send request + final FlowResponseMessage response = sender.requestFlow(request); + + resourceLock.lock(); + try { + // check if the retrieved flow is still valid + if (requestSentTime > retrievableAfterTime) { + logger.info("Saving retrieved flow."); + + final StandardDataFlow dataFlow = response.getDataFlow(); + final ClusterDataFlow existingClusterDataFlow = flowDao.loadDataFlow(); + final ClusterDataFlow currentClusterDataFlow; + if (existingClusterDataFlow == null) { + currentClusterDataFlow = new ClusterDataFlow(dataFlow, null); + } else { + currentClusterDataFlow = new ClusterDataFlow(dataFlow, existingClusterDataFlow.getPrimaryNodeId()); + } + flowDao.saveDataFlow(currentClusterDataFlow); + flowDao.setPersistedFlowState(PersistedFlowState.CURRENT); + lastRetrievalTime.set(new Date().getTime()); + } + + /* + * Retrievable after time was updated while requesting + * the flow, so try again later. + */ + } finally { + resourceLock.unlock("FlowRetrieverTimerTask - saveDataFlow"); + } + + } catch (final Throwable t) { + logger.info("Encountered exception retrieving flow from node " + nodeId + " caused by " + t, t); + } + } + } + } + + private static class TimingReentrantLock { + + private final Lock lock; + private static final Logger logger = LoggerFactory.getLogger("dataFlowManagementService.lock"); + + private final ThreadLocal<Long> lockTime = new ThreadLocal<>(); + + public TimingReentrantLock(final Lock lock) { + this.lock = lock; + } + + public void lock() { + lock.lock(); + lockTime.set(System.nanoTime()); + } + + public void unlock(final String task) { + final long nanosLocked = System.nanoTime() - lockTime.get(); + lock.unlock(); + + final long millisLocked = TimeUnit.MILLISECONDS.convert(nanosLocked, TimeUnit.NANOSECONDS); + if (millisLocked > 100L) { + logger.debug("Lock held for {} milliseconds for task: {}", millisLocked, task); + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/ClusterManager.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/ClusterManager.java b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/ClusterManager.java new file mode 100644 index 0000000..0fcac8c --- /dev/null +++ b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/ClusterManager.java @@ -0,0 +1,225 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.cluster.manager; + +import org.apache.nifi.cluster.manager.exception.PrimaryRoleAssignmentException; +import org.apache.nifi.cluster.manager.exception.IllegalNodeDisconnectionException; +import org.apache.nifi.cluster.manager.exception.UnknownNodeException; +import org.apache.nifi.cluster.manager.exception.IneligiblePrimaryNodeException; +import org.apache.nifi.cluster.manager.exception.IllegalNodeDeletionException; +import org.apache.nifi.cluster.manager.exception.IllegalNodeReconnectionException; +import org.apache.nifi.cluster.manager.exception.NodeDisconnectionException; +import org.apache.nifi.cluster.NodeInformant; +import org.apache.nifi.cluster.event.Event; +import org.apache.nifi.cluster.node.Node; +import org.apache.nifi.cluster.node.Node.Status; +import org.apache.nifi.cluster.protocol.ConnectionRequest; +import org.apache.nifi.cluster.protocol.ConnectionResponse; +import org.apache.nifi.cluster.protocol.Heartbeat; +import org.apache.nifi.cluster.protocol.NodeIdentifier; +import org.apache.nifi.controller.status.ProcessGroupStatus; +import org.apache.nifi.diagnostics.SystemDiagnostics; +import org.apache.nifi.reporting.BulletinRepository; + +import java.util.List; +import java.util.Set; + +/** + * Defines the interface for a ClusterManager. The cluster manager is a + * threadsafe centralized manager for a cluster. Members of a cluster are nodes. + * A member becomes a node by issuing a connection request to the manager. The + * manager maintains the set of nodes. Nodes may be disconnected, reconnected, + * and deleted. + * + * Nodes are responsible for sending heartbeats to the manager to indicate their + * liveliness. A manager may disconnect a node if it does not receive a + * heartbeat within a configurable time period. A cluster manager instance may + * be configured with how often to monitor received heartbeats + * (getHeartbeatMonitoringIntervalSeconds()) and the maximum time that may + * elapse between node heartbeats before disconnecting the node + * (getMaxHeartbeatGapSeconds()). + * + * Since only a single node may execute isolated processors, the cluster manager + * maintains the notion of a primary node. The primary node is chosen at cluster + * startup and retains the role until a user requests a different node to be the + * primary node. + * + * @author unattributed + */ +public interface ClusterManager extends NodeInformant { + + /** + * Handles a node's heartbeat. + * + * @param heartbeat a heartbeat + * + */ + void handleHeartbeat(Heartbeat heartbeat); + + /** + * @param statuses the statuses of the nodes + * @return the set of nodes + */ + Set<Node> getNodes(Status... statuses); + + /** + * @param nodeId + * @return returns the node with the given identifier or null if node does + * not exist + */ + Node getNode(String nodeId); + + /** + * @param statuses + * @return the set of node identifiers with the given node status + */ + Set<NodeIdentifier> getNodeIds(Status... statuses); + + /** + * Deletes the node with the given node identifier. If the given node is the + * primary node, then a subsequent request may be made to the manager to set + * a new primary node. + * + * @param nodeId the node identifier + * @param userDn the Distinguished Name of the user requesting the node be + * deleted from the cluster + * + * @throws UnknownNodeException if the node does not exist + * @throws IllegalNodeDeletionException if the node is not in a disconnected + * state + */ + void deleteNode(String nodeId, String userDn) throws UnknownNodeException, IllegalNodeDeletionException; + + /** + * Requests a connection to the cluster. + * + * @param request the request + * + * @return the response + */ + ConnectionResponse requestConnection(ConnectionRequest request); + + /** + * Services reconnection requests for a given node. If the node indicates + * reconnection failure, then the node will be set to disconnected. + * Otherwise, a reconnection request will be sent to the node, initiating + * the connection handshake. + * + * @param nodeId a node identifier + * @param userDn the Distinguished Name of the user requesting the + * reconnection + * + * @throws UnknownNodeException if the node does not exist + * @throws IllegalNodeReconnectionException if the node is not disconnected + */ + void requestReconnection(String nodeId, String userDn) throws UnknownNodeException, IllegalNodeReconnectionException; + + /** + * Requests the node with the given identifier be disconnected. + * + * @param nodeId the node identifier + * @param userDn the Distinguished Name of the user requesting the + * disconnection + * + * @throws UnknownNodeException if the node does not exist + * @throws IllegalNodeDisconnectionException if the node cannot be + * disconnected due to the cluster's state (e.g., node is last connected + * node or node is primary) + * @throws UnknownNodeException if the node does not exist + * @throws IllegalNodeDisconnectionException if the node is not disconnected + * @throws NodeDisconnectionException if the disconnection failed + */ + void requestDisconnection(String nodeId, String userDn) throws UnknownNodeException, IllegalNodeDisconnectionException, NodeDisconnectionException; + + /** + * @return the time in seconds to wait between successive executions of + * heartbeat monitoring + */ + int getHeartbeatMonitoringIntervalSeconds(); + + /** + * @return the maximum time in seconds that is allowed between successive + * heartbeats of a node before disconnecting the node + */ + int getMaxHeartbeatGapSeconds(); + + /** + * Returns a list of node events for the node with the given identifier. The + * events will be returned in order of most recent to least recent according + * to the creation date of the event. + * + * @param nodeId the node identifier + * + * @return the list of events or an empty list if no node exists with the + * given identifier + */ + List<Event> getNodeEvents(final String nodeId); + + /** + * Revokes the primary role from the current primary node and assigns the + * primary role to given given node ID. + * + * If role revocation fails, then the current primary node is set to + * disconnected while retaining the primary role and no role assignment is + * performed. + * + * If role assignment fails, then the given node is set to disconnected and + * is given the primary role. + * + * @param nodeId the node identifier + * @param userDn the Distinguished Name of the user requesting that the + * Primary Node be assigned + * + * @throws UnknownNodeException if the node with the given identifier does + * not exist + * @throws IneligiblePrimaryNodeException if the node with the given + * identifier is not eligible to be the primary node + * @throws PrimaryRoleAssignmentException if the cluster was unable to + * change the primary role to the requested node + */ + void setPrimaryNode(String nodeId, String userDn) throws UnknownNodeException, IneligiblePrimaryNodeException, PrimaryRoleAssignmentException; + + /** + * @return the primary node of the cluster or null if no primary node exists + */ + Node getPrimaryNode(); + + /** + * Returns the bulletin repository. + * + * @return + */ + BulletinRepository getBulletinRepository(); + + /** + * Returns a {@link ProcessGroupStatus} that represents the status of all + * nodes with the given {@link Status}es for the given ProcessGroup id, or + * null if no nodes exist with the given statuses + * + * @param groupId + * @return + */ + ProcessGroupStatus getProcessGroupStatus(String groupId); + + /** + * Returns a merged representation of the System Diagnostics for all nodes + * in the cluster + * + * @return + */ + SystemDiagnostics getSystemDiagnostics(); +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/HttpClusterManager.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/HttpClusterManager.java b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/HttpClusterManager.java new file mode 100644 index 0000000..2cf5812 --- /dev/null +++ b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/HttpClusterManager.java @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.cluster.manager; + +import org.apache.nifi.cluster.manager.exception.DisconnectedNodeMutableRequestException; +import org.apache.nifi.cluster.manager.exception.UriConstructionException; +import org.apache.nifi.cluster.manager.exception.ConnectingNodeMutableRequestException; +import org.apache.nifi.cluster.manager.exception.NoConnectedNodesException; +import org.apache.nifi.cluster.manager.exception.NoResponseFromNodesException; +import org.apache.nifi.cluster.manager.exception.SafeModeMutableRequestException; +import org.apache.nifi.cluster.protocol.NodeIdentifier; + +import java.net.URI; +import java.util.List; +import java.util.Map; +import java.util.Set; + +/** + * Extends the ClusterManager interface to define how requests issued to the + * cluster manager are federated to the nodes. Specifically, the HTTP protocol + * is used for communicating requests to the cluster manager and to the nodes. + * + * @author unattributed + */ +public interface HttpClusterManager extends ClusterManager { + + /** + * Federates the HTTP request to all connected nodes in the cluster. The + * given URI's host and port will not be used and instead will be adjusted + * for each node's host and port. The node URIs are guaranteed to be + * constructed before issuing any requests, so if a UriConstructionException + * is thrown, then it is guaranteed that no request was issued. + * + * @param method the HTTP method (e.g., GET, POST, PUT, DELETE, HEAD) + * @param uri the base request URI (up to, but not including, the query + * string) + * @param parameters the request parameters + * @param headers the request headers + * + * @return the client response + * + * @throws NoConnectedNodesException if no nodes are connected as results of + * the request + * @throws NoResponseFromNodesException if no response could be obtained + * @throws UriConstructionException if there was an issue constructing the + * URIs tailored for each individual node + * @throws ConnectingNodeMutableRequestException if the request was a PUT, + * POST, DELETE and a node is connecting to the cluster + * @throws DisconnectedNodeMutableRequestException if the request was a PUT, + * POST, DELETE and a node is disconnected from the cluster + * @throws SafeModeMutableRequestException if the request was a PUT, POST, + * DELETE and a the cluster is in safe mode + */ + NodeResponse applyRequest(String method, URI uri, Map<String, List<String>> parameters, Map<String, String> headers) + throws NoConnectedNodesException, NoResponseFromNodesException, UriConstructionException, ConnectingNodeMutableRequestException, + DisconnectedNodeMutableRequestException, SafeModeMutableRequestException; + + /** + * Federates the HTTP request to the nodes specified. The given URI's host + * and port will not be used and instead will be adjusted for each node's + * host and port. The node URIs are guaranteed to be constructed before + * issuing any requests, so if a UriConstructionException is thrown, then it + * is guaranteed that no request was issued. + * + * @param method the HTTP method (e.g., GET, POST, PUT, DELETE, HEAD) + * @param uri the base request URI (up to, but not including, the query + * string) + * @param parameters the request parameters + * @param headers the request headers + * @param nodeIdentifiers the NodeIdentifier for each node that the request + * should be replaced to + * + * @return the client response + * + * @throws NoConnectedNodesException if no nodes are connected as results of + * the request + * @throws NoResponseFromNodesException if no response could be obtained + * @throws UriConstructionException if there was an issue constructing the + * URIs tailored for each individual node + * @throws ConnectingNodeMutableRequestException if the request was a PUT, + * POST, DELETE and a node is connecting to the cluster + * @throws DisconnectedNodeMutableRequestException if the request was a PUT, + * POST, DELETE and a node is disconnected from the cluster + * @throws SafeModeMutableRequestException if the request was a PUT, POST, + * DELETE and a the cluster is in safe mode + */ + NodeResponse applyRequest(String method, URI uri, Map<String, List<String>> parameters, Map<String, String> headers, + Set<NodeIdentifier> nodeIdentifiers) + throws NoConnectedNodesException, NoResponseFromNodesException, UriConstructionException, ConnectingNodeMutableRequestException, + DisconnectedNodeMutableRequestException, SafeModeMutableRequestException; + + /** + * Federates the HTTP request to all connected nodes in the cluster. The + * given URI's host and port will not be used and instead will be adjusted + * for each node's host and port. The node URIs are guaranteed to be + * constructed before issuing any requests, so if a UriConstructionException + * is thrown, then it is guaranteed that no request was issued. + * + * @param method the HTTP method (e.g., GET, POST, PUT, DELETE, HEAD) + * @param uri the base request URI (up to, but not including, the query + * string) + * @param entity the HTTP request entity + * @param headers the request headers + * + * @return the client response + * + * @throws NoConnectedNodesException if no nodes are connected as results of + * the request + * @throws NoResponseFromNodesException if no response could be obtained + * @throws UriConstructionException if there was an issue constructing the + * URIs tailored for each individual node + * @throws ConnectingNodeMutableRequestException if the request was a PUT, + * POST, DELETE and a node is connecting to the cluster + * @throws DisconnectedNodeMutableRequestException if the request was a PUT, + * POST, DELETE and a node is disconnected from the cluster + * @throws SafeModeMutableRequestException if the request was a PUT, POST, + * DELETE and a the cluster is in safe mode + */ + NodeResponse applyRequest(String method, URI uri, Object entity, Map<String, String> headers) + throws NoConnectedNodesException, NoResponseFromNodesException, UriConstructionException, ConnectingNodeMutableRequestException, + DisconnectedNodeMutableRequestException, SafeModeMutableRequestException; + + /** + * Federates the HTTP request to the nodes specified. The given URI's host + * and port will not be used and instead will be adjusted for each node's + * host and port. The node URIs are guaranteed to be constructed before + * issuing any requests, so if a UriConstructionException is thrown, then it + * is guaranteed that no request was issued. + * + * @param method the HTTP method (e.g., GET, POST, PUT, DELETE, HEAD) + * @param uri the base request URI (up to, but not including, the query + * string) + * @param entity the HTTP request entity + * @param headers the request headers + * @param nodeIdentifiers the NodeIdentifier for each node that the request + * should be replaced to + * + * @return the client response + * + * @throws NoConnectedNodesException if no nodes are connected as results of + * the request + * @throws NoResponseFromNodesException if no response could be obtained + * @throws UriConstructionException if there was an issue constructing the + * URIs tailored for each individual node + * @throws ConnectingNodeMutableRequestException if the request was a PUT, + * POST, DELETE and a node is connecting to the cluster + * @throws DisconnectedNodeMutableRequestException if the request was a PUT, + * POST, DELETE and a node is disconnected from the cluster + * @throws SafeModeMutableRequestException if the request was a PUT, POST, + * DELETE and a the cluster is in safe mode + */ + NodeResponse applyRequest(String method, URI uri, Object entity, Map<String, String> headers, Set<NodeIdentifier> nodeIdentifiers) + throws NoConnectedNodesException, NoResponseFromNodesException, UriConstructionException, ConnectingNodeMutableRequestException, + DisconnectedNodeMutableRequestException, SafeModeMutableRequestException; +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/HttpRequestReplicator.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/HttpRequestReplicator.java b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/HttpRequestReplicator.java new file mode 100644 index 0000000..fb57622 --- /dev/null +++ b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/HttpRequestReplicator.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.cluster.manager; + +import org.apache.nifi.cluster.manager.exception.UriConstructionException; +import java.net.URI; +import java.util.List; +import java.util.Map; +import java.util.Set; +import org.apache.nifi.cluster.protocol.NodeIdentifier; + +/** + * A service for managing the replication of requests to nodes. It is up to the + * implementing class to decide if requests are sent concurrently or serially. + * + * Clients must call start() and stop() to initialize and shutdown the instance. + * The instance must be started before issuing any replication requests. + * + * @author unattributed + */ +public interface HttpRequestReplicator { + + /** + * Starts the instance for replicating requests. Start may only be called if + * the instance is not running. + */ + void start(); + + /** + * Stops the instance from replicating requests. Stop may only be called if + * the instance is running. + */ + void stop(); + + /** + * @return true if the instance is started; false otherwise. + */ + boolean isRunning(); + + /** + * Requests are sent to each node in the cluster. If the request results in + * an exception, then the NodeResourceResponse will contain the exception. + * + * HTTP DELETE and OPTIONS methods must supply an empty parameters map or + * else and IllegalArgumentException is thrown. + * + * @param nodeIds the node identifiers + * @param method the HTTP method (e.g., GET, POST, PUT, DELETE, HEAD, + * OPTIONS) + * @param uri the base request URI (up to, but not including, the query + * string) + * @param parameters any request parameters + * @param headers any HTTP headers + * + * @return the set of node responses + * + * @throws UriConstructionException if a request for a node failed to be + * constructed from the given prototype URI. If thrown, it is guaranteed + * that no request was sent. + */ + Set<NodeResponse> replicate(Set<NodeIdentifier> nodeIds, String method, URI uri, Map<String, List<String>> parameters, Map<String, String> headers) throws UriConstructionException; + + /** + * Requests are sent to each node in the cluster. If the request results in + * an exception, then the NodeResourceResponse will contain the exception. + * + * HTTP DELETE, GET, HEAD, and OPTIONS methods will throw an + * IllegalArgumentException if used. + * + * @param nodeIds the node identifiers + * @param method the HTTP method (e.g., POST, PUT) + * @param uri the base request URI (up to, but not including, the query + * string) + * @param entity an entity + * @param headers any HTTP headers + * + * @return the set of node responses + * + * @throws UriConstructionException if a request for a node failed to be + * constructed from the given prototype URI. If thrown, it is guaranteed + * that no request was sent. + */ + Set<NodeResponse> replicate(Set<NodeIdentifier> nodeIds, String method, URI uri, Object entity, Map<String, String> headers) throws UriConstructionException; + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/HttpResponseMapper.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/HttpResponseMapper.java b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/HttpResponseMapper.java new file mode 100644 index 0000000..843a666 --- /dev/null +++ b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/HttpResponseMapper.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.cluster.manager; + +import java.net.URI; +import java.util.Map; +import java.util.Set; +import org.apache.nifi.cluster.node.Node.Status; + +/** + * Maps a HTTP response to a node status. + * + * @author unattributed + */ +public interface HttpResponseMapper { + + /** + * Maps a HTTP response to a node response and the corresponding node + * status. + * + * @param requestURI the original request URI + * @param nodeResponses a set of node resource responses + * + * @return a map associating the node response to the node status + */ + Map<NodeResponse, Status> map(URI requestURI, Set<NodeResponse> nodeResponses); + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/NodeResponse.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/NodeResponse.java b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/NodeResponse.java new file mode 100644 index 0000000..3f966e5 --- /dev/null +++ b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/NodeResponse.java @@ -0,0 +1,329 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.cluster.manager; + +import com.sun.jersey.api.client.ClientResponse; +import java.io.BufferedInputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.net.URI; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.TimeUnit; + +import javax.ws.rs.HttpMethod; +import javax.ws.rs.WebApplicationException; +import javax.ws.rs.core.Response; +import javax.ws.rs.core.Response.ResponseBuilder; +import javax.ws.rs.core.Response.Status; +import javax.ws.rs.core.StreamingOutput; +import org.apache.nifi.cluster.protocol.NodeIdentifier; +import org.apache.nifi.web.api.entity.Entity; + +import org.apache.commons.io.IOUtils; +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Encapsulates a node's response in regards to receiving a external API + * request. + * + * Both the ClientResponse and (server) Response may be obtained from this + * instance. The ClientResponse is stored as it is received from the node. This + * includes the entity input stream. The Response is constructed on demand when + * mapping a ClientResponse to the Response. The ClientResponse to Response + * mapping includes copying the ClientResponse's input stream to the Response. + * Therefore, the getResponse() method should not be called more than once. + * Furthermore, the method should not be called if the caller has already read + * the ClientResponse's input stream. + * + * If a ClientResponse was unable to be created, then a NodeResponse will store + * the Throwable, which may be obtained by calling getThrowable(). + * + * This class overrides hashCode and equals and considers two instances to be + * equal if they have the equal NodeIdentifiers. + * + * @author unattributed + */ +public class NodeResponse { + + private static final Logger logger = LoggerFactory.getLogger(NodeResponse.class); + private final String httpMethod; + private final URI requestUri; + private final ClientResponse clientResponse; + private final NodeIdentifier nodeId; + private final Throwable throwable; + private boolean hasCreatedResponse = false; + private final Entity updatedEntity; + private final long requestDurationNanos; + private final String requestId; + + public NodeResponse(final NodeIdentifier nodeId, final String httpMethod, final URI requestUri, final ClientResponse clientResponse, final long requestDurationNanos, final String requestId) { + if (nodeId == null) { + throw new IllegalArgumentException("Node identifier may not be null."); + } else if (StringUtils.isBlank(httpMethod)) { + throw new IllegalArgumentException("Http method may not be null or empty."); + } else if (requestUri == null) { + throw new IllegalArgumentException("Request URI may not be null."); + } else if (clientResponse == null) { + throw new IllegalArgumentException("ClientResponse may not be null."); + } + this.nodeId = nodeId; + this.httpMethod = httpMethod; + this.requestUri = requestUri; + this.clientResponse = clientResponse; + this.throwable = null; + this.updatedEntity = null; + this.requestDurationNanos = requestDurationNanos; + this.requestId = requestId; + } + + public NodeResponse(final NodeIdentifier nodeId, final String httpMethod, final URI requestUri, final Throwable throwable) { + if (nodeId == null) { + throw new IllegalArgumentException("Node identifier may not be null."); + } else if (StringUtils.isBlank(httpMethod)) { + throw new IllegalArgumentException("Http method may not be null or empty."); + } else if (requestUri == null) { + throw new IllegalArgumentException("Request URI may not be null."); + } else if (throwable == null) { + throw new IllegalArgumentException("Throwable may not be null."); + } + this.nodeId = nodeId; + this.httpMethod = httpMethod; + this.requestUri = requestUri; + this.clientResponse = null; + this.throwable = throwable; + this.updatedEntity = null; + this.requestDurationNanos = -1L; + this.requestId = null; + } + + public NodeResponse(final NodeResponse example, final Entity updatedEntity) { + Objects.requireNonNull(example, "NodeResponse cannot be null"); + Objects.requireNonNull(updatedEntity, "UpdatedEntity cannot be null"); + + this.nodeId = example.nodeId; + this.httpMethod = example.httpMethod; + this.requestUri = example.requestUri; + this.clientResponse = example.clientResponse; + this.throwable = example.throwable; + this.updatedEntity = updatedEntity; + this.requestDurationNanos = example.requestDurationNanos; + this.requestId = null; + } + + public NodeIdentifier getNodeId() { + return nodeId; + } + + public String getHttpMethod() { + return httpMethod; + } + + public URI getRequestUri() { + return requestUri; + } + + /** + * @return the HTTP response status code + */ + public int getStatus() { + if (hasThrowable()) { + /* + * since there is a throwable, there is no client input stream to + * worry about maintaining, so we can call getResponse() method + */ + return getResponse().getStatus(); + } else { + /* + * use client response's status instead of calling getResponse().getStatus() + * so that we don't read the client's input stream as part of creating + * the response in the getResponse() method + */ + return clientResponse.getStatus(); + } + } + + /** + * Returns true if the response status is 2xx, false otherwise. + * + * @return + */ + public boolean is2xx() { + final int statusCode = getStatus(); + return (200 <= statusCode && statusCode <= 299); + } + + /** + * Returns true if the response status is 5xx, false otherwise. + * + * @return + */ + public boolean is5xx() { + final int statusCode = getStatus(); + return (500 <= statusCode && statusCode <= 599); + } + + /** + * Returns null if hasThrowable() is true; otherwise the client's response + * is returned. + * + * The ClientResponse's input stream can only be read once. + * + * @return the client's response + */ + public ClientResponse getClientResponse() { + return clientResponse; + } + + /** + * Creates a Response by mapping the ClientResponse values to it. Since the + * ClientResponse's input stream can only be read once, this method should + * only be called once. Furthermore, the caller should not have already read + * the ClientResponse's input stream. + * + * @return the response + */ + public Response getResponse() { + // if the response encapsulates a throwable, then the input stream is never read and the below warning is irrelevant + if (hasCreatedResponse && !hasThrowable()) { + logger.warn("ClientResponse's input stream has already been read. The created response will not contain this data."); + } + hasCreatedResponse = true; + return createResponse(); + } + + /** + * Returns the throwable or null if no throwable exists. + * + * @return the throwable or null if no throwable exists + */ + public Throwable getThrowable() { + return throwable; + } + + /** + * Returns true if a throwable was thrown and a response was not able to be + * created; false otherwise. + * + * @return true if a throwable was thrown and a response was not able to be + * created; false otherwise + */ + public boolean hasThrowable() { + return getThrowable() != null; + } + + @Override + public boolean equals(Object obj) { + if (obj == null) { + return false; + } + if (getClass() != obj.getClass()) { + return false; + } + final NodeResponse other = (NodeResponse) obj; + if (this.nodeId != other.nodeId && (this.nodeId == null || !this.nodeId.equals(other.nodeId))) { + return false; + } + return true; + } + + @Override + public int hashCode() { + int hash = 7; + hash = 13 * hash + (this.nodeId != null ? this.nodeId.hashCode() : 0); + return hash; + } + + public long getRequestDuration(final TimeUnit timeUnit) { + return timeUnit.convert(requestDurationNanos, TimeUnit.NANOSECONDS); + } + + public String getRequestId() { + return requestId; + } + + private Response createResponse() { + + // if no client response was created, then generate a 500 response + if (hasThrowable()) { + return Response.status(Status.INTERNAL_SERVER_ERROR).build(); + } + + // set the status + final ResponseBuilder responseBuilder = Response.status(clientResponse.getStatus()); + + // set the headers + for (final String key : clientResponse.getHeaders().keySet()) { + final List<String> values = clientResponse.getHeaders().get(key); + for (final String value : values) { + + if (key.equalsIgnoreCase("transfer-encoding") || key.equalsIgnoreCase("content-length")) { + /* + * do not copy the transfer-encoding header (i.e., chunked encoding) or + * the content-length. Let the outgoing response builder determine it. + */ + continue; + } else if (key.equals("X-ClusterContext")) { + /* + * do not copy the cluster context to the response because + * this information is private and should not be sent to + * the client + */ + continue; + } + responseBuilder.header(key, value); + } + } + + // head requests must not have a message-body in the response + if (!HttpMethod.HEAD.equalsIgnoreCase(httpMethod)) { + + // set the entity + if (updatedEntity == null) { + responseBuilder.entity(new StreamingOutput() { + @Override + public void write(final OutputStream output) throws IOException, WebApplicationException { + BufferedInputStream bis = null; + try { + bis = new BufferedInputStream(clientResponse.getEntityInputStream()); + IOUtils.copy(bis, output); + } finally { + IOUtils.closeQuietly(bis); + } + } + }); + } else { + responseBuilder.entity(updatedEntity); + } + } + + return responseBuilder.build(); + } + + @Override + public String toString() { + final StringBuilder sb = new StringBuilder(); + sb.append("NodeResponse[nodeUri=").append(nodeId.getApiAddress()).append(":").append(nodeId.getApiPort()).append(",") + .append("method=").append(httpMethod) + .append(",URI=").append(requestUri) + .append(",ResponseCode=").append(getStatus()) + .append(",Duration=").append(TimeUnit.MILLISECONDS.convert(requestDurationNanos, TimeUnit.NANOSECONDS)).append(" ms]"); + return sb.toString(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/BlockedByFirewallException.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/BlockedByFirewallException.java b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/BlockedByFirewallException.java new file mode 100644 index 0000000..49bcd35 --- /dev/null +++ b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/BlockedByFirewallException.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.cluster.manager.exception; + +import org.apache.nifi.cluster.protocol.NodeIdentifier; + +/** + * + */ +public class BlockedByFirewallException extends ClusterException { + + private final NodeIdentifier nodeId; + private final boolean isExistingNode; + + public BlockedByFirewallException(NodeIdentifier nodeId, boolean isExistingNode, String msg, Throwable cause) { + super(msg, cause); + this.nodeId = nodeId; + this.isExistingNode = isExistingNode; + } + + public BlockedByFirewallException(NodeIdentifier nodeId, boolean isExistingNode, Throwable cause) { + super(cause); + this.nodeId = nodeId; + this.isExistingNode = isExistingNode; + } + + public BlockedByFirewallException(NodeIdentifier nodeId, boolean isExistingNode, String msg) { + super(msg); + this.nodeId = nodeId; + this.isExistingNode = isExistingNode; + } + + public BlockedByFirewallException(NodeIdentifier nodeId, boolean isExistingNode) { + this.nodeId = nodeId; + this.isExistingNode = isExistingNode; + } + + public NodeIdentifier getNodeId() { + return nodeId; + } + + public boolean isExistingNode() { + return isExistingNode; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/ClusterException.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/ClusterException.java b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/ClusterException.java new file mode 100644 index 0000000..3bf9752 --- /dev/null +++ b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/ClusterException.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.cluster.manager.exception; + +/** + * The base exception class for cluster related exceptions. + * + * @author unattributed + */ +public class ClusterException extends RuntimeException { + + public ClusterException() { + } + + public ClusterException(String msg) { + super(msg); + } + + public ClusterException(Throwable cause) { + super(cause); + } + + public ClusterException(String msg, Throwable cause) { + super(msg, cause); + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/ConnectingNodeMutableRequestException.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/ConnectingNodeMutableRequestException.java b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/ConnectingNodeMutableRequestException.java new file mode 100644 index 0000000..365b5f0 --- /dev/null +++ b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/ConnectingNodeMutableRequestException.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.cluster.manager.exception; + +/** + * Represents the exceptional case when a HTTP request that may change a node's + * dataflow is to be replicated while a node is connecting to the cluster. + * + * @author unattributed + */ +public class ConnectingNodeMutableRequestException extends MutableRequestException { + + public ConnectingNodeMutableRequestException() { + } + + public ConnectingNodeMutableRequestException(String msg) { + super(msg); + } + + public ConnectingNodeMutableRequestException(Throwable cause) { + super(cause); + } + + public ConnectingNodeMutableRequestException(String msg, Throwable cause) { + super(msg, cause); + } +}