NIFI-271 checkpoint
Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/9dda16c9 Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/9dda16c9 Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/9dda16c9 Branch: refs/heads/NIFI-271 Commit: 9dda16c995ba55b8025506a12b0844031f5efcf0 Parents: 888254b Author: joewitt <joew...@apache.org> Authored: Wed Apr 22 12:52:00 2015 -0400 Committer: joewitt <joew...@apache.org> Committed: Wed Apr 22 12:52:00 2015 -0400 ---------------------------------------------------------------------- .../nifi/cluster/context/ClusterContext.java | 36 +- .../cluster/context/ClusterContextImpl.java | 10 +- .../context/ClusterContextThreadLocal.java | 12 +- .../cluster/firewall/ClusterNodeFirewall.java | 5 +- .../impl/FileBasedClusterNodeFirewall.java | 8 +- .../nifi/cluster/flow/ClusterDataFlow.java | 8 +- .../apache/nifi/cluster/flow/DataFlowDao.java | 2 +- .../cluster/flow/DataFlowManagementService.java | 23 +- .../nifi/cluster/flow/impl/DataFlowDaoImpl.java | 52 +-- .../impl/DataFlowManagementServiceImpl.java | 38 +- .../nifi/cluster/manager/ClusterManager.java | 8 +- .../nifi/cluster/manager/NodeResponse.java | 18 +- .../cluster/manager/impl/WebClusterManager.java | 374 +++++++++---------- .../java/org/apache/nifi/cluster/node/Node.java | 2 +- ...anagerProtocolServiceLocatorFactoryBean.java | 2 +- .../spring/WebClusterManagerFactoryBean.java | 8 +- .../apache/nifi/web/ConfigurationRequest.java | 9 +- .../apache/nifi/web/ConfigurationSnapshot.java | 3 +- .../org/apache/nifi/web/FlowModification.java | 16 +- .../nifi/web/OptimisticLockingManager.java | 25 +- .../web/StandardOptimisticLockingManager.java | 32 +- .../org/apache/nifi/web/UpdateRevision.java | 6 +- .../org/apache/nifi/web/security/DnUtils.java | 14 +- .../anonymous/NiFiAnonymousUserFilter.java | 4 +- .../NiFiAuthenticationEntryPoint.java | 11 +- .../authorization/NiFiAuthorizationService.java | 38 +- .../nifi/web/security/user/NiFiUserDetails.java | 16 +- .../nifi/web/security/user/NiFiUserUtils.java | 6 +- .../x509/SubjectDnX509PrincipalExtractor.java | 6 - .../security/x509/X509AuthenticationFilter.java | 50 +-- .../security/x509/X509CertificateExtractor.java | 4 +- .../x509/ocsp/OcspCertificateValidator.java | 43 +-- .../NiFiAuthorizationServiceTest.java | 91 ++--- 33 files changed, 469 insertions(+), 511 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/9dda16c9/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-web/src/main/java/org/apache/nifi/cluster/context/ClusterContext.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-web/src/main/java/org/apache/nifi/cluster/context/ClusterContext.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-web/src/main/java/org/apache/nifi/cluster/context/ClusterContext.java index 44fb25a..8c3e41b 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-web/src/main/java/org/apache/nifi/cluster/context/ClusterContext.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-web/src/main/java/org/apache/nifi/cluster/context/ClusterContext.java @@ -22,38 +22,44 @@ import org.apache.nifi.action.Action; import org.apache.nifi.web.Revision; /** - * Contains contextual information about clustering that may be serialized + * Contains contextual information about clustering that may be serialized * between manager and node when communicating over HTTP. */ public interface ClusterContext extends Serializable { - + /** - * Returns a list of auditable actions. The list is modifiable - * and will never be null. + * Returns a list of auditable actions. The list is modifiable and will + * never be null. + * * @return a collection of actions */ List<Action> getActions(); - + Revision getRevision(); - + void setRevision(Revision revision); - + /** - * @return true if the request was sent by the cluster manager; false otherwise + * @return true if the request was sent by the cluster manager; false + * otherwise */ boolean isRequestSentByClusterManager(); - + /** * Sets the flag to indicate if a request was sent by the cluster manager. - * @param flag true if the request was sent by the cluster manager; false otherwise + * + * @param flag true if the request was sent by the cluster manager; false + * otherwise */ void setRequestSentByClusterManager(boolean flag); - + /** - * Gets an id generation seed. This is used to ensure that nodes are able to generate the - * same id across the cluster. This is usually handled by the cluster manager creating the - * id, however for some actions (snippets, templates, etc) this is not possible. - * @return + * Gets an id generation seed. This is used to ensure that nodes are able to + * generate the same id across the cluster. This is usually handled by the + * cluster manager creating the id, however for some actions (snippets, + * templates, etc) this is not possible. + * + * @return generated id seed */ String getIdGenerationSeed(); } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/9dda16c9/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-web/src/main/java/org/apache/nifi/cluster/context/ClusterContextImpl.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-web/src/main/java/org/apache/nifi/cluster/context/ClusterContextImpl.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-web/src/main/java/org/apache/nifi/cluster/context/ClusterContextImpl.java index 06907d2..43e7c2d 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-web/src/main/java/org/apache/nifi/cluster/context/ClusterContextImpl.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-web/src/main/java/org/apache/nifi/cluster/context/ClusterContextImpl.java @@ -29,13 +29,13 @@ import org.apache.nifi.web.Revision; public class ClusterContextImpl implements ClusterContext, Serializable { private final List<Action> actions = new ArrayList<>(); - + private Revision revision; - + private boolean requestSentByClusterManager; - + private final String idGenerationSeed = UUID.randomUUID().toString(); - + @Override public List<Action> getActions() { return actions; @@ -55,7 +55,7 @@ public class ClusterContextImpl implements ClusterContext, Serializable { public boolean isRequestSentByClusterManager() { return requestSentByClusterManager; } - + @Override public void setRequestSentByClusterManager(boolean requestSentByClusterManager) { this.requestSentByClusterManager = requestSentByClusterManager; http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/9dda16c9/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-web/src/main/java/org/apache/nifi/cluster/context/ClusterContextThreadLocal.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-web/src/main/java/org/apache/nifi/cluster/context/ClusterContextThreadLocal.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-web/src/main/java/org/apache/nifi/cluster/context/ClusterContextThreadLocal.java index c8c7206..79900fb 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-web/src/main/java/org/apache/nifi/cluster/context/ClusterContextThreadLocal.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-web/src/main/java/org/apache/nifi/cluster/context/ClusterContextThreadLocal.java @@ -20,23 +20,23 @@ package org.apache.nifi.cluster.context; * Manages a cluster context on a threadlocal. */ public class ClusterContextThreadLocal { - + private static final ThreadLocal<ClusterContext> contextHolder = new ThreadLocal<>(); - + public static void removeContext() { contextHolder.remove(); } - + public static ClusterContext createEmptyContext() { return new ClusterContextImpl(); } - + public static ClusterContext getContext() { return contextHolder.get(); } - + public static void setContext(final ClusterContext context) { contextHolder.set(context); } - + } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/9dda16c9/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/firewall/ClusterNodeFirewall.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/firewall/ClusterNodeFirewall.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/firewall/ClusterNodeFirewall.java index 2e3d278..08d21a5 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/firewall/ClusterNodeFirewall.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/firewall/ClusterNodeFirewall.java @@ -27,8 +27,9 @@ public interface ClusterNodeFirewall { * false otherwise. * * If an IP is given, then it must be formatted in dotted decimal notation. - * @param hostOrIp - * @return + * + * @param hostOrIp host + * @return true if permissible */ boolean isPermissible(String hostOrIp); http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/9dda16c9/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/firewall/impl/FileBasedClusterNodeFirewall.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/firewall/impl/FileBasedClusterNodeFirewall.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/firewall/impl/FileBasedClusterNodeFirewall.java index 916ec14..5219629 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/firewall/impl/FileBasedClusterNodeFirewall.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/firewall/impl/FileBasedClusterNodeFirewall.java @@ -16,10 +16,14 @@ */ package org.apache.nifi.cluster.firewall.impl; -import java.io.*; +import java.io.BufferedReader; +import java.io.File; +import java.io.FileReader; +import java.io.IOException; import java.net.InetAddress; import java.net.UnknownHostException; -import java.util.*; +import java.util.ArrayList; +import java.util.Collection; import org.apache.commons.net.util.SubnetUtils; import org.apache.nifi.cluster.firewall.ClusterNodeFirewall; import org.apache.nifi.util.file.FileUtils; http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/9dda16c9/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/flow/ClusterDataFlow.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/flow/ClusterDataFlow.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/flow/ClusterDataFlow.java index c17b429..2803d4c 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/flow/ClusterDataFlow.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/flow/ClusterDataFlow.java @@ -39,13 +39,13 @@ public class ClusterDataFlow { } public byte[] getControllerServices() { - return controllerServices; + return controllerServices; } - + public byte[] getReportingTasks() { - return reportingTasks; + return reportingTasks; } - + public NodeIdentifier getPrimaryNodeId() { return primaryNodeId; } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/9dda16c9/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/flow/DataFlowDao.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/flow/DataFlowDao.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/flow/DataFlowDao.java index a273704..9ee5aa8 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/flow/DataFlowDao.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/flow/DataFlowDao.java @@ -36,7 +36,7 @@ public interface DataFlowDao { * Saves the cluster's dataflow. * * - * @param dataFlow + * @param dataFlow flow * @throws DaoException if the dataflow was unable to be saved */ void saveDataFlow(ClusterDataFlow dataFlow) throws DaoException; http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/9dda16c9/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/flow/DataFlowManagementService.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/flow/DataFlowManagementService.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/flow/DataFlowManagementService.java index 082d65e..f354507 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/flow/DataFlowManagementService.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/flow/DataFlowManagementService.java @@ -31,7 +31,6 @@ import org.apache.nifi.cluster.protocol.NodeIdentifier; * * Clients must call start() and stop() to initialize and stop the instance. * - * @author unattributed */ public interface DataFlowManagementService { @@ -68,21 +67,23 @@ public interface DataFlowManagementService { void updatePrimaryNode(NodeIdentifier nodeId) throws DaoException; /** - * Updates the dataflow with the given serialized form of the Controller Services that are to exist on the NCM. - * - * @param serializedControllerServices - * @throws DaoException + * Updates the dataflow with the given serialized form of the Controller + * Services that are to exist on the NCM. + * + * @param serializedControllerServices services + * @throws DaoException ex */ void updateControllerServices(byte[] serializedControllerServices) throws DaoException; - + /** - * Updates the dataflow with the given serialized form of Reporting Tasks that are to exist on the NCM. - * - * @param serviceNodes - * @throws DaoException + * Updates the dataflow with the given serialized form of Reporting Tasks + * that are to exist on the NCM. + * + * @param serializedReportingTasks tasks + * @throws DaoException ex */ void updateReportingTasks(byte[] serializedReportingTasks) throws DaoException; - + /** * Sets the state of the flow. * http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/9dda16c9/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/flow/impl/DataFlowDaoImpl.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/flow/impl/DataFlowDaoImpl.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/flow/impl/DataFlowDaoImpl.java index dd9d2a3..e2690f7 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/flow/impl/DataFlowDaoImpl.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/flow/impl/DataFlowDaoImpl.java @@ -187,36 +187,35 @@ public class DataFlowDaoImpl implements DataFlowDao { throw new DaoException(ex); } } - - + private void syncWithRestore(final File primaryFile, final File restoreFile) throws IOException { try (final FileInputStream primaryFis = new FileInputStream(primaryFile); - final TarArchiveInputStream primaryIn = new TarArchiveInputStream(primaryFis); - final FileInputStream restoreFis = new FileInputStream(restoreFile); - final TarArchiveInputStream restoreIn = new TarArchiveInputStream(restoreFis)) { - + final TarArchiveInputStream primaryIn = new TarArchiveInputStream(primaryFis); + final FileInputStream restoreFis = new FileInputStream(restoreFile); + final TarArchiveInputStream restoreIn = new TarArchiveInputStream(restoreFis)) { + final ArchiveEntry primaryEntry = primaryIn.getNextEntry(); final ArchiveEntry restoreEntry = restoreIn.getNextEntry(); - if ( primaryEntry == null && restoreEntry == null ) { + if (primaryEntry == null && restoreEntry == null) { return; } - if ( (primaryEntry == null && restoreEntry != null) || (primaryEntry != null && restoreEntry == null) ) { + if ((primaryEntry == null && restoreEntry != null) || (primaryEntry != null && restoreEntry == null)) { throw new IllegalStateException(String.format("Primary file '%s' is different than restore file '%s'", primaryFile.getAbsoluteFile(), restoreFile.getAbsolutePath())); } - + final byte[] primaryMd5 = calculateMd5(primaryIn); final byte[] restoreMd5 = calculateMd5(restoreIn); - - if ( !Arrays.equals(primaryMd5, restoreMd5) ) { + + if (!Arrays.equals(primaryMd5, restoreMd5)) { throw new IllegalStateException(String.format("Primary file '%s' is different than restore file '%s'", primaryFile.getAbsoluteFile(), restoreFile.getAbsolutePath())); } } } - + private byte[] calculateMd5(final InputStream in) throws IOException { final MessageDigest digest; try { @@ -224,7 +223,7 @@ public class DataFlowDaoImpl implements DataFlowDao { } catch (final NoSuchAlgorithmException nsae) { throw new IOException(nsae); } - + int len; final byte[] buffer = new byte[8192]; while ((len = in.read(buffer)) > -1) { @@ -257,12 +256,14 @@ public class DataFlowDaoImpl implements DataFlowDao { if (primaryStateFile == null) { writeDataFlow(createNewFlowStateFile(restoreDirectory), dataFlow); } else { - throw new DaoException(String.format("Unable to save dataflow because dataflow state file in primary directory '%s' exists, but it does not exist in the restore directory '%s'", + throw new DaoException(String.format("Unable to save dataflow because dataflow state file in primary directory " + + "'%s' exists, but it does not exist in the restore directory '%s'", primaryDirectory.getAbsolutePath(), restoreDirectory.getAbsolutePath())); } } else { if (primaryStateFile == null) { - throw new DaoException(String.format("Unable to save dataflow because dataflow state file in restore directory '%s' exists, but it does not exist in the primary directory '%s'", + throw new DaoException(String.format("Unable to save dataflow because dataflow state file in restore directory " + + "'%s' exists, but it does not exist in the primary directory '%s'", restoreDirectory.getAbsolutePath(), primaryDirectory.getAbsolutePath())); } else { final PersistedFlowState primaryFlowState = getPersistedFlowState(primaryStateFile); @@ -270,14 +271,15 @@ public class DataFlowDaoImpl implements DataFlowDao { if (primaryFlowState == restoreFlowState) { writeDataFlow(restoreStateFile, dataFlow); } else { - throw new DaoException(String.format("Unable to save dataflow because state file in primary directory '%s' has state '%s', but the state file in the restore directory '%s' has state '%s'", + throw new DaoException(String.format("Unable to save dataflow because state file in primary directory " + + "'%s' has state '%s', but the state file in the restore directory '%s' has state '%s'", primaryDirectory.getAbsolutePath(), primaryFlowState, restoreDirectory.getAbsolutePath(), restoreFlowState)); } } } } - // write dataflow to primary + // write dataflow to primary if (primaryStateFile == null) { writeDataFlow(createNewFlowStateFile(primaryDirectory), dataFlow); } else { @@ -477,7 +479,7 @@ public class DataFlowDaoImpl implements DataFlowDao { byte[] clusterInfoBytes = new byte[0]; byte[] controllerServiceBytes = new byte[0]; byte[] reportingTaskBytes = new byte[0]; - + try (final InputStream inStream = new FileInputStream(file); final TarArchiveInputStream tarIn = new TarArchiveInputStream(new BufferedInputStream(inStream))) { TarArchiveEntry tarEntry; @@ -500,13 +502,13 @@ public class DataFlowDaoImpl implements DataFlowDao { StreamUtils.fillBuffer(tarIn, clusterInfoBytes, true); break; case CONTROLLER_SERVICES_FILENAME: - controllerServiceBytes = new byte[(int) tarEntry.getSize()]; - StreamUtils.fillBuffer(tarIn, controllerServiceBytes, true); - break; + controllerServiceBytes = new byte[(int) tarEntry.getSize()]; + StreamUtils.fillBuffer(tarIn, controllerServiceBytes, true); + break; case REPORTING_TASKS_FILENAME: - reportingTaskBytes = new byte[(int) tarEntry.getSize()]; - StreamUtils.fillBuffer(tarIn, reportingTaskBytes, true); - break; + reportingTaskBytes = new byte[(int) tarEntry.getSize()]; + StreamUtils.fillBuffer(tarIn, reportingTaskBytes, true); + break; default: throw new DaoException("Found Unexpected file in dataflow configuration: " + tarEntry.getName()); } @@ -559,7 +561,7 @@ public class DataFlowDaoImpl implements DataFlowDao { final TarArchiveOutputStream tarOut = new TarArchiveOutputStream(new BufferedOutputStream(fos))) { final DataFlow dataFlow = clusterDataFlow.getDataFlow(); - if ( dataFlow == null ) { + if (dataFlow == null) { writeTarEntry(tarOut, FLOW_XML_FILENAME, getEmptyFlowBytes()); writeTarEntry(tarOut, TEMPLATES_FILENAME, new byte[0]); writeTarEntry(tarOut, SNIPPETS_FILENAME, new byte[0]); http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/9dda16c9/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/flow/impl/DataFlowManagementServiceImpl.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/flow/impl/DataFlowManagementServiceImpl.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/flow/impl/DataFlowManagementServiceImpl.java index 1bb8ca3..4fa6504 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/flow/impl/DataFlowManagementServiceImpl.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/flow/impl/DataFlowManagementServiceImpl.java @@ -64,12 +64,11 @@ import org.slf4j.LoggerFactory; public class DataFlowManagementServiceImpl implements DataFlowManagementService { /* - * Developer Note: - * + * Developer Note: + * * This class maintains an ExecutorService and a Runnable. * Although the class is not externally threadsafe, its internals are protected to * accommodate multithread access between the ExecutorServer and the Runnable. - * */ private static final Logger logger = new NiFiLog(LoggerFactory.getLogger(DataFlowManagementServiceImpl.class)); @@ -170,13 +169,12 @@ public class DataFlowManagementServiceImpl implements DataFlowManagementService resourceLock.unlock("updatePrimaryNode"); } } - - + @Override public void updateControllerServices(final byte[] controllerServiceBytes) throws DaoException { - resourceLock.lock(); - try { - final ClusterDataFlow existingClusterDataFlow = flowDao.loadDataFlow(); + resourceLock.lock(); + try { + final ClusterDataFlow existingClusterDataFlow = flowDao.loadDataFlow(); final StandardDataFlow dataFlow; final byte[] reportingTaskBytes; @@ -192,16 +190,16 @@ public class DataFlowManagementServiceImpl implements DataFlowManagementService } flowDao.saveDataFlow(new ClusterDataFlow(dataFlow, nodeId, controllerServiceBytes, reportingTaskBytes)); - } finally { - resourceLock.unlock("updateControllerServices"); - } + } finally { + resourceLock.unlock("updateControllerServices"); + } } - + @Override public void updateReportingTasks(final byte[] reportingTaskBytes) throws DaoException { - resourceLock.lock(); - try { - final ClusterDataFlow existingClusterDataFlow = flowDao.loadDataFlow(); + resourceLock.lock(); + try { + final ClusterDataFlow existingClusterDataFlow = flowDao.loadDataFlow(); final StandardDataFlow dataFlow; final byte[] controllerServiceBytes; @@ -217,9 +215,9 @@ public class DataFlowManagementServiceImpl implements DataFlowManagementService } flowDao.saveDataFlow(new ClusterDataFlow(dataFlow, nodeId, controllerServiceBytes, reportingTaskBytes)); - } finally { - resourceLock.unlock("updateControllerServices"); - } + } finally { + resourceLock.unlock("updateControllerServices"); + } } @Override @@ -361,8 +359,8 @@ public class DataFlowManagementServiceImpl implements DataFlowManagementService if (existingClusterDataFlow == null) { currentClusterDataFlow = new ClusterDataFlow(dataFlow, null, new byte[0], new byte[0]); } else { - currentClusterDataFlow = new ClusterDataFlow(dataFlow, existingClusterDataFlow.getPrimaryNodeId(), - existingClusterDataFlow.getControllerServices(), existingClusterDataFlow.getReportingTasks()); + currentClusterDataFlow = new ClusterDataFlow(dataFlow, existingClusterDataFlow.getPrimaryNodeId(), + existingClusterDataFlow.getControllerServices(), existingClusterDataFlow.getReportingTasks()); } flowDao.saveDataFlow(currentClusterDataFlow); flowDao.setPersistedFlowState(PersistedFlowState.CURRENT); http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/9dda16c9/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ClusterManager.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ClusterManager.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ClusterManager.java index 3a1dfb2..be52e0f 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ClusterManager.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ClusterManager.java @@ -77,14 +77,14 @@ public interface ClusterManager extends NodeInformant { Set<Node> getNodes(Status... statuses); /** - * @param nodeId + * @param nodeId node identifier * @return returns the node with the given identifier or null if node does * not exist */ Node getNode(String nodeId); /** - * @param statuses + * @param statuses statuses * @return the set of node identifiers with the given node status */ Set<NodeIdentifier> getNodeIds(Status... statuses); @@ -199,9 +199,7 @@ public interface ClusterManager extends NodeInformant { Node getPrimaryNode(); /** - * Returns the bulletin repository. - * - * @return + * @return the bulletin repository */ BulletinRepository getBulletinRepository(); http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/9dda16c9/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/NodeResponse.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/NodeResponse.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/NodeResponse.java index 8bc73ab..958d600 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/NodeResponse.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/NodeResponse.java @@ -192,19 +192,19 @@ public class NodeResponse { } /** - * If this node response has been merged returns the updated entity, - * otherwise null. Also returns null if hasThrowable() is true. The - * intent of this method is to support getting the response entity - * when it was already consumed during the merge operation. In this - * case the client response rom getClientResponse() will not support - * a getEntity(...) or getEntityInputStream() call. - * - * @return + * If this node response has been merged returns the updated entity, + * otherwise null. Also returns null if hasThrowable() is true. The intent + * of this method is to support getting the response entity when it was + * already consumed during the merge operation. In this case the client + * response rom getClientResponse() will not support a getEntity(...) or + * getEntityInputStream() call. + * + * @return */ public Entity getUpdatedEntity() { return updatedEntity; } - + /** * Creates a Response by mapping the ClientResponse values to it. Since the * ClientResponse's input stream can only be read once, this method should http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/9dda16c9/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java index db6421e..94ea17f 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java @@ -318,13 +318,13 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C public static final String PROVENANCE_URI = "/nifi-api/controller/provenance"; public static final Pattern PROVENANCE_QUERY_URI = Pattern.compile("/nifi-api/controller/provenance/[a-f0-9\\-]{36}"); public static final Pattern PROVENANCE_EVENT_URI = Pattern.compile("/nifi-api/controller/provenance/events/[0-9]+"); - + public static final String CONTROLLER_SERVICES_URI = "/nifi-api/controller/controller-services/node"; public static final Pattern CONTROLLER_SERVICE_URI_PATTERN = Pattern.compile("/nifi-api/controller/controller-services/node/[a-f0-9\\-]{36}"); public static final Pattern CONTROLLER_SERVICE_REFERENCES_URI_PATTERN = Pattern.compile("/nifi-api/controller/controller-services/node/[a-f0-9\\-]{36}/references"); public static final String REPORTING_TASKS_URI = "/nifi-api/controller/reporting-tasks/node"; public static final Pattern REPORTING_TASK_URI_PATTERN = Pattern.compile("/nifi-api/controller/reporting-tasks/node/[a-f0-9\\-]{36}"); - + private final NiFiProperties properties; private final HttpRequestReplicator httpRequestReplicator; private final HttpResponseMapper httpResponseMapper; @@ -427,14 +427,14 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C public void heartbeat() { } }, this, encryptor); - + // When we construct the scheduling agents, we can pass null for a lot of the arguments because we are only // going to be scheduling Reporting Tasks. Otherwise, it would not be okay. processScheduler.setSchedulingAgent(SchedulingStrategy.TIMER_DRIVEN, new TimerDrivenSchedulingAgent(null, reportingTaskEngine, null, encryptor)); processScheduler.setSchedulingAgent(SchedulingStrategy.CRON_DRIVEN, new QuartzSchedulingAgent(null, reportingTaskEngine, null, encryptor)); processScheduler.setMaxThreadCount(SchedulingStrategy.TIMER_DRIVEN, 10); processScheduler.setMaxThreadCount(SchedulingStrategy.CRON_DRIVEN, 10); - + controllerServiceProvider = new StandardControllerServiceProvider(processScheduler, bulletinRepository); } @@ -479,10 +479,10 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C } final byte[] serializedServices = clusterDataFlow.getControllerServices(); - if ( serializedServices != null && serializedServices.length > 0 ) { - ControllerServiceLoader.loadControllerServices(this, new ByteArrayInputStream(serializedServices), encryptor, bulletinRepository, properties.getAutoResumeState()); + if (serializedServices != null && serializedServices.length > 0) { + ControllerServiceLoader.loadControllerServices(this, new ByteArrayInputStream(serializedServices), encryptor, bulletinRepository, properties.getAutoResumeState()); } - + // start multicast broadcasting service, if configured if (servicesBroadcaster != null) { servicesBroadcaster.start(); @@ -493,8 +493,8 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C // Load and start running Reporting Tasks final byte[] serializedReportingTasks = clusterDataFlow.getReportingTasks(); - if ( serializedReportingTasks != null && serializedReportingTasks.length > 0 ) { - loadReportingTasks(serializedReportingTasks); + if (serializedReportingTasks != null && serializedReportingTasks.length > 0) { + loadReportingTasks(serializedReportingTasks); } } catch (final IOException ioe) { logger.warn("Failed to initialize cluster services due to: " + ioe, ioe); @@ -558,10 +558,10 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C servicesBroadcaster.stop(); } - if ( processScheduler != null ) { + if (processScheduler != null) { processScheduler.shutdown(); } - + if (encounteredException) { throw new IOException("Failed to shutdown Cluster Manager because one or more cluster services failed to shutdown. Check the logs for details."); } @@ -946,7 +946,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C final String scheduleStateValue = DomUtils.getChild(taskElement, "scheduledState").getTextContent().trim(); final ScheduledState scheduledState = ScheduledState.valueOf(scheduleStateValue); - + // Reporting Task Properties for (final Element property : DomUtils.getChildElementsByTagName(taskElement, "property")) { final String name = DomUtils.getChildText(property, "name"); @@ -969,21 +969,21 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C final ReportingTask reportingTask = reportingTaskNode.getReportingTask(); final ComponentLog componentLog = new SimpleProcessLogger(taskId, reportingTask); - final ReportingInitializationContext config = new StandardReportingInitializationContext(taskId, taskName, + final ReportingInitializationContext config = new StandardReportingInitializationContext(taskId, taskName, schedulingStrategy, taskSchedulingPeriod, componentLog, this); reportingTask.initialize(config); final String annotationData = DomUtils.getChildText(taskElement, "annotationData"); - if ( annotationData != null ) { + if (annotationData != null) { reportingTaskNode.setAnnotationData(annotationData.trim()); } - + final Map<PropertyDescriptor, String> resolvedProps; try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) { resolvedProps = new HashMap<>(); for (final Map.Entry<String, String> entry : properties.entrySet()) { final PropertyDescriptor descriptor = reportingTask.getPropertyDescriptor(entry.getKey()); - if ( entry.getValue() == null ) { + if (entry.getValue() == null) { resolvedProps.put(descriptor, descriptor.getDefaultValue()); } else { resolvedProps.put(descriptor, entry.getValue()); @@ -992,24 +992,24 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C } for (final Map.Entry<PropertyDescriptor, String> entry : resolvedProps.entrySet()) { - if ( entry.getValue() != null ) { + if (entry.getValue() != null) { reportingTaskNode.setProperty(entry.getKey().getName(), entry.getValue()); } } - + final String comments = DomUtils.getChildText(taskElement, "comment"); - if ( comments != null ) { + if (comments != null) { reportingTaskNode.setComments(comments); } reportingTaskNode.setScheduledState(scheduledState); - if ( ScheduledState.RUNNING.equals(scheduledState) ) { - if ( reportingTaskNode.isValid() ) { + if (ScheduledState.RUNNING.equals(scheduledState)) { + if (reportingTaskNode.isValid()) { try { processScheduler.schedule(reportingTaskNode); } catch (final Exception e) { logger.error("Failed to start {} due to {}", reportingTaskNode, e); - if ( logger.isDebugEnabled() ) { + if (logger.isDebugEnabled()) { logger.error("", e); } } @@ -1017,8 +1017,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C logger.error("Failed to start {} because it is invalid due to {}", reportingTaskNode, reportingTaskNode.getValidationErrors()); } } - - + tasks.put(reportingTaskNode.getIdentifier(), reportingTaskNode); } } catch (final SAXException | ParserConfigurationException | IOException | DOMException | NumberFormatException | InitializationException t) { @@ -1031,7 +1030,6 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C return tasks; } - @Override public ReportingTaskNode createReportingTask(final String type, final String id, final boolean firstTimeAdded) throws ReportingTaskInstantiationException { if (type == null) { @@ -1064,16 +1062,16 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C final ReportingTaskNode taskNode = new ClusteredReportingTaskNode(task, id, processScheduler, new ClusteredEventAccess(this), bulletinRepository, controllerServiceProvider, validationContextFactory); taskNode.setName(task.getClass().getSimpleName()); - + reportingTasks.put(id, taskNode); - if ( firstTimeAdded ) { + if (firstTimeAdded) { try (final NarCloseable x = NarCloseable.withNarLoader()) { ReflectionUtils.invokeMethodsWithAnnotation(OnAdded.class, task); } catch (final Exception e) { throw new ComponentLifeCycleException("Failed to invoke On-Added Lifecycle methods of " + task, e); } } - + return taskNode; } @@ -1372,7 +1370,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C writeLock.unlock("handleControllerStartupFailure"); } } - + /** * Adds an instance of a specified controller service. * @@ -1383,7 +1381,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C */ @Override public ControllerServiceNode createControllerService(final String type, final String id, final boolean firstTimeAdded) { - return controllerServiceProvider.createControllerService(type, id, firstTimeAdded); + return controllerServiceProvider.createControllerService(type, id, firstTimeAdded); } @Override @@ -1410,82 +1408,80 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C public boolean isControllerServiceEnabling(final String serviceIdentifier) { return controllerServiceProvider.isControllerServiceEnabling(serviceIdentifier); } - + @Override public String getControllerServiceName(final String serviceIdentifier) { - return controllerServiceProvider.getControllerServiceName(serviceIdentifier); + return controllerServiceProvider.getControllerServiceName(serviceIdentifier); } @Override public void removeControllerService(final ControllerServiceNode serviceNode) { controllerServiceProvider.removeControllerService(serviceNode); } - @Override public void enableControllerService(final ControllerServiceNode serviceNode) { controllerServiceProvider.enableControllerService(serviceNode); } - + @Override public void enableControllerServices(final Collection<ControllerServiceNode> serviceNodes) { controllerServiceProvider.enableControllerServices(serviceNodes); } - + @Override public void disableControllerService(final ControllerServiceNode serviceNode) { controllerServiceProvider.disableControllerService(serviceNode); } - + @Override public Set<ControllerServiceNode> getAllControllerServices() { - return controllerServiceProvider.getAllControllerServices(); + return controllerServiceProvider.getAllControllerServices(); } - - + @Override public void disableReferencingServices(final ControllerServiceNode serviceNode) { controllerServiceProvider.disableReferencingServices(serviceNode); } - + @Override public void enableReferencingServices(final ControllerServiceNode serviceNode) { controllerServiceProvider.enableReferencingServices(serviceNode); } - + @Override public void scheduleReferencingComponents(final ControllerServiceNode serviceNode) { controllerServiceProvider.scheduleReferencingComponents(serviceNode); } - + @Override public void unscheduleReferencingComponents(final ControllerServiceNode serviceNode) { controllerServiceProvider.unscheduleReferencingComponents(serviceNode); } - + @Override public void verifyCanEnableReferencingServices(final ControllerServiceNode serviceNode) { controllerServiceProvider.verifyCanEnableReferencingServices(serviceNode); } - + @Override public void verifyCanScheduleReferencingComponents(final ControllerServiceNode serviceNode) { controllerServiceProvider.verifyCanScheduleReferencingComponents(serviceNode); } - + @Override public void verifyCanDisableReferencingServices(final ControllerServiceNode serviceNode) { controllerServiceProvider.verifyCanDisableReferencingServices(serviceNode); } - + @Override public void verifyCanStopReferencingComponents(final ControllerServiceNode serviceNode) { controllerServiceProvider.verifyCanStopReferencingComponents(serviceNode); } - + private byte[] serialize(final Document doc) throws TransformerException { - final ByteArrayOutputStream baos = new ByteArrayOutputStream(); - final DOMSource domSource = new DOMSource(doc); + final ByteArrayOutputStream baos = new ByteArrayOutputStream(); + final DOMSource domSource = new DOMSource(doc); final StreamResult streamResult = new StreamResult(baos); // configure the transformer and convert the DOM @@ -1498,91 +1494,89 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C transformer.transform(domSource, streamResult); return baos.toByteArray(); } - + private byte[] serializeControllerServices() throws ParserConfigurationException, TransformerException { - final DocumentBuilderFactory docFactory = DocumentBuilderFactory.newInstance(); + final DocumentBuilderFactory docFactory = DocumentBuilderFactory.newInstance(); final DocumentBuilder docBuilder = docFactory.newDocumentBuilder(); final Document document = docBuilder.newDocument(); - final Element rootElement = document.createElement("controllerServices"); - document.appendChild(rootElement); - - for ( final ControllerServiceNode serviceNode : getAllControllerServices() ) { - StandardFlowSerializer.addControllerService(rootElement, serviceNode, encryptor); - } - - return serialize(document); - } - + final Element rootElement = document.createElement("controllerServices"); + document.appendChild(rootElement); + + for (final ControllerServiceNode serviceNode : getAllControllerServices()) { + StandardFlowSerializer.addControllerService(rootElement, serviceNode, encryptor); + } + + return serialize(document); + } + private byte[] serializeReportingTasks() throws ParserConfigurationException, TransformerException { - final DocumentBuilderFactory docFactory = DocumentBuilderFactory.newInstance(); + final DocumentBuilderFactory docFactory = DocumentBuilderFactory.newInstance(); final DocumentBuilder docBuilder = docFactory.newDocumentBuilder(); final Document document = docBuilder.newDocument(); - final Element rootElement = document.createElement("reportingTasks"); - document.appendChild(rootElement); - - for ( final ReportingTaskNode taskNode : getAllReportingTasks() ) { - StandardFlowSerializer.addReportingTask(rootElement, taskNode, encryptor); - } - - return serialize(document); - } - - + final Element rootElement = document.createElement("reportingTasks"); + document.appendChild(rootElement); + + for (final ReportingTaskNode taskNode : getAllReportingTasks()) { + StandardFlowSerializer.addReportingTask(rootElement, taskNode, encryptor); + } + + return serialize(document); + } + public void saveControllerServices() { - try { - dataFlowManagementService.updateControllerServices(serializeControllerServices()); - } catch (final Exception e) { - logger.error("Failed to save changes to NCM's Controller Services; changes may be lost on restart due to " + e); - if ( logger.isDebugEnabled() ) { - logger.error("", e); - } - - getBulletinRepository().addBulletin(BulletinFactory.createBulletin("Controller Services", Severity.ERROR.name(), - "Failed to save changes to NCM's Controller Services; changes may be lost on restart. See logs for more details.")); - } - } - + try { + dataFlowManagementService.updateControllerServices(serializeControllerServices()); + } catch (final Exception e) { + logger.error("Failed to save changes to NCM's Controller Services; changes may be lost on restart due to " + e); + if (logger.isDebugEnabled()) { + logger.error("", e); + } + + getBulletinRepository().addBulletin(BulletinFactory.createBulletin("Controller Services", Severity.ERROR.name(), + "Failed to save changes to NCM's Controller Services; changes may be lost on restart. See logs for more details.")); + } + } + public void saveReportingTasks() { - try { - dataFlowManagementService.updateReportingTasks(serializeReportingTasks()); - } catch (final Exception e) { - logger.error("Failed to save changes to NCM's Reporting Tasks; changes may be lost on restart due to " + e); - if ( logger.isDebugEnabled() ) { - logger.error("", e); - } - - getBulletinRepository().addBulletin(BulletinFactory.createBulletin("Reporting Tasks", Severity.ERROR.name(), - "Failed to save changes to NCM's Reporting Tasks; changes may be lost on restart. See logs for more details.")); - } + try { + dataFlowManagementService.updateReportingTasks(serializeReportingTasks()); + } catch (final Exception e) { + logger.error("Failed to save changes to NCM's Reporting Tasks; changes may be lost on restart due to " + e); + if (logger.isDebugEnabled()) { + logger.error("", e); + } + + getBulletinRepository().addBulletin(BulletinFactory.createBulletin("Reporting Tasks", Severity.ERROR.name(), + "Failed to save changes to NCM's Reporting Tasks; changes may be lost on restart. See logs for more details.")); + } } @Override public Set<ReportingTaskNode> getAllReportingTasks() { - readLock.lock(); - try { - return new HashSet<>(reportingTasks.values()); - } finally { - readLock.unlock("getReportingTasks"); - } + readLock.lock(); + try { + return new HashSet<>(reportingTasks.values()); + } finally { + readLock.unlock("getReportingTasks"); + } } @Override public ReportingTaskNode getReportingTaskNode(final String taskId) { - readLock.lock(); - try { - return reportingTasks.get(taskId); - } finally { - readLock.unlock("getReportingTaskNode"); - } + readLock.lock(); + try { + return reportingTasks.get(taskId); + } finally { + readLock.unlock("getReportingTaskNode"); + } } @Override public void startReportingTask(final ReportingTaskNode reportingTaskNode) { reportingTaskNode.verifyCanStart(); - processScheduler.schedule(reportingTaskNode); + processScheduler.schedule(reportingTaskNode); } - @Override public void stopReportingTask(final ReportingTaskNode reportingTaskNode) { reportingTaskNode.verifyCanStop(); @@ -1591,52 +1585,50 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C @Override public void removeReportingTask(final ReportingTaskNode reportingTaskNode) { - writeLock.lock(); - try { - final ReportingTaskNode existing = reportingTasks.get(reportingTaskNode.getIdentifier()); - if ( existing == null || existing != reportingTaskNode ) { - throw new IllegalStateException("Reporting Task " + reportingTaskNode + " does not exist in this Flow"); - } - - reportingTaskNode.verifyCanDelete(); - - try (final NarCloseable x = NarCloseable.withNarLoader()) { - ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnRemoved.class, reportingTaskNode.getReportingTask(), reportingTaskNode.getConfigurationContext()); - } - - for ( final Map.Entry<PropertyDescriptor, String> entry : reportingTaskNode.getProperties().entrySet() ) { - final PropertyDescriptor descriptor = entry.getKey(); - if (descriptor.getControllerServiceDefinition() != null ) { - final String value = entry.getValue() == null ? descriptor.getDefaultValue() : entry.getValue(); - if ( value != null ) { - final ControllerServiceNode serviceNode = controllerServiceProvider.getControllerServiceNode(value); - if ( serviceNode != null ) { - serviceNode.removeReference(reportingTaskNode); - } - } - } - } - - reportingTasks.remove(reportingTaskNode.getIdentifier()); - } finally { - writeLock.unlock("removeReportingTask"); - } - } - - + writeLock.lock(); + try { + final ReportingTaskNode existing = reportingTasks.get(reportingTaskNode.getIdentifier()); + if (existing == null || existing != reportingTaskNode) { + throw new IllegalStateException("Reporting Task " + reportingTaskNode + " does not exist in this Flow"); + } + + reportingTaskNode.verifyCanDelete(); + + try (final NarCloseable x = NarCloseable.withNarLoader()) { + ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnRemoved.class, reportingTaskNode.getReportingTask(), reportingTaskNode.getConfigurationContext()); + } + + for (final Map.Entry<PropertyDescriptor, String> entry : reportingTaskNode.getProperties().entrySet()) { + final PropertyDescriptor descriptor = entry.getKey(); + if (descriptor.getControllerServiceDefinition() != null) { + final String value = entry.getValue() == null ? descriptor.getDefaultValue() : entry.getValue(); + if (value != null) { + final ControllerServiceNode serviceNode = controllerServiceProvider.getControllerServiceNode(value); + if (serviceNode != null) { + serviceNode.removeReference(reportingTaskNode); + } + } + } + } + + reportingTasks.remove(reportingTaskNode.getIdentifier()); + } finally { + writeLock.unlock("removeReportingTask"); + } + } + @Override public void disableReportingTask(final ReportingTaskNode reportingTask) { reportingTask.verifyCanDisable(); processScheduler.disableReportingTask(reportingTask); } - + @Override public void enableReportingTask(final ReportingTaskNode reportingTask) { reportingTask.verifyCanEnable(); processScheduler.enableReportingTask(reportingTask); } - - + /** * Handle a bulletins message. * @@ -2336,7 +2328,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C // merge the response final NodeResponse clientResponse = mergeResponses(uri, method, nodeResponses, mutableRequest); holder.set(clientResponse); - + // if we have a response get the updated cluster context for auditing and revision updating Revision updatedRevision = null; if (mutableRequest && clientResponse != null) { @@ -2367,18 +2359,18 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C logger.warn("Classpath issue detected because failed to deserialize cluster context from node response due to: " + cnfe, cnfe); } } - + return updatedRevision; } }; - + // federate the request and lock on the revision if (mutableRequest) { optimisticLockingManager.setRevision(federateRequest); } else { federateRequest.execute(optimisticLockingManager.getLastModification().getRevision()); } - + return holder.get(); } @@ -2387,7 +2379,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C } private static boolean isProcessorEndpoint(final URI uri, final String method) { - if (("GET".equalsIgnoreCase(method) || "PUT".equalsIgnoreCase(method)) && (PROCESSOR_URI_PATTERN.matcher(uri.getPath()).matches() || CLUSTER_PROCESSOR_URI_PATTERN.matcher(uri.getPath()).matches()) ) { + if (("GET".equalsIgnoreCase(method) || "PUT".equalsIgnoreCase(method)) && (PROCESSOR_URI_PATTERN.matcher(uri.getPath()).matches() || CLUSTER_PROCESSOR_URI_PATTERN.matcher(uri.getPath()).matches())) { return true; } else if ("POST".equalsIgnoreCase(method) && PROCESSORS_URI_PATTERN.matcher(uri.getPath()).matches()) { return true; @@ -2434,11 +2426,11 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C private static boolean isProvenanceEventEndpoint(final URI uri, final String method) { return "GET".equalsIgnoreCase(method) && PROVENANCE_EVENT_URI.matcher(uri.getPath()).matches(); } - + private static boolean isControllerServicesEndpoint(final URI uri, final String method) { return "GET".equalsIgnoreCase(method) && CONTROLLER_SERVICES_URI.equals(uri.getPath()); } - + private static boolean isControllerServiceEndpoint(final URI uri, final String method) { if (("GET".equalsIgnoreCase(method) || "PUT".equalsIgnoreCase(method)) && CONTROLLER_SERVICE_URI_PATTERN.matcher(uri.getPath()).matches()) { return true; @@ -2448,19 +2440,19 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C return false; } - + private static boolean isControllerServiceReferenceEndpoint(final URI uri, final String method) { if (("GET".equalsIgnoreCase(method) || "PUT".equalsIgnoreCase(method)) && CONTROLLER_SERVICE_REFERENCES_URI_PATTERN.matcher(uri.getPath()).matches()) { return true; } - + return false; } - + private static boolean isReportingTasksEndpoint(final URI uri, final String method) { return "GET".equalsIgnoreCase(method) && REPORTING_TASKS_URI.equals(uri.getPath()); } - + private static boolean isReportingTaskEndpoint(final URI uri, final String method) { if (("GET".equalsIgnoreCase(method) || "PUT".equalsIgnoreCase(method)) && REPORTING_TASK_URI_PATTERN.matcher(uri.getPath()).matches()) { return true; @@ -2661,7 +2653,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C remoteProcessGroup.setAuthorizationIssues(mergedAuthorizationIssues); } } - + private void mergeControllerServiceReferences(final Set<ControllerServiceReferencingComponentDTO> referencingComponents, final Map<NodeIdentifier, Set<ControllerServiceReferencingComponentDTO>> referencingComponentMap) { final Map<String, Integer> activeThreadCounts = new HashMap<>(); final Map<String, String> states = new HashMap<>(); @@ -2669,7 +2661,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C final Set<ControllerServiceReferencingComponentDTO> nodeReferencingComponents = nodeEntry.getValue(); // go through all the nodes referencing components - if ( nodeReferencingComponents != null ) { + if (nodeReferencingComponents != null) { for (final ControllerServiceReferencingComponentDTO nodeReferencingComponent : nodeReferencingComponents) { // handle active thread counts if (nodeReferencingComponent.getActiveThreadCount() != null && nodeReferencingComponent.getActiveThreadCount() > 0) { @@ -2680,7 +2672,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C activeThreadCounts.put(nodeReferencingComponent.getId(), nodeReferencingComponent.getActiveThreadCount() + current); } } - + // handle controller service state final String state = states.get(nodeReferencingComponent.getId()); if (state == null) { @@ -2692,7 +2684,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C } } } - } + } // go through each referencing components for (final ControllerServiceReferencingComponentDTO referencingComponent : referencingComponents) { @@ -2700,24 +2692,24 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C if (activeThreadCount != null) { referencingComponent.setActiveThreadCount(activeThreadCount); } - + final String state = states.get(referencingComponent.getId()); if (state != null) { referencingComponent.setState(state); } } } - + private void mergeControllerService(final ControllerServiceDTO controllerService, final Map<NodeIdentifier, ControllerServiceDTO> controllerServiceMap) { final Map<String, Set<NodeIdentifier>> validationErrorMap = new HashMap<>(); final Set<ControllerServiceReferencingComponentDTO> referencingComponents = controllerService.getReferencingComponents(); final Map<NodeIdentifier, Set<ControllerServiceReferencingComponentDTO>> nodeReferencingComponentsMap = new HashMap<>(); - + String state = null; for (final Map.Entry<NodeIdentifier, ControllerServiceDTO> nodeEntry : controllerServiceMap.entrySet()) { final NodeIdentifier nodeId = nodeEntry.getKey(); final ControllerServiceDTO nodeControllerService = nodeEntry.getValue(); - + if (state == null) { if (ControllerServiceState.DISABLING.name().equals(nodeControllerService.getState())) { state = ControllerServiceState.DISABLING.name(); @@ -2725,27 +2717,27 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C state = ControllerServiceState.ENABLING.name(); } } - + for (final ControllerServiceReferencingComponentDTO nodeReferencingComponents : nodeControllerService.getReferencingComponents()) { nodeReferencingComponentsMap.put(nodeId, nodeReferencingComponents.getReferencingComponents()); } - + // merge the validation errors mergeValidationErrors(validationErrorMap, nodeId, nodeControllerService.getValidationErrors()); } - + // merge the referencing components mergeControllerServiceReferences(referencingComponents, nodeReferencingComponentsMap); - + // store the 'transition' state is applicable if (state != null) { controllerService.setState(state); } - + // set the merged the validation errors controllerService.setValidationErrors(normalizedMergedValidationErrors(validationErrorMap, controllerServiceMap.size())); } - + private void mergeReportingTask(final ReportingTaskDTO reportingTask, final Map<NodeIdentifier, ReportingTaskDTO> reportingTaskMap) { final Map<String, Set<NodeIdentifier>> validationErrorMap = new HashMap<>(); @@ -2757,24 +2749,25 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C if (nodeReportingTask.getActiveThreadCount() != null) { activeThreadCount += nodeReportingTask.getActiveThreadCount(); } - + // merge the validation errors mergeValidationErrors(validationErrorMap, nodeId, nodeReportingTask.getValidationErrors()); } // set the merged active thread counts reportingTask.setActiveThreadCount(activeThreadCount); - + // set the merged the validation errors reportingTask.setValidationErrors(normalizedMergedValidationErrors(validationErrorMap, reportingTaskMap.size())); } /** - * Merges the validation errors into the specified map, recording the corresponding node identifier. - * + * Merges the validation errors into the specified map, recording the + * corresponding node identifier. + * * @param validationErrorMap * @param nodeId - * @param nodeValidationErrors + * @param nodeValidationErrors */ public void mergeValidationErrors(final Map<String, Set<NodeIdentifier>> validationErrorMap, final NodeIdentifier nodeId, final Collection<String> nodeValidationErrors) { if (nodeValidationErrors != null) { @@ -2788,13 +2781,14 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C } } } - + /** - * Normalizes the validation errors by prepending the corresponding nodes when the error does not exist across all nodes. - * + * Normalizes the validation errors by prepending the corresponding nodes + * when the error does not exist across all nodes. + * * @param validationErrorMap * @param totalNodes - * @return + * @return */ public Set<String> normalizedMergedValidationErrors(final Map<String, Set<NodeIdentifier>> validationErrorMap, int totalNodes) { final Set<String> normalizedValidationErrors = new HashSet<>(); @@ -2812,7 +2806,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C } return normalizedValidationErrors; } - + // requires write lock to be already acquired unless request is not mutable private NodeResponse mergeResponses(final URI uri, final String method, final Set<NodeResponse> nodeResponses, final boolean mutableRequest) { // holds the one response of all the node responses to return to the client @@ -3105,7 +3099,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C } else if (hasSuccessfulClientResponse && isControllerServiceEndpoint(uri, method)) { final ControllerServiceEntity responseEntity = clientResponse.getClientResponse().getEntity(ControllerServiceEntity.class); final ControllerServiceDTO controllerService = responseEntity.getControllerService(); - + final Map<NodeIdentifier, ControllerServiceDTO> resultsMap = new HashMap<>(); for (final NodeResponse nodeResponse : updatedNodesMap.values()) { if (problematicNodeResponses.contains(nodeResponse)) { @@ -3118,12 +3112,12 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C resultsMap.put(nodeResponse.getNodeId(), nodeControllerService); } mergeControllerService(controllerService, resultsMap); - + clientResponse = new NodeResponse(clientResponse, responseEntity); } else if (hasSuccessfulClientResponse && isControllerServicesEndpoint(uri, method)) { final ControllerServicesEntity responseEntity = clientResponse.getClientResponse().getEntity(ControllerServicesEntity.class); final Set<ControllerServiceDTO> controllerServices = responseEntity.getControllerServices(); - + final Map<String, Map<NodeIdentifier, ControllerServiceDTO>> controllerServiceMap = new HashMap<>(); for (final NodeResponse nodeResponse : updatedNodesMap.values()) { if (problematicNodeResponses.contains(nodeResponse)) { @@ -3156,7 +3150,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C } else if (hasSuccessfulClientResponse && isControllerServiceReferenceEndpoint(uri, method)) { final ControllerServiceReferencingComponentsEntity responseEntity = clientResponse.getClientResponse().getEntity(ControllerServiceReferencingComponentsEntity.class); final Set<ControllerServiceReferencingComponentDTO> referencingComponents = responseEntity.getControllerServiceReferencingComponents(); - + final Map<NodeIdentifier, Set<ControllerServiceReferencingComponentDTO>> resultsMap = new HashMap<>(); for (final NodeResponse nodeResponse : updatedNodesMap.values()) { if (problematicNodeResponses.contains(nodeResponse)) { @@ -3169,12 +3163,12 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C resultsMap.put(nodeResponse.getNodeId(), nodeReferencingComponents); } mergeControllerServiceReferences(referencingComponents, resultsMap); - + clientResponse = new NodeResponse(clientResponse, responseEntity); } else if (hasSuccessfulClientResponse && isReportingTaskEndpoint(uri, method)) { final ReportingTaskEntity responseEntity = clientResponse.getClientResponse().getEntity(ReportingTaskEntity.class); final ReportingTaskDTO reportingTask = responseEntity.getReportingTask(); - + final Map<NodeIdentifier, ReportingTaskDTO> resultsMap = new HashMap<>(); for (final NodeResponse nodeResponse : updatedNodesMap.values()) { if (problematicNodeResponses.contains(nodeResponse)) { @@ -3187,12 +3181,12 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C resultsMap.put(nodeResponse.getNodeId(), nodeReportingTask); } mergeReportingTask(reportingTask, resultsMap); - + clientResponse = new NodeResponse(clientResponse, responseEntity); } else if (hasSuccessfulClientResponse && isReportingTasksEndpoint(uri, method)) { final ReportingTasksEntity responseEntity = clientResponse.getClientResponse().getEntity(ReportingTasksEntity.class); final Set<ReportingTaskDTO> reportingTaskSet = responseEntity.getReportingTasks(); - + final Map<String, Map<NodeIdentifier, ReportingTaskDTO>> reportingTaskMap = new HashMap<>(); for (final NodeResponse nodeResponse : updatedNodesMap.values()) { if (problematicNodeResponses.contains(nodeResponse)) { http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/9dda16c9/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/node/Node.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/node/Node.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/node/Node.java index 84565da..1b128f7 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/node/Node.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/node/Node.java @@ -161,7 +161,7 @@ public class Node implements Cloneable, Comparable<Node> { * * This method is thread-safe and may be called without obtaining any lock. * - * @param connectionRequestedTimestamp + * @param connectionRequestedTimestamp timestamp */ public void setConnectionRequestedTimestamp(long connectionRequestedTimestamp) { this.connectionRequestedTimestamp.set(connectionRequestedTimestamp); http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/9dda16c9/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/spring/ClusterManagerProtocolServiceLocatorFactoryBean.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/spring/ClusterManagerProtocolServiceLocatorFactoryBean.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/spring/ClusterManagerProtocolServiceLocatorFactoryBean.java index e26d196..c369a7f 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/spring/ClusterManagerProtocolServiceLocatorFactoryBean.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/spring/ClusterManagerProtocolServiceLocatorFactoryBean.java @@ -51,7 +51,7 @@ public class ClusterManagerProtocolServiceLocatorFactoryBean implements FactoryB @Override public Object getObject() throws Exception { /* - * If configured for the cluster manager, then the service locator is never used. + * If configured for the cluster manager, then the service locator is never used. */ if (properties.isClusterManager()) { return null; http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/9dda16c9/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/spring/WebClusterManagerFactoryBean.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/spring/WebClusterManagerFactoryBean.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/spring/WebClusterManagerFactoryBean.java index d3cff3b..7bcb203 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/spring/WebClusterManagerFactoryBean.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/spring/WebClusterManagerFactoryBean.java @@ -49,7 +49,7 @@ public class WebClusterManagerFactoryBean implements FactoryBean, ApplicationCon private NiFiProperties properties; private StringEncryptor encryptor; - + private OptimisticLockingManager optimisticLockingManager; @Override @@ -58,8 +58,8 @@ public class WebClusterManagerFactoryBean implements FactoryBean, ApplicationCon throw new IllegalStateException("Application may be configured as a cluster manager or a node, but not both."); } else if (!properties.isClusterManager()) { /* - * If not configured for the cluster manager, then the cluster manager is never used. - * null is returned so that we don't instantiate a thread pool or other resources. + * If not configured for the cluster manager, then the cluster manager is never used. + * null is returned so that we don't instantiate a thread pool or other resources. */ return null; } else if (clusterManager == null) { @@ -127,7 +127,7 @@ public class WebClusterManagerFactoryBean implements FactoryBean, ApplicationCon public void setEncryptor(final StringEncryptor encryptor) { this.encryptor = encryptor; } - + public void setOptimisticLockingManager(OptimisticLockingManager optimisticLockingManager) { this.optimisticLockingManager = optimisticLockingManager; } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/9dda16c9/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/ConfigurationRequest.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/ConfigurationRequest.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/ConfigurationRequest.java index 939c3f0..c2e940a 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/ConfigurationRequest.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/ConfigurationRequest.java @@ -20,14 +20,15 @@ package org.apache.nifi.web; * Represents a request to configure. The implementations execute method will * perform the configuration action. It will return type T which will be * encapsulated in a ConfigurationSnapshot. - * - * @param <T> + * + * @param <T> type of request */ public interface ConfigurationRequest<T> { /** - * Executes a configuration action and returns the updated resulting configuration. - * + * Executes a configuration action and returns the updated resulting + * configuration. + * * @return The resulting configuration */ T execute(); http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/9dda16c9/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/ConfigurationSnapshot.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/ConfigurationSnapshot.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/ConfigurationSnapshot.java index 8817d69..c706fd2 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/ConfigurationSnapshot.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/ConfigurationSnapshot.java @@ -18,7 +18,8 @@ package org.apache.nifi.web; /** * Response object that captures some configuration for a given revision. - * @param <T> + * + * @param <T> type of snapshot */ public class ConfigurationSnapshot<T> { http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/9dda16c9/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/FlowModification.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/FlowModification.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/FlowModification.java index f6bccb1..70aa30e 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/FlowModification.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/FlowModification.java @@ -27,9 +27,9 @@ public class FlowModification { /** * Creates a new FlowModification. - * - * @param revision - * @param lastModifier + * + * @param revision revision + * @param lastModifier modifier */ public FlowModification(Revision revision, String lastModifier) { this.revision = revision; @@ -38,8 +38,8 @@ public class FlowModification { /** * Get the revision. - * - * @return + * + * @return the revision */ public Revision getRevision() { return revision; @@ -47,11 +47,11 @@ public class FlowModification { /** * Get the last modifier. - * - * @return + * + * @return the modifier */ public String getLastModifier() { return lastModifier; } - + } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/9dda16c9/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/OptimisticLockingManager.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/OptimisticLockingManager.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/OptimisticLockingManager.java index 4c54b7c..3cb1d45 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/OptimisticLockingManager.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/OptimisticLockingManager.java @@ -26,26 +26,27 @@ package org.apache.nifi.web; public interface OptimisticLockingManager { /** - * Attempts to execute the specified configuration request using the specified revision within a lock. - * - * @param <T> - * @param revision - * @param configurationRequest - * @return + * Attempts to execute the specified configuration request using the + * specified revision within a lock. + * + * @param <T> type of snapshot + * @param revision revision + * @param configurationRequest request + * @return snapshot */ <T> ConfigurationSnapshot<T> configureFlow(Revision revision, ConfigurationRequest<T> configurationRequest); - + /** * Updates the revision using the specified revision within a lock. - * - * @param updateRevision + * + * @param updateRevision new revision */ void setRevision(UpdateRevision updateRevision); /** - * Returns the last flow modification. This is a combination of the revision and the user - * who performed the modification. - * + * Returns the last flow modification. This is a combination of the revision + * and the user who performed the modification. + * * @return the last modification */ FlowModification getLastModification();