This is an automated email from the ASF dual-hosted git repository.
exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 01c4ec6893 NIFI-14900 Added support for dropStateKeySupported Stateful
flag (#10238)
01c4ec6893 is described below
commit 01c4ec68930ac11e91f0634a099ce5c3a276c0b8
Author: Pierre Villard <[email protected]>
AuthorDate: Fri Sep 12 06:56:10 2025 +0200
NIFI-14900 Added support for dropStateKeySupported Stateful flag (#10238)
Signed-off-by: David Handermann <[email protected]>
---
.../state/provider/StandardStateMap.java | 2 +-
.../providers/zookeeper/StandardStateMap.java | 2 +-
.../apache/nifi/web/api/dto/ComponentStateDTO.java | 13 ++
.../controller/state/StandardStateManager.java | 15 +-
.../manager/StandardStateManagerProvider.java | 30 +++-
.../apache/nifi/groups/StandardProcessGroup.java | 13 +-
.../components/state/StateManagerProvider.java | 41 ++++-
.../apache/nifi/controller/ExtensionBuilder.java | 11 +-
.../org/apache/nifi/controller/FlowController.java | 3 +-
.../nifi/controller/StandardFlowSnippet.java | 3 +-
.../nifi/controller/StandardReloadComponent.java | 3 +-
.../nifi/controller/flow/StandardFlowManager.java | 5 +
.../StandardFlowAnalysisRuleContext.java | 3 +-
.../reporting/StandardReportingContext.java | 3 +-
.../scheduling/RepositoryContextFactory.java | 6 +-
.../scheduling/StandardProcessScheduler.java | 18 ++-
.../nifi/controller/tasks/ConnectableTask.java | 11 +-
.../TestStandardControllerServiceProvider.java | 9 +-
.../org/apache/nifi/web/NiFiServiceFacade.java | 24 ++-
.../apache/nifi/web/StandardNiFiServiceFacade.java | 25 +--
.../apache/nifi/web/api/ControllerResource.java | 16 +-
.../nifi/web/api/ControllerServiceResource.java | 16 +-
.../nifi/web/api/ParameterProviderResource.java | 16 +-
.../org/apache/nifi/web/api/ProcessorResource.java | 18 ++-
.../apache/nifi/web/api/ReportingTaskResource.java | 16 +-
.../org/apache/nifi/web/api/dto/DtoFactory.java | 17 +++
.../org/apache/nifi/web/dao/ComponentStateDAO.java | 29 +++-
.../apache/nifi/web/dao/ControllerServiceDAO.java | 4 +-
.../apache/nifi/web/dao/FlowAnalysisRuleDAO.java | 4 +-
.../apache/nifi/web/dao/ParameterProviderDAO.java | 4 +-
.../java/org/apache/nifi/web/dao/ProcessorDAO.java | 6 +-
.../org/apache/nifi/web/dao/ReportingTaskDAO.java | 6 +-
.../web/dao/impl/StandardComponentStateDAO.java | 113 ++++++++++++--
.../web/dao/impl/StandardControllerServiceDAO.java | 9 +-
.../web/dao/impl/StandardFlowAnalysisRuleDAO.java | 9 +-
.../web/dao/impl/StandardParameterProviderDAO.java | 9 +-
.../nifi/web/dao/impl/StandardProcessorDAO.java | 10 +-
.../web/dao/impl/StandardReportingTaskDAO.java | 9 +-
.../apache/nifi/audit/TestProcessorAuditor.java | 2 +-
.../reporting/StatelessReportingContext.java | 3 +-
.../nifi/stateless/engine/ComponentBuilder.java | 2 +-
.../stateless/engine/StatelessFlowManager.java | 5 +-
.../engine/StatelessProcessContextFactory.java | 5 +-
.../stateless/engine/StatelessReloadComponent.java | 6 +-
.../StatelessRepositoryContextFactory.java | 6 +-
.../processors/tests/system/GenerateFlowFile.java | 2 +
.../processors/tests/system/MultiKeyState.java | 75 +++++++++
.../tests/system/MultiKeyStateNotDroppable.java | 75 +++++++++
.../services/org.apache.nifi.processor.Processor | 2 +
.../tests/system/state/AbstractStateKeyDropIT.java | 109 +++++++++++++
.../tests/system/state/ClusterStateKeyDropIT.java | 169 +++++++++++++++++++++
.../system/state/StandaloneStateKeyDropIT.java | 117 ++++++++++++++
.../nifi/toolkit/client/ProcessorClient.java | 8 +-
.../toolkit/client/impl/JerseyProcessorClient.java | 17 ++-
54 files changed, 1022 insertions(+), 132 deletions(-)
diff --git
a/nifi-framework-bundle/nifi-framework-extensions/nifi-framework-kubernetes-bundle/nifi-framework-kubernetes-state-provider/src/main/java/org/apache/nifi/kubernetes/state/provider/StandardStateMap.java
b/nifi-framework-bundle/nifi-framework-extensions/nifi-framework-kubernetes-bundle/nifi-framework-kubernetes-state-provider/src/main/java/org/apache/nifi/kubernetes/state/provider/StandardStateMap.java
index dae5ccb334..48529620b8 100644
---
a/nifi-framework-bundle/nifi-framework-extensions/nifi-framework-kubernetes-bundle/nifi-framework-kubernetes-state-provider/src/main/java/org/apache/nifi/kubernetes/state/provider/StandardStateMap.java
+++
b/nifi-framework-bundle/nifi-framework-extensions/nifi-framework-kubernetes-bundle/nifi-framework-kubernetes-state-provider/src/main/java/org/apache/nifi/kubernetes/state/provider/StandardStateMap.java
@@ -30,7 +30,7 @@ class StandardStateMap implements StateMap {
private final Optional<String> version;
StandardStateMap(final Map<String, String> data, final Optional<String>
version) {
- this.data = Collections.unmodifiableMap(data);
+ this.data = Collections.unmodifiableMap(data == null ?
Collections.emptyMap() : data);
this.version = version;
}
diff --git
a/nifi-framework-bundle/nifi-framework-extensions/nifi-framework-zookeeper-bundle/nifi-framework-zookeeper-state-provider/src/main/java/org/apache/nifi/controller/state/providers/zookeeper/StandardStateMap.java
b/nifi-framework-bundle/nifi-framework-extensions/nifi-framework-zookeeper-bundle/nifi-framework-zookeeper-state-provider/src/main/java/org/apache/nifi/controller/state/providers/zookeeper/StandardStateMap.java
index 560cb36340..612c4eacc7 100644
---
a/nifi-framework-bundle/nifi-framework-extensions/nifi-framework-zookeeper-bundle/nifi-framework-zookeeper-state-provider/src/main/java/org/apache/nifi/controller/state/providers/zookeeper/StandardStateMap.java
+++
b/nifi-framework-bundle/nifi-framework-extensions/nifi-framework-zookeeper-bundle/nifi-framework-zookeeper-state-provider/src/main/java/org/apache/nifi/controller/state/providers/zookeeper/StandardStateMap.java
@@ -31,7 +31,7 @@ class StandardStateMap implements StateMap {
private final Optional<String> version;
StandardStateMap(final Map<String, String> data, final Optional<String>
version) {
- this.data = Collections.unmodifiableMap(data);
+ this.data = Collections.unmodifiableMap(data == null ?
Collections.emptyMap() : data);
this.version = version;
}
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ComponentStateDTO.java
b/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ComponentStateDTO.java
index 37ba9d13f2..99858d378b 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ComponentStateDTO.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ComponentStateDTO.java
@@ -30,6 +30,7 @@ public class ComponentStateDTO {
private String stateDescription;
private StateMapDTO clusterState;
private StateMapDTO localState;
+ private Boolean dropStateKeySupported;
/**
* @return The component identifier
@@ -82,4 +83,16 @@ public class ComponentStateDTO {
public void setLocalState(StateMapDTO localState) {
this.localState = localState;
}
+
+ /**
+ * @return Whether dropping state by key is supported for this component.
+ */
+ @Schema(description = "Whether dropping state by key is supported for this
component. Defaults to false when not specified by the component.")
+ public Boolean isDropStateKeySupported() {
+ return dropStateKeySupported;
+ }
+
+ public void setDropStateKeySupported(final Boolean dropStateKeySupported) {
+ this.dropStateKeySupported = dropStateKeySupported;
+ }
}
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/state/StandardStateManager.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/state/StandardStateManager.java
index d3cf7c03df..2bd86d3489 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/state/StandardStateManager.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/state/StandardStateManager.java
@@ -24,21 +24,29 @@ import org.apache.nifi.components.state.StateProvider;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.logging.LogRepository;
import org.apache.nifi.logging.LogRepositoryFactory;
-import org.apache.nifi.processor.SimpleProcessLogger;
import org.apache.nifi.logging.StandardLoggingContext;
+import org.apache.nifi.processor.SimpleProcessLogger;
import java.io.IOException;
import java.util.Map;
+import java.util.function.Supplier;
public class StandardStateManager implements StateManager {
private final StateProvider localProvider;
private final StateProvider clusterProvider;
private final String componentId;
+ private final Supplier<Boolean> dropStateKeySupportedSupplier;
public StandardStateManager(final StateProvider localProvider, final
StateProvider clusterProvider, final String componentId) {
+ this(localProvider, clusterProvider, componentId, () -> false);
+ }
+
+ public StandardStateManager(final StateProvider localProvider, final
StateProvider clusterProvider,
+ final String componentId, final
Supplier<Boolean> dropStateKeySupportedSupplier) {
this.localProvider = localProvider;
this.clusterProvider = clusterProvider;
this.componentId = componentId;
+ this.dropStateKeySupportedSupplier = dropStateKeySupportedSupplier;
}
private StateProvider getProvider(final Scope scope) {
@@ -86,6 +94,11 @@ public class StandardStateManager implements StateManager {
getProvider(scope).clear(componentId);
}
+ @Override
+ public boolean isStateKeyDropSupported() {
+ return dropStateKeySupportedSupplier.get();
+ }
+
@Override
public String toString() {
return "StandardStateManager[componentId=" + componentId + "]";
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/state/manager/StandardStateManagerProvider.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/state/manager/StandardStateManagerProvider.java
index 6ddfd443d3..c55c761c36 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/state/manager/StandardStateManagerProvider.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/state/manager/StandardStateManagerProvider.java
@@ -30,6 +30,7 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicBoolean;
import javax.net.ssl.SSLContext;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.nifi.attribute.expression.language.Query;
@@ -81,6 +82,7 @@ public class StandardStateManagerProvider implements
StateManagerProvider {
private final StateProvider localStateProvider;
private final StateProvider clusterStateProvider;
private final StateProvider previousClusterStateProvider;
+ private final ConcurrentMap<String, AtomicBoolean> dropStateKeySupported =
new ConcurrentHashMap<>();
public StandardStateManagerProvider(final StateProvider
localStateProvider, final StateProvider clusterStateProvider) {
this.localStateProvider = localStateProvider;
@@ -554,6 +556,23 @@ public class StandardStateManagerProvider implements
StateManagerProvider {
*
* @return the StateManager that can be used by the component with the
given ID, or <code>null</code> if none exists
*/
+ @Override
+ public synchronized StateManager getStateManager(final String componentId,
final boolean dropSupportedFlag) {
+ final AtomicBoolean dropSupported =
dropStateKeySupported.computeIfAbsent(componentId, id -> new
AtomicBoolean(false));
+ if (dropSupportedFlag) {
+ dropSupported.set(true);
+ }
+
+ StateManager stateManager = stateManagers.get(componentId);
+ if (stateManager != null) {
+ return stateManager;
+ }
+
+ stateManager = new StandardStateManager(localStateProvider,
clusterStateProvider, componentId, dropSupported::get);
+ stateManagers.put(componentId, stateManager);
+ return stateManager;
+ }
+
@Override
public synchronized StateManager getStateManager(final String componentId)
{
StateManager stateManager = stateManagers.get(componentId);
@@ -561,7 +580,8 @@ public class StandardStateManagerProvider implements
StateManagerProvider {
return stateManager;
}
- stateManager = new StandardStateManager(localStateProvider,
clusterStateProvider, componentId);
+ final AtomicBoolean dropSupported =
dropStateKeySupported.computeIfAbsent(componentId, id -> new
AtomicBoolean(false));
+ stateManager = new StandardStateManager(localStateProvider,
clusterStateProvider, componentId, dropSupported::get);
stateManagers.put(componentId, stateManager);
return stateManager;
}
@@ -623,5 +643,13 @@ public class StandardStateManagerProvider implements
StateManagerProvider {
logger.warn("Component with ID {} was removed from NiFi
instance but failed to cleanup resources used to maintain its clustered state",
componentId, e);
}
}
+
+ dropStateKeySupported.remove(componentId);
}
+
+ @Override
+ public boolean isClusterProviderEnabled() {
+ return nifiProperties.isClustered() && clusterStateProvider != null &&
clusterStateProvider.isEnabled();
+ }
+
}
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
index 54436ada0d..333bb093c9 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
@@ -656,15 +656,16 @@ public final class StandardProcessGroup implements
ProcessGroup {
return this.scheduler.getActiveThreadCount(statelessGroupNode) > 0;
}
- private StateManager getStateManager(final String componentId) {
- return stateManagerProvider.getStateManager(componentId);
+ private StateManager getStateManager(final ProcessorNode processorNode) {
+ final Class<?> componentClass = processorNode.getProcessor() == null ?
null : processorNode.getProcessor().getClass();
+ return
stateManagerProvider.getStateManager(processorNode.getIdentifier(),
componentClass);
}
private void shutdown(final ProcessGroup procGroup) {
for (final ProcessorNode node : procGroup.getProcessors()) {
try (final NarCloseable ignored =
NarCloseable.withComponentNarLoader(extensionManager,
node.getProcessor().getClass(), node.getIdentifier())) {
final StandardProcessContext processContext = new
StandardProcessContext(node, controllerServiceProvider,
- getStateManager(node.getIdentifier()), () -> false,
nodeTypeProvider);
+ getStateManager(node), () -> false, nodeTypeProvider);
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnShutdown.class,
node.getProcessor(), processContext);
}
}
@@ -1230,7 +1231,7 @@ public final class StandardProcessGroup implements
ProcessGroup {
try (final NarCloseable ignored =
NarCloseable.withComponentNarLoader(extensionManager,
processor.getProcessor().getClass(), processor.getIdentifier())) {
final StandardProcessContext processContext = new
StandardProcessContext(processor, controllerServiceProvider,
- getStateManager(processor.getIdentifier()), () -> false,
nodeTypeProvider);
+ getStateManager(processor), () -> false, nodeTypeProvider);
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnRemoved.class,
processor.getProcessor(), processContext);
} catch (final Exception e) {
throw new ComponentLifeCycleException("Failed to invoke
'OnRemoved' methods of processor with id " + processor.getIdentifier(), e);
@@ -3866,8 +3867,10 @@ public final class StandardProcessGroup implements
ProcessGroup {
}
private ProcessContext createProcessContext(final ProcessorNode
processorNode) {
+ final org.apache.nifi.processor.Processor processor =
processorNode.getProcessor();
+ final Class<?> componentClass = processor == null ? null :
processor.getClass();
return new StandardProcessContext(processorNode,
controllerServiceProvider,
-
stateManagerProvider.getStateManager(processorNode.getIdentifier()), () ->
false, nodeTypeProvider);
+
stateManagerProvider.getStateManager(processorNode.getIdentifier(),
componentClass), () -> false, nodeTypeProvider);
}
private ConfigurationContext createConfigurationContext(final
ComponentNode component) {
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/state/StateManagerProvider.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/state/StateManagerProvider.java
index 0b1ab8d52e..5a8808f890 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/state/StateManagerProvider.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/state/StateManagerProvider.java
@@ -17,6 +17,8 @@
package org.apache.nifi.components.state;
+import org.apache.nifi.annotation.behavior.Stateful;
+
/**
* <p>
* Interface that provides a mechanism for obtaining the {@link StateManager}
for a particular component
@@ -32,7 +34,37 @@ public interface StateManagerProvider {
* @return the StateManager for the component with the given ID, or
<code>null</code> if no State Manager
* exists for the component with the given ID
*/
- StateManager getStateManager(String componentId);
+ default StateManager getStateManager(String componentId) {
+ return getStateManager(componentId, false);
+ }
+
+ /**
+ * Returns the StateManager for the component with the given ID with the
capability of dropping individual
+ * state keys if supported
+ *
+ * @param componentId the id of the component for which the StateManager
should be returned
+ * @param dropStateKeySupported whether the component supports dropping
specific state keys
+ * @return the StateManager for the component with the given ID, or
<code>null</code> if no State Manager
+ * exists for the component with the given ID
+ */
+ StateManager getStateManager(String componentId, boolean
dropStateKeySupported);
+
+ /**
+ * Returns the StateManager for the given component identifier, using the
provided component class to
+ * determine whether dropping individual state keys is supported based on
the {@link Stateful} annotation.
+ *
+ * @param componentId the id of the component for which the StateManager
should be returned
+ * @param componentClass the component class if known; may be null
+ * @return the StateManager for the component with the given ID, or null
if none exists
+ */
+ default StateManager getStateManager(final String componentId, final
Class<?> componentClass) {
+ boolean dropSupported = false;
+ if (componentClass != null) {
+ final Stateful stateful =
componentClass.getAnnotation(Stateful.class);
+ dropSupported = stateful != null &&
stateful.dropStateKeySupported();
+ }
+ return getStateManager(componentId, dropSupported);
+ }
/**
* Notifies the State Manager Provider that the component with the given
ID has been removed from the NiFi instance
@@ -57,4 +89,11 @@ public interface StateManagerProvider {
* state, even when components request a clustered provider
*/
void disableClusterProvider();
+
+ /**
+ * Returns whether the Cluster State Provider is enabled.
+ *
+ * @return true if the Cluster State Provider is enabled, false otherwise
+ */
+ boolean isClusterProviderEnabled();
}
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/ExtensionBuilder.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/ExtensionBuilder.java
index 4f2d5050b0..6157486cfd 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/ExtensionBuilder.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/ExtensionBuilder.java
@@ -514,6 +514,7 @@ public class ExtensionBuilder {
final ValidationContextFactory validationContextFactory =
createValidationContextFactory(serviceProvider);
final ReportingTaskNode taskNode;
if (creationSuccessful) {
+ stateManagerProvider.getStateManager(identifier,
reportingTask.getComponent().getClass());
taskNode = new StandardReportingTaskNode(reportingTask, identifier,
flowController, processScheduler,
validationContextFactory, reloadComponent,
extensionManager, validationTrigger);
taskNode.setName(taskNode.getReportingTask().getClass().getSimpleName());
@@ -534,9 +535,10 @@ public class ExtensionBuilder {
}
private ParameterProviderNode createParameterProviderNode(final
LoggableComponent<ParameterProvider> parameterProvider, final boolean
creationSuccessful) {
- final ValidationContextFactory validationContextFactory = new
StandardValidationContextFactory(serviceProvider, ruleViolationsManager,
flowAnalyzer);
+ final ValidationContextFactory validationContextFactory =
createValidationContextFactory(serviceProvider);
final ParameterProviderNode parameterProviderNode;
if (creationSuccessful) {
+ stateManagerProvider.getStateManager(identifier,
parameterProvider.getComponent().getClass());
parameterProviderNode = new
StandardParameterProviderNode(parameterProvider, identifier, flowController,
flowController.getControllerServiceProvider(),
validationContextFactory, reloadComponent, extensionManager,
validationTrigger);
@@ -555,7 +557,7 @@ public class ExtensionBuilder {
}
private FlowRegistryClientNode createFlowRegistryClientNode(final
LoggableComponent<FlowRegistryClient> client, final boolean creationSuccessful)
{
- final ValidationContextFactory validationContextFactory = new
StandardValidationContextFactory(serviceProvider, ruleViolationsManager,
flowAnalyzer);
+ final ValidationContextFactory validationContextFactory =
createValidationContextFactory(serviceProvider);
final StandardFlowRegistryClientNode clientNode;
if (creationSuccessful) {
@@ -668,7 +670,7 @@ public class ExtensionBuilder {
final ComponentLog serviceLogger = new
SimpleProcessLogger(identifier, serviceImpl, new StandardLoggingContext(null));
final TerminationAwareLogger terminationAwareLogger = new
TerminationAwareLogger(serviceLogger);
- final StateManager stateManager =
stateManagerProvider.getStateManager(identifier);
+ final StateManager stateManager =
stateManagerProvider.getStateManager(identifier, serviceImpl.getClass());
final ControllerServiceInitializationContext initContext = new
StandardControllerServiceInitializationContext(identifier,
terminationAwareLogger,
serviceProvider, stateManager, kerberosConfig,
nodeTypeProvider);
serviceImpl.initialize(initContext);
@@ -834,6 +836,7 @@ public class ExtensionBuilder {
final ValidationContextFactory validationContextFactory =
createValidationContextFactory(serviceProvider);
final FlowAnalysisRuleNode ruleNode;
if (creationSuccessful) {
+ stateManagerProvider.getStateManager(identifier,
flowAnalysisRule.getComponent().getClass());
ruleNode = new StandardFlowAnalysisRuleNode(flowAnalysisRule,
identifier, flowController,
validationContextFactory, ruleViolationsManager,
reloadComponent, extensionManager, validationTrigger);
ruleNode.setName(ruleNode.getFlowAnalysisRule().getClass().getSimpleName());
@@ -941,4 +944,4 @@ public class ExtensionBuilder {
}
}
}
-}
\ No newline at end of file
+}
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index cc50486c09..8607578257 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -1104,8 +1104,9 @@ public class FlowController implements
ReportingTaskProvider, FlowAnalysisRulePr
for (final ProcessorNode procNode :
flowManager.getRootGroup().findAllProcessors()) {
final Processor processor = procNode.getProcessor();
try (final NarCloseable ignored =
NarCloseable.withComponentNarLoader(extensionManager, processor.getClass(),
processor.getIdentifier())) {
+ final Class<?> componentClass = processor == null ? null :
processor.getClass();
final StandardProcessContext processContext = new
StandardProcessContext(procNode, controllerServiceProvider,
-
getStateManagerProvider().getStateManager(processor.getIdentifier()), () ->
false, this);
+
getStateManagerProvider().getStateManager(processor.getIdentifier(),
componentClass), () -> false, this);
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnConfigurationRestored.class,
processor, processContext);
}
}
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSnippet.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSnippet.java
index 2c0a82da56..fff602fd68 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSnippet.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSnippet.java
@@ -426,8 +426,9 @@ public class StandardFlowSnippet implements FlowSnippet {
}
// Notify the processor node that the configuration
(properties, e.g.) has been restored
+ final Class<?> componentClass = procNode.getProcessor() ==
null ? null : procNode.getProcessor().getClass();
final StandardProcessContext processContext = new
StandardProcessContext(procNode, flowController.getControllerServiceProvider(),
-
flowController.getStateManagerProvider().getStateManager(procNode.getProcessor().getIdentifier()),
() -> false, flowController);
+
flowController.getStateManagerProvider().getStateManager(procNode.getProcessor().getIdentifier(),
componentClass), () -> false, flowController);
procNode.onConfigurationRestored(processContext);
} finally {
procNode.resumeValidationTrigger();
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardReloadComponent.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardReloadComponent.java
index 5ed1829232..cec3a9ade2 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardReloadComponent.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardReloadComponent.java
@@ -76,7 +76,8 @@ public class StandardReloadComponent implements
ReloadComponent {
// save the instance class loader to use it for calling OnRemoved on
the existing processor
final ClassLoader existingInstanceClassLoader =
extensionManager.getInstanceClassLoader(id);
- final StateManager stateManager =
flowController.getStateManagerProvider().getStateManager(id);
+ final Class<?> componentClass = existingNode.getProcessor() == null ?
null : existingNode.getProcessor().getClass();
+ final StateManager stateManager =
flowController.getStateManagerProvider().getStateManager(id, componentClass);
final StandardProcessContext processContext = new
StandardProcessContext(existingNode,
flowController.getControllerServiceProvider(),
stateManager, () -> false, flowController);
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/flow/StandardFlowManager.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/flow/StandardFlowManager.java
index bdbce0d905..d592b42e0a 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/flow/StandardFlowManager.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/flow/StandardFlowManager.java
@@ -91,6 +91,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.net.ssl.SSLContext;
+
import java.lang.reflect.InvocationTargetException;
import java.net.URL;
import java.util.ArrayList;
@@ -360,6 +361,7 @@ public class StandardFlowManager extends
AbstractFlowManager implements FlowMana
.reloadComponent(flowController.getReloadComponent())
.addClasspathUrls(additionalUrls)
.kerberosConfig(flowController.createKerberosConfig(nifiProperties))
+ .stateManagerProvider(flowController.getStateManagerProvider())
.extensionManager(extensionManager)
.flowAnalyzer(getFlowAnalyzer().orElse(null))
.ruleViolationsManager(getRuleViolationsManager().orElse(null))
@@ -498,6 +500,7 @@ public class StandardFlowManager extends
AbstractFlowManager implements FlowMana
.reloadComponent(flowController.getReloadComponent())
.addClasspathUrls(additionalUrls)
.kerberosConfig(flowController.createKerberosConfig(nifiProperties))
+ .stateManagerProvider(flowController.getStateManagerProvider())
.flowController(flowController)
.extensionManager(extensionManager)
.classloaderIsolationKey(classloaderIsolationKey)
@@ -561,6 +564,7 @@ public class StandardFlowManager extends
AbstractFlowManager implements FlowMana
.reloadComponent(flowController.getReloadComponent())
.addClasspathUrls(additionalUrls)
.kerberosConfig(flowController.createKerberosConfig(nifiProperties))
+ .stateManagerProvider(flowController.getStateManagerProvider())
.flowController(flowController)
.extensionManager(extensionManager)
.flowAnalyzer(getFlowAnalyzer().orElse(null))
@@ -619,6 +623,7 @@ public class StandardFlowManager extends
AbstractFlowManager implements FlowMana
.reloadComponent(flowController.getReloadComponent())
.addClasspathUrls(additionalUrls)
.kerberosConfig(flowController.createKerberosConfig(nifiProperties))
+ .stateManagerProvider(flowController.getStateManagerProvider())
.flowController(flowController)
.extensionManager(extensionManager)
.buildParameterProvider();
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/flowanalysis/StandardFlowAnalysisRuleContext.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/flowanalysis/StandardFlowAnalysisRuleContext.java
index a78920b7b3..cd4e037854 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/flowanalysis/StandardFlowAnalysisRuleContext.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/flowanalysis/StandardFlowAnalysisRuleContext.java
@@ -50,7 +50,8 @@ public class StandardFlowAnalysisRuleContext extends
AbstractFlowAnalysisRuleCon
@Override
public StateManager getStateManager() {
- return
flowController.getStateManagerProvider().getStateManager(getFlowAnalysisRule().getIdentifier());
+ final Class<?> componentClass = getFlowAnalysisRule() == null ? null :
getFlowAnalysisRule().getClass();
+ return
flowController.getStateManagerProvider().getStateManager(getFlowAnalysisRule().getIdentifier(),
componentClass);
}
@Override
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingContext.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingContext.java
index 645b5ee289..a20715a65a 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingContext.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingContext.java
@@ -51,7 +51,8 @@ public class StandardReportingContext extends
AbstractReportingContext implement
@Override
public StateManager getStateManager() {
- return
flowController.getStateManagerProvider().getStateManager(getReportingTask().getIdentifier());
+ final Class<?> componentClass = getReportingTask() == null ? null :
getReportingTask().getClass();
+ return
flowController.getStateManagerProvider().getStateManager(getReportingTask().getIdentifier(),
componentClass);
}
@Override
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/RepositoryContextFactory.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/RepositoryContextFactory.java
index e2594fc905..a640ca72e5 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/RepositoryContextFactory.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/RepositoryContextFactory.java
@@ -19,6 +19,7 @@ package org.apache.nifi.controller.scheduling;
import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.components.state.StateManagerProvider;
import org.apache.nifi.connectable.Connectable;
+import org.apache.nifi.controller.ProcessorNode;
import org.apache.nifi.controller.repository.ContentRepository;
import org.apache.nifi.controller.repository.CounterRepository;
import org.apache.nifi.controller.repository.FlowFileEventRepository;
@@ -54,7 +55,10 @@ public class RepositoryContextFactory {
}
public RepositoryContext newProcessContext(final Connectable connectable,
final AtomicLong connectionIndex) {
- final StateManager stateManager =
stateManagerProvider.getStateManager(connectable.getIdentifier());
+ final Class<?> componentClass = (connectable instanceof ProcessorNode
&& ((ProcessorNode) connectable).getProcessor() != null)
+ ? ((ProcessorNode) connectable).getProcessor().getClass()
+ : null;
+ final StateManager stateManager =
stateManagerProvider.getStateManager(connectable.getIdentifier(),
componentClass);
return new StandardRepositoryContext(connectable, connectionIndex,
contentRepo, flowFileRepo, flowFileEventRepo, counterRepo, provenanceRepo,
stateManager, maxAppendableClaimBytes);
}
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
index f7f156c350..789ca85a96 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
@@ -126,6 +126,15 @@ public final class StandardProcessScheduler implements
ProcessScheduler {
return stateManagerProvider.getStateManager(componentId);
}
+ private StateManager getStateManager(final String componentId, final
Class<?> componentClass) {
+ return stateManagerProvider.getStateManager(componentId,
componentClass);
+ }
+
+ private StateManager getStateManager(final ProcessorNode procNode) {
+ final Class<?> componentClass = procNode.getProcessor() == null ? null
: procNode.getProcessor().getClass();
+ return getStateManager(procNode.getIdentifier(), componentClass);
+ }
+
public void scheduleFrameworkTask(final Runnable task, final String
taskName, final long initialDelay, final long delay, final TimeUnit timeUnit) {
Thread.ofVirtual()
.name(taskName)
@@ -382,7 +391,7 @@ public final class StandardProcessScheduler implements
ProcessScheduler {
final LifecycleState lifecycleState =
getLifecycleState(requireNonNull(procNode), true, false);
final Supplier<ProcessContext> processContextFactory = () -> new
StandardProcessContext(procNode, getControllerServiceProvider(),
- getStateManager(procNode.getIdentifier()),
lifecycleState::isTerminated, flowController);
+ getStateManager(procNode), lifecycleState::isTerminated,
flowController);
final boolean scheduleActions =
procNode.getProcessGroup().resolveExecutionEngine() !=
ExecutionEngine.STATELESS;
@@ -515,7 +524,7 @@ public final class StandardProcessScheduler implements
ProcessScheduler {
final LifecycleState lifecycleState =
getLifecycleState(requireNonNull(procNode), true, false);
final Supplier<ProcessContext> processContextFactory = () -> new
StandardProcessContext(procNode, getControllerServiceProvider(),
- getStateManager(procNode.getIdentifier()),
lifecycleState::isTerminated, flowController);
+ getStateManager(procNode), lifecycleState::isTerminated,
flowController);
final CompletableFuture<Void> future = new CompletableFuture<>();
final SchedulingAgentCallback callback = new SchedulingAgentCallback()
{
@@ -556,7 +565,7 @@ public final class StandardProcessScheduler implements
ProcessScheduler {
final LifecycleState lifecycleState = getLifecycleState(procNode,
false, false);
final StandardProcessContext processContext = new
StandardProcessContext(procNode, getControllerServiceProvider(),
- getStateManager(procNode.getIdentifier()),
lifecycleState::isTerminated, flowController);
+ getStateManager(procNode), lifecycleState::isTerminated,
flowController);
LOG.info("Stopping {}", procNode);
return procNode.stop(this, this.componentLifeCycleThreadPool,
processContext, getSchedulingAgent(procNode), lifecycleState, lifecycleMethods);
@@ -732,7 +741,8 @@ public final class StandardProcessScheduler implements
ProcessScheduler {
getSchedulingAgent(connectable).unschedule(connectable, state);
if (!state.isScheduled() && state.getActiveThreadCount() == 0 &&
state.mustCallOnStoppedMethods()) {
- final ConnectableProcessContext processContext = new
ConnectableProcessContext(connectable,
getStateManager(connectable.getIdentifier()));
+ final StateManager stateManager = (connectable instanceof
ProcessorNode) ? getStateManager((ProcessorNode) connectable) :
getStateManager(connectable.getIdentifier());
+ final ConnectableProcessContext processContext = new
ConnectableProcessContext(connectable, stateManager);
try (final NarCloseable ignored =
NarCloseable.withComponentNarLoader(flowController.getExtensionManager(),
connectable.getClass(), connectable.getIdentifier())) {
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnStopped.class,
connectable, processContext);
}
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ConnectableTask.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ConnectableTask.java
index 2453415ec1..434c22a2cb 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ConnectableTask.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ConnectableTask.java
@@ -44,6 +44,7 @@ import org.apache.nifi.logging.StandardLoggingContext;
import org.apache.nifi.nar.NarCloseable;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSessionFactory;
+import org.apache.nifi.processor.Processor;
import org.apache.nifi.processor.SimpleProcessLogger;
import org.apache.nifi.processor.StandardProcessContext;
import org.apache.nifi.processor.exception.ProcessException;
@@ -82,7 +83,15 @@ public class ConnectableTask {
this.numRelationships = connectable.getRelationships().size();
this.flowController = flowController;
- final StateManager stateManager = new
TaskTerminationAwareStateManager(flowController.getStateManagerProvider().getStateManager(connectable.getIdentifier()),
lifecycleState::isTerminated);
+ final StateManager baseStateManager;
+ if (connectable instanceof ProcessorNode processorNode) {
+ final Processor processor = processorNode.getProcessor();
+ final Class<?> componentClass = processor == null ? null :
processor.getClass();
+ baseStateManager =
flowController.getStateManagerProvider().getStateManager(connectable.getIdentifier(),
componentClass);
+ } else {
+ baseStateManager =
flowController.getStateManagerProvider().getStateManager(connectable.getIdentifier());
+ }
+ final StateManager stateManager = new
TaskTerminationAwareStateManager(baseStateManager,
lifecycleState::isTerminated);
if (connectable instanceof ProcessorNode) {
processContext = new StandardProcessContext(
(ProcessorNode) connectable,
flowController.getControllerServiceProvider(), stateManager,
lifecycleState::isTerminated, flowController);
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java
index c284c70d97..8e5b6497a9 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java
@@ -55,8 +55,8 @@ import org.apache.nifi.util.MockProcessContext;
import org.apache.nifi.util.MockProcessorInitializationContext;
import org.apache.nifi.util.NiFiProperties;
import org.apache.nifi.util.SynchronousValidationTrigger;
-import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.mockito.Mockito;
@@ -84,7 +84,7 @@ public class TestStandardControllerServiceProvider {
private static StateManagerProvider stateManagerProvider = new
StateManagerProvider() {
@Override
- public StateManager getStateManager(final String componentId) {
+ public StateManager getStateManager(final String componentId, final
boolean dropStateKeySupported) {
final StateManager stateManager = Mockito.mock(StateManager.class);
final StateMap emptyStateMap = new
StandardStateMap(Collections.emptyMap(), Optional.empty());
try {
@@ -111,6 +111,11 @@ public class TestStandardControllerServiceProvider {
@Override
public void onComponentRemoved(final String componentId) {
}
+
+ @Override
+ public boolean isClusterProviderEnabled() {
+ return false;
+ }
};
private static NiFiProperties niFiProperties;
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
index 480e7383f1..9218dba029 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
@@ -1872,9 +1872,11 @@ public interface NiFiServiceFacade {
/**
* Clears the state for the specified processor.
*
- * @param processorId the processor id
+ * @param processorId processor id
+ * @param componentStateDTO state of the processor
+ * @return the cleared component state
*/
- void clearProcessorState(String processorId);
+ ComponentStateDTO clearProcessorState(final String processorId, final
ComponentStateDTO componentStateDTO);
/**
* Gets the state for the specified controller service.
@@ -1895,8 +1897,10 @@ public interface NiFiServiceFacade {
* Clears the state for the specified controller service.
*
* @param controllerServiceId the controller service id
+ * @param componentStateDTO state of the controller service
+ * @return the cleared component state
*/
- void clearControllerServiceState(String controllerServiceId);
+ ComponentStateDTO clearControllerServiceState(String controllerServiceId,
final ComponentStateDTO componentStateDTO);
/**
* Gets the state for the specified reporting task.
@@ -1916,9 +1920,11 @@ public interface NiFiServiceFacade {
/**
* Clears the state for the specified reporting task.
*
- * @param reportingTaskId the reporting task id
+ * @param reportingTaskId the reporting task id
+ * @param componentStateDTO the component state of the reporting task
+ * @return the cleared component state
*/
- void clearReportingTaskState(String reportingTaskId);
+ ComponentStateDTO clearReportingTaskState(String reportingTaskId, final
ComponentStateDTO componentStateDTO);
/**
* Gets the state for the specified parameter provider.
@@ -1939,8 +1945,10 @@ public interface NiFiServiceFacade {
* Clears the state for the specified parameter provider.
*
* @param parameterProviderId the parameter provider id
+ * @param componentStateDTO the component state of the parameter provider
+ * @return the cleared component state
*/
- void clearParameterProviderState(String parameterProviderId);
+ ComponentStateDTO clearParameterProviderState(String parameterProviderId,
final ComponentStateDTO componentStateDTO);
/**
* Gets the state for the specified RemoteProcessGroup.
@@ -2937,8 +2945,10 @@ public interface NiFiServiceFacade {
* Clears the state for the flow analysis rule with the specified id.
*
* @param flowAnalysisRuleId the flow analysis rule id
+ * @param componentStateDTO the state of the flow analysis rule
+ * @return the cleared component state
*/
- void clearFlowAnalysisRuleState(String flowAnalysisRuleId);
+ ComponentStateDTO clearFlowAnalysisRuleState(String flowAnalysisRuleId,
final ComponentStateDTO componentStateDTO);
/**
* Updates the specified flow analysis rule.
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
index 16f83e1f9b..7129508775 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
@@ -875,8 +875,9 @@ public class StandardNiFiServiceFacade implements
NiFiServiceFacade {
}
@Override
- public void clearFlowAnalysisRuleState(String flowAnalysisRuleId) {
- flowAnalysisRuleDAO.clearState(flowAnalysisRuleId);
+ public ComponentStateDTO clearFlowAnalysisRuleState(final String
flowAnalysisRuleId, final ComponentStateDTO componentStateDTO) {
+ flowAnalysisRuleDAO.clearState(flowAnalysisRuleId, componentStateDTO);
+ return this.getFlowAnalysisRuleState(flowAnalysisRuleId);
}
@Override
@@ -2003,8 +2004,9 @@ public class StandardNiFiServiceFacade implements
NiFiServiceFacade {
}
@Override
- public void clearProcessorState(final String processorId) {
- processorDAO.clearState(processorId);
+ public ComponentStateDTO clearProcessorState(final String processorId,
final ComponentStateDTO componentStateDTO) {
+ processorDAO.clearState(processorId, componentStateDTO);
+ return this.getProcessorState(processorId);
}
@Override
@@ -2013,8 +2015,9 @@ public class StandardNiFiServiceFacade implements
NiFiServiceFacade {
}
@Override
- public void clearControllerServiceState(final String controllerServiceId) {
- controllerServiceDAO.clearState(controllerServiceId);
+ public ComponentStateDTO clearControllerServiceState(final String
controllerServiceId, final ComponentStateDTO componentStateDTO) {
+ controllerServiceDAO.clearState(controllerServiceId,
componentStateDTO);
+ return this.getControllerServiceState(controllerServiceId);
}
@Override
@@ -2023,8 +2026,9 @@ public class StandardNiFiServiceFacade implements
NiFiServiceFacade {
}
@Override
- public void clearReportingTaskState(final String reportingTaskId) {
- reportingTaskDAO.clearState(reportingTaskId);
+ public ComponentStateDTO clearReportingTaskState(final String
reportingTaskId, final ComponentStateDTO componentStateDTO) {
+ reportingTaskDAO.clearState(reportingTaskId, componentStateDTO);
+ return this.getReportingTaskState(reportingTaskId);
}
@Override
@@ -2033,8 +2037,9 @@ public class StandardNiFiServiceFacade implements
NiFiServiceFacade {
}
@Override
- public void clearParameterProviderState(final String parameterProviderId) {
- parameterProviderDAO.clearState(parameterProviderId);
+ public ComponentStateDTO clearParameterProviderState(final String
parameterProviderId, final ComponentStateDTO componentStateDTO) {
+ parameterProviderDAO.clearState(parameterProviderId,
componentStateDTO);
+ return this.getParameterProviderState(parameterProviderId);
}
@Override
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ControllerResource.java
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ControllerResource.java
index f2afba9242..f507fbb5ad 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ControllerResource.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ControllerResource.java
@@ -624,7 +624,7 @@ public class ControllerResource extends ApplicationResource
{
* @return a componentStateEntity
*/
@POST
- @Consumes(MediaType.WILDCARD)
+ @Consumes({MediaType.APPLICATION_JSON, MediaType.WILDCARD})
@Produces(MediaType.APPLICATION_JSON)
@Path("flow-analysis-rules/{id}/state/clear-requests")
@Operation(
@@ -646,10 +646,14 @@ public class ControllerResource extends
ApplicationResource {
description = "The flow analysis rule id.",
required = true
)
- @PathParam("id") final String id) {
+ @PathParam("id") final String id,
+ @Parameter(
+ description = "Optional component state to perform a
selective key removal. If omitted, clears all state.",
+ required = false
+ ) final ComponentStateEntity componentStateEntity) {
if (isReplicateRequest()) {
- return replicate(HttpMethod.POST);
+ return replicate(HttpMethod.POST, componentStateEntity);
}
final FlowAnalysisRuleEntity requestFlowAnalysisRuleEntity = new
FlowAnalysisRuleEntity();
@@ -661,11 +665,13 @@ public class ControllerResource extends
ApplicationResource {
lookup -> authorizeController(RequestAction.WRITE),
() -> serviceFacade.verifyCanClearFlowAnalysisRuleState(id),
(flowAnalysisRuleEntity) -> {
- // get the component state
-
serviceFacade.clearFlowAnalysisRuleState(flowAnalysisRuleEntity.getId());
+ // clear state
+ final ComponentStateDTO expectedState =
componentStateEntity == null ? null : componentStateEntity.getComponentState();
+ final ComponentStateDTO state =
serviceFacade.clearFlowAnalysisRuleState(flowAnalysisRuleEntity.getId(),
expectedState);
// generate the response entity
final ComponentStateEntity entity = new
ComponentStateEntity();
+ entity.setComponentState(state);
// generate the response
return generateOkResponse(entity).build();
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ControllerServiceResource.java
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ControllerServiceResource.java
index 904f6054f7..3e6fd20347 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ControllerServiceResource.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ControllerServiceResource.java
@@ -346,7 +346,7 @@ public class ControllerServiceResource extends
ApplicationResource {
* @return a componentStateEntity
*/
@POST
- @Consumes(MediaType.WILDCARD)
+ @Consumes({MediaType.APPLICATION_JSON, MediaType.WILDCARD})
@Produces(MediaType.APPLICATION_JSON)
@Path("{id}/state/clear-requests")
@Operation(
@@ -368,10 +368,14 @@ public class ControllerServiceResource extends
ApplicationResource {
description = "The controller service id.",
required = true
)
- @PathParam("id") final String id) {
+ @PathParam("id") final String id,
+ @Parameter(
+ description = "Optional component state to perform a
selective key removal. If omitted, clears all state.",
+ required = false
+ ) final ComponentStateEntity componentStateEntity) {
if (isReplicateRequest()) {
- return replicate(HttpMethod.POST);
+ return replicate(HttpMethod.POST, componentStateEntity);
}
final ControllerServiceEntity requestControllerServiceEntity = new
ControllerServiceEntity();
@@ -386,11 +390,13 @@ public class ControllerServiceResource extends
ApplicationResource {
},
() -> serviceFacade.verifyCanClearControllerServiceState(id),
(controllerServiceEntity) -> {
- // get the component state
-
serviceFacade.clearControllerServiceState(controllerServiceEntity.getId());
+ // clear state
+ final ComponentStateDTO expectedState =
componentStateEntity == null ? null : componentStateEntity.getComponentState();
+ final ComponentStateDTO state =
serviceFacade.clearControllerServiceState(controllerServiceEntity.getId(),
expectedState);
// generate the response entity
final ComponentStateEntity entity = new
ComponentStateEntity();
+ entity.setComponentState(state);
// generate the response
return generateOkResponse(entity).build();
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ParameterProviderResource.java
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ParameterProviderResource.java
index 5e91fef656..da78992a9b 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ParameterProviderResource.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ParameterProviderResource.java
@@ -440,7 +440,7 @@ public class ParameterProviderResource extends
AbstractParameterResource {
* @return a componentStateEntity
*/
@POST
- @Consumes(MediaType.WILDCARD)
+ @Consumes({MediaType.APPLICATION_JSON, MediaType.WILDCARD})
@Produces(MediaType.APPLICATION_JSON)
@Path("{id}/state/clear-requests")
@Operation(
@@ -462,10 +462,14 @@ public class ParameterProviderResource extends
AbstractParameterResource {
description = "The parameter provider id.",
required = true
)
- @PathParam("id") final String id) {
+ @PathParam("id") final String id,
+ @Parameter(
+ description = "Optional component state to perform a
selective key removal. If omitted, clears all state.",
+ required = false
+ ) final ComponentStateEntity componentStateEntity) {
if (isReplicateRequest()) {
- return replicate(HttpMethod.POST);
+ return replicate(HttpMethod.POST, componentStateEntity);
}
final ParameterProviderEntity requestParameterProviderEntity = new
ParameterProviderEntity();
@@ -480,11 +484,13 @@ public class ParameterProviderResource extends
AbstractParameterResource {
},
() -> serviceFacade.verifyCanClearParameterProviderState(id),
(parameterProviderEntity) -> {
- // get the component state
-
serviceFacade.clearParameterProviderState(parameterProviderEntity.getId());
+ // clear state
+ final ComponentStateDTO expectedState =
componentStateEntity == null ? null : componentStateEntity.getComponentState();
+ final ComponentStateDTO state =
serviceFacade.clearParameterProviderState(parameterProviderEntity.getId(),
expectedState);
// generate the response entity
final ComponentStateEntity entity = new
ComponentStateEntity();
+ entity.setComponentState(state);
// generate the response
return generateOkResponse(entity).build();
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessorResource.java
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessorResource.java
index 9493faa923..07d8103fe0 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessorResource.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessorResource.java
@@ -233,7 +233,7 @@ public class ProcessorResource extends ApplicationResource {
@POST
- @Consumes(MediaType.APPLICATION_JSON)
+ @Consumes({MediaType.APPLICATION_JSON, MediaType.WILDCARD})
@Produces(MediaType.APPLICATION_JSON)
@Path("/run-status-details/queries")
@Operation(
@@ -489,7 +489,7 @@ public class ProcessorResource extends ApplicationResource {
* @throws InterruptedException if interrupted
*/
@POST
- @Consumes(MediaType.WILDCARD)
+ @Consumes({MediaType.APPLICATION_JSON, MediaType.WILDCARD})
@Produces(MediaType.APPLICATION_JSON)
@Path("{id}/state/clear-requests")
@Operation(
@@ -511,10 +511,14 @@ public class ProcessorResource extends
ApplicationResource {
description = "The processor id.",
required = true
)
- @PathParam("id") final String id) throws InterruptedException {
+ @PathParam("id") final String id,
+ @Parameter(
+ description = "Optional component state to perform a
selective key removal. If omitted, clears all state.",
+ required = false
+ ) final ComponentStateEntity componentStateEntity) throws
InterruptedException {
if (isReplicateRequest()) {
- return replicate(HttpMethod.POST);
+ return replicate(HttpMethod.POST, componentStateEntity);
}
final ProcessorEntity requestProcessorEntity = new ProcessorEntity();
@@ -529,11 +533,13 @@ public class ProcessorResource extends
ApplicationResource {
},
() -> serviceFacade.verifyCanClearProcessorState(id),
(processorEntity) -> {
- // get the component state
- serviceFacade.clearProcessorState(processorEntity.getId());
+ // clear state
+ final ComponentStateDTO expectedState =
componentStateEntity == null ? null : componentStateEntity.getComponentState();
+ final ComponentStateDTO state =
serviceFacade.clearProcessorState(processorEntity.getId(), expectedState);
// generate the response entity
final ComponentStateEntity entity = new
ComponentStateEntity();
+ entity.setComponentState(state);
// generate the response
return generateOkResponse(entity).build();
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ReportingTaskResource.java
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ReportingTaskResource.java
index 995c6cf748..7b98fa8a36 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ReportingTaskResource.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ReportingTaskResource.java
@@ -331,7 +331,7 @@ public class ReportingTaskResource extends
ApplicationResource {
* @return a componentStateEntity
*/
@POST
- @Consumes(MediaType.WILDCARD)
+ @Consumes({MediaType.APPLICATION_JSON, MediaType.WILDCARD})
@Produces(MediaType.APPLICATION_JSON)
@Path("{id}/state/clear-requests")
@Operation(
@@ -353,10 +353,14 @@ public class ReportingTaskResource extends
ApplicationResource {
description = "The reporting task id.",
required = true
)
- @PathParam("id") final String id) {
+ @PathParam("id") final String id,
+ @Parameter(
+ description = "Optional component state to perform a
selective key removal. If omitted, clears all state.",
+ required = false
+ ) final ComponentStateEntity componentStateEntity) {
if (isReplicateRequest()) {
- return replicate(HttpMethod.POST);
+ return replicate(HttpMethod.POST, componentStateEntity);
}
final ReportingTaskEntity requestReportTaskEntity = new
ReportingTaskEntity();
@@ -371,11 +375,13 @@ public class ReportingTaskResource extends
ApplicationResource {
},
() -> serviceFacade.verifyCanClearReportingTaskState(id),
(reportingTaskEntity) -> {
- // get the component state
-
serviceFacade.clearReportingTaskState(reportingTaskEntity.getId());
+ // clear state
+ final ComponentStateDTO expectedState =
componentStateEntity == null ? null : componentStateEntity.getComponentState();
+ final ComponentStateDTO state =
serviceFacade.clearReportingTaskState(reportingTaskEntity.getId(),
expectedState);
// generate the response entity
final ComponentStateEntity entity = new
ComponentStateEntity();
+ entity.setComponentState(state);
// generate the response
return generateOkResponse(entity).build();
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
index 6b6b66fade..51a87885e8 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
@@ -439,6 +439,7 @@ public final class DtoFactory {
final ComponentStateDTO dto = new ComponentStateDTO();
dto.setComponentId(componentId);
dto.setStateDescription(getStateDescription(componentClass));
+ dto.setDropStateKeySupported(getDropStateKeySupported(componentClass));
dto.setLocalState(createStateMapDTO(Scope.LOCAL, localState));
dto.setClusterState(createStateMapDTO(Scope.CLUSTER, clusterState));
return dto;
@@ -459,6 +460,22 @@ public final class DtoFactory {
}
}
+ /**
+ * Gets whether dropping state by key is supported for the component.
+ * Defaults to false when annotation not present or field not specified.
+ *
+ * @param componentClass the component class
+ * @return true if dropping state by key is supported; false otherwise
+ */
+ private boolean getDropStateKeySupported(final Class<?> componentClass) {
+ final Stateful statefulAnnotation =
componentClass.getAnnotation(Stateful.class);
+ boolean result = false;
+ if (statefulAnnotation != null) {
+ result = statefulAnnotation.dropStateKeySupported();
+ }
+ return result;
+ }
+
/**
* Creates a StateMapDTO for the given scope and state map.
*
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ComponentStateDAO.java
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ComponentStateDAO.java
index 3090a49410..90664c324b 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ComponentStateDAO.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ComponentStateDAO.java
@@ -24,6 +24,7 @@ import org.apache.nifi.controller.ProcessorNode;
import org.apache.nifi.controller.ReportingTaskNode;
import org.apache.nifi.controller.service.ControllerServiceNode;
import org.apache.nifi.groups.RemoteProcessGroup;
+import org.apache.nifi.web.api.dto.ComponentStateDTO;
public interface ComponentStateDAO {
@@ -41,7 +42,17 @@ public interface ComponentStateDAO {
*
* @param processor processor
*/
- void clearState(ProcessorNode processor);
+ default void clearState(ProcessorNode processor) {
+ clearState(processor, null);
+ }
+
+ /**
+ * Clears the state for the specified processor.
+ *
+ * @param processor processor
+ * @param componentStateDTO component state DTO
+ */
+ void clearState(ProcessorNode processor, final ComponentStateDTO
componentStateDTO);
/**
* Gets the state map for the specified controller service.
@@ -56,8 +67,9 @@ public interface ComponentStateDAO {
* Clears the state for the specified controller service.
*
* @param controllerService controller service
+ * @param componentStateDTO component state DTO
*/
- void clearState(ControllerServiceNode controllerService);
+ void clearState(ControllerServiceNode controllerService, final
ComponentStateDTO componentStateDTO);
/**
* Gets the state for the specified reporting task.
@@ -71,9 +83,10 @@ public interface ComponentStateDAO {
/**
* Clears the state for the specified reporting task.
*
- * @param reportingTask reporting task
+ * @param reportingTask reporting task
+ * @param componentStateDTO component state DTO
*/
- void clearState(ReportingTaskNode reportingTask);
+ void clearState(ReportingTaskNode reportingTask, final ComponentStateDTO
componentStateDTO);
/**
* Gets the state for the specified flow analysis rule.
@@ -88,9 +101,10 @@ public interface ComponentStateDAO {
/**
* Clears the state for the specified flow analysis rule.
*
- * @param flowAnalysisRule flow analysis rule
+ * @param flowAnalysisRule flow analysis rule
+ * @param componentStateDTO component state DTO
*/
- void clearState(FlowAnalysisRuleNode flowAnalysisRule);
+ void clearState(FlowAnalysisRuleNode flowAnalysisRule, final
ComponentStateDTO componentStateDTO);
/**
* Gets the state for the specified parameter provider.
@@ -105,8 +119,9 @@ public interface ComponentStateDAO {
* Clears the state for the specified parameter provider.
*
* @param parameterProvider parameter provider
+ * @param componentStateDTO component state DTO
*/
- void clearState(ParameterProviderNode parameterProvider);
+ void clearState(ParameterProviderNode parameterProvider, final
ComponentStateDTO componentStateDTO);
/**
* Gets the state map for the specified RemoteProcessGroup.
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ControllerServiceDAO.java
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ControllerServiceDAO.java
index 8b9f0ea607..6dc41df450 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ControllerServiceDAO.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ControllerServiceDAO.java
@@ -22,6 +22,7 @@ import org.apache.nifi.controller.ComponentNode;
import org.apache.nifi.controller.ScheduledState;
import org.apache.nifi.controller.service.ControllerServiceNode;
import org.apache.nifi.controller.service.ControllerServiceState;
+import org.apache.nifi.web.api.dto.ComponentStateDTO;
import org.apache.nifi.web.api.dto.ConfigVerificationResultDTO;
import org.apache.nifi.web.api.dto.ControllerServiceDTO;
@@ -150,6 +151,7 @@ public interface ControllerServiceDAO {
* Clears the state of the specified controller service.
*
* @param controllerServiceId controller service id
+ * @param componentStateDTO state of the controller service
*/
- void clearState(String controllerServiceId);
+ void clearState(String controllerServiceId, ComponentStateDTO
componentStateDTO);
}
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/FlowAnalysisRuleDAO.java
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/FlowAnalysisRuleDAO.java
index e4b4d94012..ad70f3fc27 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/FlowAnalysisRuleDAO.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/FlowAnalysisRuleDAO.java
@@ -19,6 +19,7 @@ package org.apache.nifi.web.dao;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateMap;
import org.apache.nifi.controller.FlowAnalysisRuleNode;
+import org.apache.nifi.web.api.dto.ComponentStateDTO;
import org.apache.nifi.web.api.dto.ConfigVerificationResultDTO;
import org.apache.nifi.web.api.dto.FlowAnalysisRuleDTO;
@@ -127,6 +128,7 @@ public interface FlowAnalysisRuleDAO {
* Clears the state of the specified flow analysis rule.
*
* @param flowAnalysisRuleId flow analysis rule id
+ * @param componentStateDTO state of the flow analysis rule
*/
- void clearState(String flowAnalysisRuleId);
+ void clearState(final String flowAnalysisRuleId, final ComponentStateDTO
componentStateDTO);
}
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ParameterProviderDAO.java
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ParameterProviderDAO.java
index 7a12bf920a..20865e5ca9 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ParameterProviderDAO.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ParameterProviderDAO.java
@@ -22,6 +22,7 @@ import org.apache.nifi.controller.ParameterProviderNode;
import org.apache.nifi.controller.ParametersApplication;
import org.apache.nifi.controller.parameter.ParameterProviderLookup;
import org.apache.nifi.parameter.ParameterGroupConfiguration;
+import org.apache.nifi.web.api.dto.ComponentStateDTO;
import org.apache.nifi.web.api.dto.ConfigVerificationResultDTO;
import org.apache.nifi.web.api.dto.ParameterProviderDTO;
@@ -156,7 +157,8 @@ public interface ParameterProviderDAO extends
ParameterProviderLookup {
* Clears the state of the specified parameter provider.
*
* @param parameterProviderId parameter provider id
+ * @param componentStateDTO state of the parameter provider
*/
- void clearState(String parameterProviderId);
+ void clearState(final String parameterProviderId, final ComponentStateDTO
componentStateDTO);
}
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ProcessorDAO.java
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ProcessorDAO.java
index 679289f5b6..72f7caee87 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ProcessorDAO.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ProcessorDAO.java
@@ -19,6 +19,7 @@ package org.apache.nifi.web.dao;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateMap;
import org.apache.nifi.controller.ProcessorNode;
+import org.apache.nifi.web.api.dto.ComponentStateDTO;
import org.apache.nifi.web.api.dto.ConfigVerificationResultDTO;
import org.apache.nifi.web.api.dto.ProcessorDTO;
@@ -143,7 +144,8 @@ public interface ProcessorDAO {
/**
* Clears the state of the specified processor.
*
- * @param processorId processor id
+ * @param processorId processor id
+ * @param componentStateDTO state of the processor
*/
- void clearState(String processorId);
+ void clearState(final String processorId, final ComponentStateDTO
componentStateDTO);
}
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ReportingTaskDAO.java
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ReportingTaskDAO.java
index 4c06c6278b..61f4a4e597 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ReportingTaskDAO.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ReportingTaskDAO.java
@@ -19,6 +19,7 @@ package org.apache.nifi.web.dao;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateMap;
import org.apache.nifi.controller.ReportingTaskNode;
+import org.apache.nifi.web.api.dto.ComponentStateDTO;
import org.apache.nifi.web.api.dto.ConfigVerificationResultDTO;
import org.apache.nifi.web.api.dto.ReportingTaskDTO;
@@ -127,7 +128,8 @@ public interface ReportingTaskDAO {
/**
* Clears the state of the specified reporting task.
*
- * @param reportingTaskId reporting task id
+ * @param reportingTaskId reporting task id
+ * @param componentStateDTO state of the reporting task
*/
- void clearState(String reportingTaskId);
+ void clearState(final String reportingTaskId, final ComponentStateDTO
componentStateDTO);
}
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardComponentStateDAO.java
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardComponentStateDAO.java
index f7c8ab7062..d636e7923a 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardComponentStateDAO.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardComponentStateDAO.java
@@ -27,11 +27,15 @@ import org.apache.nifi.controller.ReportingTaskNode;
import org.apache.nifi.controller.service.ControllerServiceNode;
import org.apache.nifi.groups.RemoteProcessGroup;
import org.apache.nifi.web.ResourceNotFoundException;
+import org.apache.nifi.web.api.dto.ComponentStateDTO;
+import org.apache.nifi.web.api.dto.StateMapDTO;
import org.apache.nifi.web.dao.ComponentStateDAO;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Repository;
import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
@Repository
public class StandardComponentStateDAO implements ComponentStateDAO {
@@ -42,28 +46,105 @@ public class StandardComponentStateDAO implements
ComponentStateDAO {
try {
final StateManager manager =
stateManagerProvider.getStateManager(componentId);
if (manager == null) {
- throw new ResourceNotFoundException(String.format("State for
the specified component %s could not be found.", componentId));
+ throw new ResourceNotFoundException("State for the specified
component %s could not be found.".formatted(componentId));
}
return manager.getState(scope);
} catch (final IOException ioe) {
- throw new IllegalStateException(String.format("Unable to get the
state for the specified component %s: %s", componentId, ioe), ioe);
+ throw new IllegalStateException("Unable to get the state for the
specified component %s: %s".formatted(componentId, ioe), ioe);
}
}
- private void clearState(final String componentId) {
+ private void clearState(final String componentId, final ComponentStateDTO
componentStateDTO) {
try {
final StateManager manager =
stateManagerProvider.getStateManager(componentId);
if (manager == null) {
- throw new ResourceNotFoundException(String.format("State for
the specified component %s could not be found.", componentId));
+ throw new ResourceNotFoundException("State for the specified
component %s could not be found.".formatted(componentId));
}
- // clear both state's at the same time
+ // No DTO provided: clear both scopes
+ if (componentStateDTO == null) {
+ manager.clear(Scope.CLUSTER);
+ manager.clear(Scope.LOCAL);
+ return;
+ }
+
+ // Determine if there is existing local state
+ final StateMap localStateMap = manager.getState(Scope.LOCAL);
+ final boolean hasLocalState = localStateMap != null &&
!localStateMap.toMap().isEmpty();
+
+ if (hasLocalState) {
+ // Local state exists
+ if (!stateManagerProvider.isClusterProviderEnabled()) {
+ // Standalone mode: allow selective local key removal
+ if (!manager.isStateKeyDropSupported()) {
+ throw new IllegalStateException("Selective state key
removal is not supported for component %s with local
state.".formatted(componentId));
+ }
+
+ final Map<String, String> newLocalState =
toStateMap(componentStateDTO.getLocalState());
+ final Map<String, String> currentLocalState =
localStateMap.toMap();
+
+ if (hasExactlyOneKeyRemoved(currentLocalState,
newLocalState)) {
+ manager.setState(newLocalState, Scope.LOCAL);
+ } else {
+ throw new IllegalStateException("Unable to remove a
state key for component %s. Exactly one key removal is
supported.".formatted(componentId));
+ }
+ } else {
+ // Cluster mode with existing local state: do not allow
selective removal
+ throw new IllegalStateException("Selective state key
removal is not supported for component %s with local
state.".formatted(componentId));
+ }
+ return;
+ }
+
+ // No local state present, check for cluster state selective
removal
+ final StateMapDTO clusterStateMapDto =
componentStateDTO.getClusterState();
+ if (clusterStateMapDto != null && clusterStateMapDto.getState() !=
null && !clusterStateMapDto.getState().isEmpty()) {
+ if (!manager.isStateKeyDropSupported()) {
+ throw new IllegalStateException("Selective state key
removal is not supported for component %s with cluster
state.".formatted(componentId));
+ }
+
+ final Map<String, String> newClusterState =
toStateMap(clusterStateMapDto);
+ final Map<String, String> currentClusterState =
manager.getState(Scope.CLUSTER).toMap();
+
+ if (hasExactlyOneKeyRemoved(currentClusterState,
newClusterState)) {
+ manager.setState(newClusterState, Scope.CLUSTER);
+ } else {
+ throw new IllegalStateException("Unable to remove a state
key for component %s. Exactly one key removal is
supported.".formatted(componentId));
+ }
+
+ // Ensure local state is cleared
+ manager.clear(Scope.LOCAL);
+ return;
+ }
+
+ // Default: clear both scopes
manager.clear(Scope.CLUSTER);
manager.clear(Scope.LOCAL);
} catch (final IOException ioe) {
- throw new IllegalStateException(String.format("Unable to clear the
state for the specified component %s: %s", componentId, ioe), ioe);
+ throw new IllegalStateException("Unable to clear the state for the
specified component %s: %s".formatted(componentId, ioe), ioe);
+ }
+ }
+
+ private boolean hasExactlyOneKeyRemoved(Map<String, String> currentState,
Map<String, String> newState) {
+ // Check if newState has exactly one less key
+ if (currentState.size() - newState.size() != 1) {
+ return false;
}
+
+ // Check that newState is a subset of currentState
+ return newState.entrySet()
+ .stream()
+ .allMatch(entry ->
entry.getValue().equals(currentState.get(entry.getKey())));
+ }
+
+ private static Map<String, String> toStateMap(final StateMapDTO
stateMapDTO) {
+ final Map<String, String> map = new HashMap<>();
+ if (stateMapDTO == null || stateMapDTO.getState() == null) {
+ return map;
+ }
+
+ stateMapDTO.getState().forEach(entry -> map.put(entry.getKey(),
entry.getValue()));
+ return map;
}
@Override
@@ -72,8 +153,8 @@ public class StandardComponentStateDAO implements
ComponentStateDAO {
}
@Override
- public void clearState(final ProcessorNode processor) {
- clearState(processor.getIdentifier());
+ public void clearState(final ProcessorNode processor, final
ComponentStateDTO componentStateDTO) {
+ clearState(processor.getIdentifier(), componentStateDTO);
}
@Override
@@ -82,8 +163,8 @@ public class StandardComponentStateDAO implements
ComponentStateDAO {
}
@Override
- public void clearState(final ControllerServiceNode controllerService) {
- clearState(controllerService.getIdentifier());
+ public void clearState(final ControllerServiceNode controllerService,
final ComponentStateDTO componentStateDTO) {
+ clearState(controllerService.getIdentifier(), componentStateDTO);
}
@Override
@@ -92,8 +173,8 @@ public class StandardComponentStateDAO implements
ComponentStateDAO {
}
@Override
- public void clearState(final ReportingTaskNode reportingTask) {
- clearState(reportingTask.getIdentifier());
+ public void clearState(final ReportingTaskNode reportingTask, final
ComponentStateDTO componentStateDTO) {
+ clearState(reportingTask.getIdentifier(), componentStateDTO);
}
@Override
@@ -102,8 +183,8 @@ public class StandardComponentStateDAO implements
ComponentStateDAO {
}
@Override
- public void clearState(final FlowAnalysisRuleNode flowAnalysisRule) {
- clearState(flowAnalysisRule.getIdentifier());
+ public void clearState(final FlowAnalysisRuleNode flowAnalysisRule, final
ComponentStateDTO componentStateDTO) {
+ clearState(flowAnalysisRule.getIdentifier(), componentStateDTO);
}
@Override
@@ -112,8 +193,8 @@ public class StandardComponentStateDAO implements
ComponentStateDAO {
}
@Override
- public void clearState(final ParameterProviderNode parameterProvider) {
- clearState(parameterProvider.getIdentifier());
+ public void clearState(final ParameterProviderNode parameterProvider,
final ComponentStateDTO componentStateDTO) {
+ clearState(parameterProvider.getIdentifier(), componentStateDTO);
}
@Override
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardControllerServiceDAO.java
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardControllerServiceDAO.java
index 2359af9612..c89ac1b0f3 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardControllerServiceDAO.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardControllerServiceDAO.java
@@ -17,6 +17,7 @@
package org.apache.nifi.web.dao.impl;
import org.apache.nifi.bundle.BundleCoordinate;
+import org.apache.nifi.components.ConfigVerificationResult;
import org.apache.nifi.components.ConfigurableComponent;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateMap;
@@ -35,16 +36,16 @@ import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.logging.LogLevel;
import org.apache.nifi.logging.LogRepository;
+import org.apache.nifi.logging.StandardLoggingContext;
import org.apache.nifi.logging.repository.NopLogRepository;
import org.apache.nifi.nar.ExtensionManager;
import org.apache.nifi.parameter.ParameterLookup;
-import org.apache.nifi.components.ConfigVerificationResult;
import org.apache.nifi.processor.SimpleProcessLogger;
-import org.apache.nifi.logging.StandardLoggingContext;
import org.apache.nifi.util.BundleUtils;
import org.apache.nifi.web.NiFiCoreException;
import org.apache.nifi.web.ResourceNotFoundException;
import org.apache.nifi.web.api.dto.BundleDTO;
+import org.apache.nifi.web.api.dto.ComponentStateDTO;
import org.apache.nifi.web.api.dto.ConfigVerificationResultDTO;
import org.apache.nifi.web.api.dto.ControllerServiceDTO;
import org.apache.nifi.web.dao.ComponentStateDAO;
@@ -450,9 +451,9 @@ public class StandardControllerServiceDAO extends
ComponentDAO implements Contro
}
@Override
- public void clearState(final String controllerServiceId) {
+ public void clearState(final String controllerServiceId, final
ComponentStateDTO componentStateDTO) {
final ControllerServiceNode controllerService =
locateControllerService(controllerServiceId);
- componentStateDAO.clearState(controllerService);
+ componentStateDAO.clearState(controllerService, componentStateDTO);
}
@Autowired
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardFlowAnalysisRuleDAO.java
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardFlowAnalysisRuleDAO.java
index b1bce275c2..a4d9ef051b 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardFlowAnalysisRuleDAO.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardFlowAnalysisRuleDAO.java
@@ -22,16 +22,16 @@ import org.apache.nifi.components.ConfigurableComponent;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateMap;
import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.controller.FlowAnalysisRuleNode;
import org.apache.nifi.controller.FlowController;
import org.apache.nifi.controller.ReloadComponent;
-import org.apache.nifi.controller.FlowAnalysisRuleNode;
import org.apache.nifi.controller.exception.ComponentLifeCycleException;
import org.apache.nifi.controller.exception.ValidationException;
import
org.apache.nifi.controller.flowanalysis.FlowAnalysisRuleInstantiationException;
import org.apache.nifi.controller.flowanalysis.FlowAnalysisRuleProvider;
import org.apache.nifi.controller.service.StandardConfigurationContext;
-import org.apache.nifi.flowanalysis.FlowAnalysisRuleState;
import org.apache.nifi.flowanalysis.EnforcementPolicy;
+import org.apache.nifi.flowanalysis.FlowAnalysisRuleState;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.logging.LogRepository;
import org.apache.nifi.logging.StandardLoggingContext;
@@ -43,6 +43,7 @@ import org.apache.nifi.util.BundleUtils;
import org.apache.nifi.web.NiFiCoreException;
import org.apache.nifi.web.ResourceNotFoundException;
import org.apache.nifi.web.api.dto.BundleDTO;
+import org.apache.nifi.web.api.dto.ComponentStateDTO;
import org.apache.nifi.web.api.dto.ConfigVerificationResultDTO;
import org.apache.nifi.web.api.dto.FlowAnalysisRuleDTO;
import org.apache.nifi.web.dao.ComponentStateDAO;
@@ -356,9 +357,9 @@ public class StandardFlowAnalysisRuleDAO extends
ComponentDAO implements FlowAna
}
@Override
- public void clearState(String flowAnalysisRuleId) {
+ public void clearState(final String flowAnalysisRuleId, final
ComponentStateDTO componentStateDTO) {
final FlowAnalysisRuleNode flowAnalysisRule =
locateFlowAnalysisRule(flowAnalysisRuleId);
- componentStateDAO.clearState(flowAnalysisRule);
+ componentStateDAO.clearState(flowAnalysisRule, componentStateDTO);
}
@Autowired
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardParameterProviderDAO.java
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardParameterProviderDAO.java
index f301d79ff9..958769ab31 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardParameterProviderDAO.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardParameterProviderDAO.java
@@ -31,16 +31,17 @@ import
org.apache.nifi.controller.parameter.ParameterProviderInstantiationExcept
import org.apache.nifi.controller.service.StandardConfigurationContext;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.logging.LogRepository;
+import org.apache.nifi.logging.StandardLoggingContext;
import org.apache.nifi.logging.repository.NopLogRepository;
import org.apache.nifi.nar.ExtensionManager;
-import org.apache.nifi.parameter.ParameterLookup;
import org.apache.nifi.parameter.ParameterGroupConfiguration;
+import org.apache.nifi.parameter.ParameterLookup;
import org.apache.nifi.processor.SimpleProcessLogger;
-import org.apache.nifi.logging.StandardLoggingContext;
import org.apache.nifi.util.BundleUtils;
import org.apache.nifi.web.NiFiCoreException;
import org.apache.nifi.web.ResourceNotFoundException;
import org.apache.nifi.web.api.dto.BundleDTO;
+import org.apache.nifi.web.api.dto.ComponentStateDTO;
import org.apache.nifi.web.api.dto.ConfigVerificationResultDTO;
import org.apache.nifi.web.api.dto.ParameterProviderDTO;
import org.apache.nifi.web.dao.ComponentStateDAO;
@@ -298,9 +299,9 @@ public class StandardParameterProviderDAO extends
ComponentDAO implements Parame
}
@Override
- public void clearState(final String parameterProviderId) {
+ public void clearState(final String parameterProviderId, final
ComponentStateDTO componentStateDTO) {
final ParameterProviderNode parameterProvider =
locateParameterProvider(parameterProviderId);
- componentStateDAO.clearState(parameterProvider);
+ componentStateDAO.clearState(parameterProvider, componentStateDTO);
}
@Override
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessorDAO.java
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessorDAO.java
index e751e75aa1..132c864702 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessorDAO.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessorDAO.java
@@ -18,6 +18,7 @@ package org.apache.nifi.web.dao.impl;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.bundle.BundleCoordinate;
+import org.apache.nifi.components.ConfigVerificationResult;
import org.apache.nifi.components.ConfigurableComponent;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateMap;
@@ -38,7 +39,6 @@ import org.apache.nifi.logging.LogRepository;
import org.apache.nifi.logging.StandardLoggingContext;
import org.apache.nifi.logging.repository.NopLogRepository;
import org.apache.nifi.nar.ExtensionManager;
-import org.apache.nifi.components.ConfigVerificationResult;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.SimpleProcessLogger;
@@ -50,6 +50,7 @@ import org.apache.nifi.util.FormatUtils;
import org.apache.nifi.web.NiFiCoreException;
import org.apache.nifi.web.ResourceNotFoundException;
import org.apache.nifi.web.api.dto.BundleDTO;
+import org.apache.nifi.web.api.dto.ComponentStateDTO;
import org.apache.nifi.web.api.dto.ConfigVerificationResultDTO;
import org.apache.nifi.web.api.dto.ProcessorConfigDTO;
import org.apache.nifi.web.api.dto.ProcessorDTO;
@@ -128,8 +129,9 @@ public class StandardProcessorDAO extends ComponentDAO
implements ProcessorDAO {
configureProcessor(processor, processorDTO);
// Notify the processor node that the configuration (properties,
e.g.) has been restored
+ final Class<?> componentClass = processor.getProcessor() == null ?
null : processor.getProcessor().getClass();
final StandardProcessContext processContext = new
StandardProcessContext(processor, flowController.getControllerServiceProvider(),
-
flowController.getStateManagerProvider().getStateManager(processor.getProcessor().getIdentifier()),
() -> false, flowController);
+
flowController.getStateManagerProvider().getStateManager(processor.getProcessor().getIdentifier(),
componentClass), () -> false, flowController);
processor.onConfigurationRestored(processContext);
return processor;
@@ -608,9 +610,9 @@ public class StandardProcessorDAO extends ComponentDAO
implements ProcessorDAO {
}
@Override
- public void clearState(String processorId) {
+ public void clearState(final String processorId, final ComponentStateDTO
componentStateDTO) {
final ProcessorNode processor = locateProcessor(processorId);
- componentStateDAO.clearState(processor);
+ componentStateDAO.clearState(processor, componentStateDTO);
}
@Autowired
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardReportingTaskDAO.java
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardReportingTaskDAO.java
index 287946e746..c57dc456e6 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardReportingTaskDAO.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardReportingTaskDAO.java
@@ -18,6 +18,7 @@ package org.apache.nifi.web.dao.impl;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.bundle.BundleCoordinate;
+import org.apache.nifi.components.ConfigVerificationResult;
import org.apache.nifi.components.ConfigurableComponent;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateMap;
@@ -33,18 +34,18 @@ import
org.apache.nifi.controller.reporting.ReportingTaskProvider;
import org.apache.nifi.controller.service.StandardConfigurationContext;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.logging.LogRepository;
+import org.apache.nifi.logging.StandardLoggingContext;
import org.apache.nifi.logging.repository.NopLogRepository;
import org.apache.nifi.nar.ExtensionManager;
import org.apache.nifi.parameter.ParameterLookup;
-import org.apache.nifi.components.ConfigVerificationResult;
import org.apache.nifi.processor.SimpleProcessLogger;
-import org.apache.nifi.logging.StandardLoggingContext;
import org.apache.nifi.scheduling.SchedulingStrategy;
import org.apache.nifi.util.BundleUtils;
import org.apache.nifi.util.FormatUtils;
import org.apache.nifi.web.NiFiCoreException;
import org.apache.nifi.web.ResourceNotFoundException;
import org.apache.nifi.web.api.dto.BundleDTO;
+import org.apache.nifi.web.api.dto.ComponentStateDTO;
import org.apache.nifi.web.api.dto.ConfigVerificationResultDTO;
import org.apache.nifi.web.api.dto.ReportingTaskDTO;
import org.apache.nifi.web.dao.ComponentStateDAO;
@@ -409,9 +410,9 @@ public class StandardReportingTaskDAO extends ComponentDAO
implements ReportingT
}
@Override
- public void clearState(String reportingTaskId) {
+ public void clearState(final String reportingTaskId, final
ComponentStateDTO componentStateDTO) {
final ReportingTaskNode reportingTask =
locateReportingTask(reportingTaskId);
- componentStateDAO.clearState(reportingTask);
+ componentStateDAO.clearState(reportingTask, componentStateDTO);
}
@Autowired
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/audit/TestProcessorAuditor.java
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/audit/TestProcessorAuditor.java
index 1f80737109..ce253a5c6c 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/audit/TestProcessorAuditor.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/audit/TestProcessorAuditor.java
@@ -157,7 +157,7 @@ class TestProcessorAuditor {
when(extensionManager.getBundles(anyString())).thenReturn(Collections.singletonList(bundle));
-
when(mockStateManagerProvider.getStateManager(PN_ID)).thenReturn(mockStateManager);
+ when(mockStateManagerProvider.getStateManager(anyString(),
any())).thenReturn(mockStateManager);
final ProcessorNode processor = processorDao.createProcessor(GROUP_ID,
processorDto);
diff --git
a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/controller/reporting/StatelessReportingContext.java
b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/controller/reporting/StatelessReportingContext.java
index 65475f7b00..f3cfe5dc68 100644
---
a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/controller/reporting/StatelessReportingContext.java
+++
b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/controller/reporting/StatelessReportingContext.java
@@ -52,7 +52,8 @@ public class StatelessReportingContext extends
AbstractReportingContext implemen
@Override
public StateManager getStateManager() {
- return
statelessEngine.getStateManagerProvider().getStateManager(getReportingTask().getIdentifier());
+ final Class<?> componentClass = getReportingTask() == null ? null :
getReportingTask().getClass();
+ return
statelessEngine.getStateManagerProvider().getStateManager(getReportingTask().getIdentifier(),
componentClass);
}
@Override
diff --git
a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/ComponentBuilder.java
b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/ComponentBuilder.java
index 97c5af5b74..729609f309 100644
---
a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/ComponentBuilder.java
+++
b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/ComponentBuilder.java
@@ -283,7 +283,7 @@ public class ComponentBuilder {
final ComponentLog serviceLogger = new
SimpleProcessLogger(identifier, serviceImpl, new StandardLoggingContext(null));
final TerminationAwareLogger terminationAwareLogger = new
TerminationAwareLogger(serviceLogger);
- final StateManager stateManager =
stateManagerProvider.getStateManager(identifier);
+ final StateManager stateManager =
stateManagerProvider.getStateManager(identifier, rawClass);
final ControllerServiceInitializationContext initContext = new
StandardControllerServiceInitializationContext(identifier,
terminationAwareLogger,
serviceProvider, stateManager, kerberosConfig,
nodeTypeProvider);
serviceImpl.initialize(initContext);
diff --git
a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StatelessFlowManager.java
b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StatelessFlowManager.java
index 8010d5cafb..09010dd7df 100644
---
a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StatelessFlowManager.java
+++
b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StatelessFlowManager.java
@@ -139,7 +139,7 @@ public class StatelessFlowManager extends
AbstractFlowManager implements FlowMan
@Override
public RemoteProcessGroup createRemoteProcessGroup(final String id, final
String uris) {
return new StandardRemoteProcessGroup(id, uris, null,
statelessEngine.getProcessScheduler(), statelessEngine.getBulletinRepository(),
sslContext,
- statelessEngine.getStateManagerProvider().getStateManager(id),
TimeUnit.SECONDS.toMillis(30));
+ statelessEngine.getStateManagerProvider().getStateManager(id,
StandardRemoteProcessGroup.class), TimeUnit.SECONDS.toMillis(30));
}
@Override
@@ -179,7 +179,8 @@ public class StatelessFlowManager extends
AbstractFlowManager implements FlowMan
}
try (final NarCloseable ignored =
NarCloseable.withComponentNarLoader(extensionManager,
procNode.getProcessor().getClass(), procNode.getProcessor().getIdentifier())) {
- final StateManager stateManager =
statelessEngine.getStateManagerProvider().getStateManager(id);
+ final Class<?> componentClass = procNode.getProcessor() ==
null ? null : procNode.getProcessor().getClass();
+ final StateManager stateManager =
statelessEngine.getStateManagerProvider().getStateManager(id, componentClass);
final StandardProcessContext processContext = new
StandardProcessContext(procNode, statelessEngine.getControllerServiceProvider(),
stateManager, () -> false, new
StatelessNodeTypeProvider());
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnConfigurationRestored.class,
procNode.getProcessor(), processContext);
diff --git
a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StatelessProcessContextFactory.java
b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StatelessProcessContextFactory.java
index 8d8d1dd1c1..76cd674abd 100644
---
a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StatelessProcessContextFactory.java
+++
b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StatelessProcessContextFactory.java
@@ -39,7 +39,10 @@ public class StatelessProcessContextFactory implements
ProcessContextFactory {
@Override
public ProcessContext createProcessContext(final Connectable connectable) {
- final StateManager stateManager =
stateManagerProvider.getStateManager(connectable.getIdentifier());
+ final Class<?> componentClass = (connectable instanceof ProcessorNode
&& ((ProcessorNode) connectable).getProcessor() != null)
+ ? ((ProcessorNode) connectable).getProcessor().getClass()
+ : null;
+ final StateManager stateManager =
stateManagerProvider.getStateManager(connectable.getIdentifier(),
componentClass);
if (connectable instanceof ProcessorNode) {
final ProcessorNode processor = (ProcessorNode) connectable;
diff --git
a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StatelessReloadComponent.java
b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StatelessReloadComponent.java
index 1228a4a089..60633a2fd4 100644
---
a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StatelessReloadComponent.java
+++
b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StatelessReloadComponent.java
@@ -87,7 +87,8 @@ public class StatelessReloadComponent implements
ReloadComponent {
// call OnRemoved for the existing processor using the previous
instance class loader
try (final NarCloseable ignored =
NarCloseable.withComponentNarLoader(existingInstanceClassLoader)) {
- final StateManager stateManager =
statelessEngine.getStateManagerProvider().getStateManager(id);
+ final Class<?> componentClass = existingNode.getProcessor() ==
null ? null : existingNode.getProcessor().getClass();
+ final StateManager stateManager =
statelessEngine.getStateManagerProvider().getStateManager(id, componentClass);
final StandardProcessContext processContext = new
StandardProcessContext(existingNode,
statelessEngine.getControllerServiceProvider(),
stateManager, () -> false, new StatelessNodeTypeProvider());
@@ -109,8 +110,9 @@ public class StatelessReloadComponent implements
ReloadComponent {
existingNode.refreshProperties();
// Notify the processor node that the configuration (properties, e.g.)
has been restored
+ final Class<?> componentClass = existingNode.getProcessor() == null ?
null : existingNode.getProcessor().getClass();
final StandardProcessContext processContext = new
StandardProcessContext(existingNode,
statelessEngine.getControllerServiceProvider(),
- statelessEngine.getStateManagerProvider().getStateManager(id),
() -> false, new StatelessNodeTypeProvider());
+ statelessEngine.getStateManagerProvider().getStateManager(id,
componentClass), () -> false, new StatelessNodeTypeProvider());
existingNode.onConfigurationRestored(processContext);
logger.debug("Successfully reloaded {}", existingNode);
diff --git
a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/repository/StatelessRepositoryContextFactory.java
b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/repository/StatelessRepositoryContextFactory.java
index 161af4033b..2a3e135d0e 100644
---
a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/repository/StatelessRepositoryContextFactory.java
+++
b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/repository/StatelessRepositoryContextFactory.java
@@ -20,6 +20,7 @@ package org.apache.nifi.stateless.repository;
import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.components.state.StateManagerProvider;
import org.apache.nifi.connectable.Connectable;
+import org.apache.nifi.controller.ProcessorNode;
import org.apache.nifi.controller.repository.ContentRepository;
import org.apache.nifi.controller.repository.CounterRepository;
import org.apache.nifi.controller.repository.FlowFileEventRepository;
@@ -52,7 +53,10 @@ public class StatelessRepositoryContextFactory implements
RepositoryContextFacto
@Override
public RepositoryContext createRepositoryContext(final Connectable
connectable, final ProvenanceEventRepository provenanceEventRepository) {
- final StateManager stateManager =
stateManagerProvider.getStateManager(connectable.getIdentifier());
+ final Class<?> componentClass = (connectable instanceof ProcessorNode
&& ((ProcessorNode) connectable).getProcessor() != null)
+ ? ((ProcessorNode) connectable).getProcessor().getClass()
+ : null;
+ final StateManager stateManager =
stateManagerProvider.getStateManager(connectable.getIdentifier(),
componentClass);
return new StatelessRepositoryContext(connectable, new AtomicLong(0L),
contentRepository, flowFileRepository,
flowFileEventRepository, counterRepository,
provenanceEventRepository, stateManager);
}
diff --git
a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/GenerateFlowFile.java
b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/GenerateFlowFile.java
index 2dc303c56d..6e7adac274 100644
---
a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/GenerateFlowFile.java
+++
b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/GenerateFlowFile.java
@@ -16,6 +16,7 @@
*/
package org.apache.nifi.processors.tests.system;
+import org.apache.nifi.annotation.behavior.Stateful;
import org.apache.nifi.annotation.configuration.DefaultSchedule;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
@@ -51,6 +52,7 @@ import static
org.apache.nifi.processor.util.StandardValidators.NON_EMPTY_VALIDA
import static
org.apache.nifi.processor.util.StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR;
@DefaultSchedule(period = "10 mins")
+@Stateful(scopes = { Scope.LOCAL, Scope.CLUSTER }, description = "Stores the
number of generated FlowFiles", dropStateKeySupported = true)
public class GenerateFlowFile extends AbstractProcessor {
public static final PropertyDescriptor FILE_SIZE = new Builder()
.name("File Size")
diff --git
a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/MultiKeyState.java
b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/MultiKeyState.java
new file mode 100644
index 0000000000..34ea12014d
--- /dev/null
+++
b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/MultiKeyState.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.tests.system;
+
+import org.apache.nifi.annotation.behavior.PrimaryNodeOnly;
+import org.apache.nifi.annotation.behavior.Stateful;
+import org.apache.nifi.annotation.configuration.DefaultSchedule;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+@PrimaryNodeOnly
+@DefaultSchedule(period = "10 mins")
+@Stateful(scopes = Scope.CLUSTER, description = "Stores three counters in
state", dropStateKeySupported = true)
+public class MultiKeyState extends AbstractProcessor {
+ public static final Relationship REL_SUCCESS = new Relationship.Builder()
+ .name("success")
+ .autoTerminateDefault(true)
+ .build();
+
+ @Override
+ public Set<Relationship> getRelationships() {
+ return Set.of(REL_SUCCESS);
+ }
+
+ @Override
+ public void onTrigger(final ProcessContext context, final ProcessSession
session) throws ProcessException {
+ final Map<String, String> currentState;
+ try {
+ currentState = session.getState(Scope.CLUSTER).toMap();
+ } catch (IOException e) {
+ throw new ProcessException(e);
+ }
+
+ final int a = Integer.parseInt(currentState.getOrDefault("a", "0")) +
1;
+ final int b = Integer.parseInt(currentState.getOrDefault("b", "0")) +
1;
+ final int c = Integer.parseInt(currentState.getOrDefault("c", "0")) +
1;
+
+ final Map<String, String> newState = new HashMap<>();
+ newState.put("a", String.valueOf(a));
+ newState.put("b", String.valueOf(b));
+ newState.put("c", String.valueOf(c));
+ try {
+ session.setState(newState, Scope.CLUSTER);
+ } catch (IOException e) {
+ throw new ProcessException(e);
+ }
+
+ FlowFile flowFile = session.create();
+ session.transfer(flowFile, REL_SUCCESS);
+ }
+}
diff --git
a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/MultiKeyStateNotDroppable.java
b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/MultiKeyStateNotDroppable.java
new file mode 100644
index 0000000000..3acd253066
--- /dev/null
+++
b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/MultiKeyStateNotDroppable.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.tests.system;
+
+import org.apache.nifi.annotation.behavior.PrimaryNodeOnly;
+import org.apache.nifi.annotation.behavior.Stateful;
+import org.apache.nifi.annotation.configuration.DefaultSchedule;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+@PrimaryNodeOnly
+@DefaultSchedule(period = "10 mins")
+@Stateful(scopes = Scope.CLUSTER, description = "Stores three counters in
state", dropStateKeySupported = false)
+public class MultiKeyStateNotDroppable extends AbstractProcessor {
+ public static final Relationship REL_SUCCESS = new Relationship.Builder()
+ .name("success")
+ .autoTerminateDefault(true)
+ .build();
+
+ @Override
+ public Set<Relationship> getRelationships() {
+ return Set.of(REL_SUCCESS);
+ }
+
+ @Override
+ public void onTrigger(final ProcessContext context, final ProcessSession
session) throws ProcessException {
+ final Map<String, String> currentState;
+ try {
+ currentState = session.getState(Scope.CLUSTER).toMap();
+ } catch (IOException e) {
+ throw new ProcessException(e);
+ }
+
+ final int a = Integer.parseInt(currentState.getOrDefault("a", "0")) +
1;
+ final int b = Integer.parseInt(currentState.getOrDefault("b", "0")) +
1;
+ final int c = Integer.parseInt(currentState.getOrDefault("c", "0")) +
1;
+
+ final Map<String, String> newState = new HashMap<>();
+ newState.put("a", String.valueOf(a));
+ newState.put("b", String.valueOf(b));
+ newState.put("c", String.valueOf(c));
+ try {
+ session.setState(newState, Scope.CLUSTER);
+ } catch (IOException e) {
+ throw new ProcessException(e);
+ }
+
+ FlowFile flowFile = session.create();
+ session.transfer(flowFile, REL_SUCCESS);
+ }
+}
diff --git
a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index 9df0ad1a4d..9b6ba1cbd5 100644
---
a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++
b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -33,6 +33,8 @@ org.apache.nifi.processors.tests.system.HoldInput
org.apache.nifi.processors.tests.system.IngestFile
org.apache.nifi.processors.tests.system.LoopFlowFile
org.apache.nifi.processors.tests.system.MigrateProperties
+org.apache.nifi.processors.tests.system.MultiKeyState
+org.apache.nifi.processors.tests.system.MultiKeyStateNotDroppable
org.apache.nifi.processors.tests.system.PartitionText
org.apache.nifi.processors.tests.system.PassThrough
org.apache.nifi.processors.tests.system.PassThroughRequiresInstanceClassLoading
diff --git
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/state/AbstractStateKeyDropIT.java
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/state/AbstractStateKeyDropIT.java
new file mode 100644
index 0000000000..78e585202e
--- /dev/null
+++
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/state/AbstractStateKeyDropIT.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.tests.system.state;
+
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.tests.system.NiFiSystemIT;
+import org.apache.nifi.toolkit.client.NiFiClientException;
+import org.apache.nifi.web.api.dto.ComponentStateDTO;
+import org.apache.nifi.web.api.dto.StateEntryDTO;
+import org.apache.nifi.web.api.dto.StateMapDTO;
+import org.apache.nifi.web.api.entity.ComponentStateEntity;
+import org.apache.nifi.web.api.entity.ProcessorEntity;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public abstract class AbstractStateKeyDropIT extends NiFiSystemIT {
+
+ /**
+ * Retrieves the state for a given processor.
+ *
+ * @param processorId the ID of the processor
+ * @param scope the scope of the state to retrieve (LOCAL or CLUSTER)
+ * @return a map containing the cluster state key-value pairs
+ * @throws IOException IO exception
+ * @throws NiFiClientException NiFi Client Exception
+ */
+ protected Map<String, String> getProcessorState(final String processorId,
final Scope scope) throws NiFiClientException, IOException {
+ final ComponentStateDTO componentState =
getNifiClient().getProcessorClient().getProcessorState(processorId).getComponentState();
+ final Map<String, String> state = new HashMap<>();
+
+ switch (scope) {
+ case LOCAL:
+ if (componentState != null && componentState.getLocalState()
!= null && componentState.getLocalState().getState() != null) {
+ componentState.getLocalState().getState().forEach(entry ->
state.put(entry.getKey(), entry.getValue()));
+ }
+ break;
+ case CLUSTER:
+ if (componentState != null && componentState.getClusterState()
!= null && componentState.getClusterState().getState() != null) {
+ componentState.getClusterState().getState().forEach(entry
-> state.put(entry.getKey(), entry.getValue()));
+ }
+ break;
+ }
+
+ return state;
+ }
+
+ /**
+ * Drops the state for a given processor, optionally setting a new state.
+ *
+ * @param processorId the ID of the processor
+ * @param newState a map containing the new state key-value pairs, or
null to
+ * drop the state without setting a new one
+ * @return the response from the NiFi API
+ * @throws IOException IO exception
+ * @throws NiFiClientException NiFi Client Exception
+ */
+ protected ComponentStateEntity dropProcessorState(final String
processorId, final Map<String, String> newState) throws NiFiClientException,
IOException {
+ final ComponentStateEntity entity = new ComponentStateEntity();
+
+ if (newState != null) {
+ final ComponentStateDTO stateDto = new ComponentStateDTO();
+ final StateMapDTO stateMapDTO = new StateMapDTO();
+ final List<StateEntryDTO> entries = new ArrayList<>();
+ newState.forEach((k, v) -> {
+ final StateEntryDTO entry = new StateEntryDTO();
+ entry.setKey(k);
+ entry.setValue(v);
+ entries.add(entry);
+ });
+ stateMapDTO.setState(entries);
+ stateDto.setClusterState(stateMapDTO);
+
+ entity.setComponentState(stateDto);
+ }
+
+ return
getNifiClient().getProcessorClient().clearProcessorState(processorId, entity);
+ }
+
+ /**
+ * Runs a processor once and waits for it to stop.
+ *
+ * @param processor the processor entity to run
+ * @throws NiFiClientException if there is an error with the NiFi client
+ * @throws IOException if there is an IO error
+ * @throws InterruptedException if the thread is interrupted while waiting
+ */
+ protected void runProcessorOnce(final ProcessorEntity processor) throws
NiFiClientException, IOException, InterruptedException {
+ getNifiClient().getProcessorClient().runProcessorOnce(processor);
+ getClientUtil().waitForStoppedProcessor(processor.getId());
+ }
+}
diff --git
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/state/ClusterStateKeyDropIT.java
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/state/ClusterStateKeyDropIT.java
new file mode 100644
index 0000000000..76f09e0346
--- /dev/null
+++
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/state/ClusterStateKeyDropIT.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.tests.system.state;
+
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.tests.system.NiFiInstanceFactory;
+import org.apache.nifi.toolkit.client.NiFiClientException;
+import org.apache.nifi.web.api.entity.ComponentStateEntity;
+import org.apache.nifi.web.api.entity.ProcessorEntity;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class ClusterStateKeyDropIT extends AbstractStateKeyDropIT {
+
+ @Override
+ public NiFiInstanceFactory getInstanceFactory() {
+ return createTwoNodeInstanceFactory();
+ }
+
+ @Test
+ public void testCannotDropStateKeyWithLocalAndClusterState() throws
NiFiClientException, IOException, InterruptedException {
+ final ProcessorEntity generate =
getClientUtil().createProcessor("GenerateFlowFile");
+ getClientUtil().updateProcessorProperties(generate,
Collections.singletonMap("State Scope", "LOCAL"));
+ final ProcessorEntity terminate =
getClientUtil().createProcessor("TerminateFlowFile");
+ getClientUtil().createConnection(generate, terminate, "success");
+
+ runProcessorOnce(generate);
+
+ getClientUtil().updateProcessorProperties(generate,
Collections.singletonMap("State Scope", "CLUSTER"));
+ runProcessorOnce(generate);
+
+ // GenerateFlowFile has both local and cluster state, so dropping
state should
+ // fail
+ assertThrows(NiFiClientException.class, () -> {
+ dropProcessorState(generate.getId(), Collections.emptyMap());
+ });
+ }
+
+ @Test
+ public void testCannotDropStateKeyIfFlagNotTrue() throws
NiFiClientException, IOException, InterruptedException {
+ final ProcessorEntity processor =
getClientUtil().createProcessor("MultiKeyStateNotDroppable");
+ final String processorId = processor.getId();
+
+
assertFalse(getNifiClient().getProcessorClient().getProcessorState(processorId).getComponentState().isDropStateKeySupported());
+
+ runProcessorOnce(processor);
+
+ final Map<String, String> currentState =
getProcessorState(processorId, Scope.CLUSTER);
+ assertEquals(Map.of("a", "1", "b", "1", "c", "1"), currentState);
+
+ // trying to remove key a
+ final Map<String, String> newState = Map.of("b", "1", "c", "1");
+
+ // MultiKeyStateNotDroppable processor has state but has
dropStateKeySupported =
+ // false so it should also fail
+ assertThrows(NiFiClientException.class, () -> {
+ dropProcessorState(processorId, newState);
+ });
+ }
+
+ @Test
+ public void testCannotDropStateKeyWithMismatchedState() throws
NiFiClientException, IOException, InterruptedException {
+ final ProcessorEntity processor =
getClientUtil().createProcessor("MultiKeyState");
+ final String processorId = processor.getId();
+
+
assertTrue(getNifiClient().getProcessorClient().getProcessorState(processorId).getComponentState().isDropStateKeySupported());
+
+ runProcessorOnce(processor);
+
+ final Map<String, String> currentState =
getProcessorState(processorId, Scope.CLUSTER);
+ assertEquals(Map.of("a", "1", "b", "1", "c", "1"), currentState);
+
+ // trying to remove key "a" but with wrong value for "b"
+ assertThrows(NiFiClientException.class, () -> {
+ dropProcessorState(processorId, Map.of("b", "2", "c", "1"));
+ });
+ }
+
+ @Test
+ public void testCannotDropMultipleStateKeys() throws NiFiClientException,
IOException, InterruptedException {
+ final ProcessorEntity processor =
getClientUtil().createProcessor("MultiKeyState");
+ final String processorId = processor.getId();
+ runProcessorOnce(processor);
+
+ final Map<String, String> currentState =
getProcessorState(processorId, Scope.CLUSTER);
+ assertEquals(Map.of("a", "1", "b", "1", "c", "1"), currentState);
+
+ // trying to remove two keys
+ assertThrows(NiFiClientException.class, () -> {
+ dropProcessorState(processorId, Map.of("c", "1"));
+ });
+ }
+
+ @Test
+ public void testCanDropSpecificStateKey() throws NiFiClientException,
IOException, InterruptedException {
+ final ProcessorEntity processor =
getClientUtil().createProcessor("MultiKeyState");
+ final String processorId = processor.getId();
+ runProcessorOnce(processor);
+
+ final Map<String, String> currentState =
getProcessorState(processorId, Scope.CLUSTER);
+ assertEquals(Map.of("a", "1", "b", "1", "c", "1"), currentState);
+
+ // trying to remove key a
+ final Map<String, String> newState = Map.of("b", "1", "c", "1");
+ final ComponentStateEntity response = dropProcessorState(processorId,
newState);
+
+ final Map<String, String> updatedState = new HashMap<>();
+
response.getComponentState().getClusterState().getState().forEach(entry ->
updatedState.put(entry.getKey(), entry.getValue()));
+ assertEquals(newState, updatedState);
+
+ final Map<String, String> state = getProcessorState(processorId,
Scope.CLUSTER);
+ assertEquals(newState, state);
+ }
+
+ @Test
+ public void testClearAllStateWithNullPayload() throws NiFiClientException,
IOException, InterruptedException {
+ final ProcessorEntity processor =
getClientUtil().createProcessor("MultiKeyState");
+ final String processorId = processor.getId();
+ runProcessorOnce(processor);
+
+ final Map<String, String> currentState =
getProcessorState(processorId, Scope.CLUSTER);
+ assertEquals(Map.of("a", "1", "b", "1", "c", "1"), currentState);
+
+ final ComponentStateEntity response = dropProcessorState(processorId,
null);
+
assertTrue(response.getComponentState().getClusterState().getState().isEmpty());
+
+ final Map<String, String> state = getProcessorState(processorId,
Scope.CLUSTER);
+ assertTrue(state.isEmpty());
+ }
+
+ @Test
+ public void testClearAllStateWithEmptyPayload() throws
NiFiClientException, IOException, InterruptedException {
+ final ProcessorEntity processor =
getClientUtil().createProcessor("MultiKeyState");
+ final String processorId = processor.getId();
+ runProcessorOnce(processor);
+
+ final Map<String, String> currentState =
getProcessorState(processorId, Scope.CLUSTER);
+ assertEquals(Map.of("a", "1", "b", "1", "c", "1"), currentState);
+
+ final ComponentStateEntity response = dropProcessorState(processorId,
Map.of());
+
assertTrue(response.getComponentState().getClusterState().getState().isEmpty());
+
+ final Map<String, String> state = getProcessorState(processorId,
Scope.CLUSTER);
+ assertTrue(state.isEmpty());
+ }
+}
\ No newline at end of file
diff --git
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/state/StandaloneStateKeyDropIT.java
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/state/StandaloneStateKeyDropIT.java
new file mode 100644
index 0000000000..04d03d8da8
--- /dev/null
+++
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/state/StandaloneStateKeyDropIT.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.tests.system.state;
+
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.toolkit.client.NiFiClientException;
+import org.apache.nifi.web.api.entity.ComponentStateEntity;
+import org.apache.nifi.web.api.entity.ProcessorEntity;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class StandaloneStateKeyDropIT extends AbstractStateKeyDropIT {
+
+ @Test
+ public void testDeleteStateKeyOnStandalone() throws NiFiClientException,
IOException, InterruptedException {
+ final ProcessorEntity generate =
getClientUtil().createProcessor("GenerateFlowFile");
+ getClientUtil().updateProcessorProperties(generate,
Collections.singletonMap("State Scope", "CLUSTER"));
+ final ProcessorEntity terminate =
getClientUtil().createProcessor("TerminateFlowFile");
+ getClientUtil().createConnection(generate, terminate, "success");
+
+ runProcessorOnce(generate);
+
+ // even if the processor is configured to use CLUSTER scope, it will
still have
+ // a local state because this is a standalone instance
+ final Map<String, String> state = getProcessorState(generate.getId(),
Scope.CLUSTER);
+ assertNull(state.get("count"));
+
+ final Map<String, String> localState =
getProcessorState(generate.getId(), Scope.LOCAL);
+ assertEquals("1", localState.get("count"));
+
+ // drop specific keys with LOCAL state
+ ComponentStateEntity newState = dropProcessorState(generate.getId(),
Collections.emptyMap());
+ assertNull(newState.getComponentState().getClusterState());
+
assertTrue(newState.getComponentState().getLocalState().getState().isEmpty());
+
+ final Map<String, String> after = getProcessorState(generate.getId(),
Scope.LOCAL);
+ assertTrue(after.isEmpty());
+
+ runProcessorOnce(generate);
+
+ // can drop full state
+ dropProcessorState(generate.getId(), null);
+ assertTrue(getProcessorState(generate.getId(), Scope.LOCAL).isEmpty());
+ }
+
+ @Test
+ public void testCannotDropStateKeyIfFlagNotTrue() throws
NiFiClientException, IOException, InterruptedException {
+ final ProcessorEntity multi =
getClientUtil().createProcessor("MultiKeyStateNotDroppable");
+
+
assertFalse(getNifiClient().getProcessorClient().getProcessorState(multi.getId()).getComponentState().isDropStateKeySupported());
+
+ runProcessorOnce(multi);
+
+ final Map<String, String> currentState =
getProcessorState(multi.getId(), Scope.LOCAL);
+ assertEquals(Map.of("a", "1", "b", "1", "c", "1"), currentState);
+
+ // trying to remove key a
+ final Map<String, String> newState = Map.of("b", "1", "c", "1");
+
+ // MultiKeyStateNotDroppable processor has state but has
dropStateKeySupported =
+ // false so it should also fail
+ assertThrows(NiFiClientException.class, () -> {
+ dropProcessorState(multi.getId(), newState);
+ });
+ }
+
+ @Test
+ public void testCannotDropStateKeyWithMismatchedState() throws
NiFiClientException, IOException, InterruptedException {
+ final ProcessorEntity multi =
getClientUtil().createProcessor("MultiKeyState");
+ runProcessorOnce(multi);
+
+ final Map<String, String> currentState =
getProcessorState(multi.getId(), Scope.LOCAL);
+ assertEquals(Map.of("a", "1", "b", "1", "c", "1"), currentState);
+
+ // trying to remove key "a" but with wrong value for "b"
+ assertThrows(NiFiClientException.class, () -> {
+ dropProcessorState(multi.getId(), Map.of("b", "2", "c", "1"));
+ });
+ }
+
+ @Test
+ public void testCannotDropMultipleStateKeys() throws NiFiClientException,
IOException, InterruptedException {
+ final ProcessorEntity multi =
getClientUtil().createProcessor("MultiKeyState");
+ runProcessorOnce(multi);
+
+ final Map<String, String> currentState =
getProcessorState(multi.getId(), Scope.LOCAL);
+ assertEquals(Map.of("a", "1", "b", "1", "c", "1"), currentState);
+
+ // trying to remove two keys
+ assertThrows(NiFiClientException.class, () -> {
+ dropProcessorState(multi.getId(), Map.of("c", "1"));
+ });
+ }
+}
\ No newline at end of file
diff --git
a/nifi-toolkit/nifi-toolkit-client/src/main/java/org/apache/nifi/toolkit/client/ProcessorClient.java
b/nifi-toolkit/nifi-toolkit-client/src/main/java/org/apache/nifi/toolkit/client/ProcessorClient.java
index 173b79128b..247065b313 100644
---
a/nifi-toolkit/nifi-toolkit-client/src/main/java/org/apache/nifi/toolkit/client/ProcessorClient.java
+++
b/nifi-toolkit/nifi-toolkit-client/src/main/java/org/apache/nifi/toolkit/client/ProcessorClient.java
@@ -60,7 +60,13 @@ public interface ProcessorClient {
ProcessorEntity terminateProcessor(String processorId) throws
NiFiClientException, IOException;
- ComponentStateEntity clearProcessorState(String processorId) throws
NiFiClientException, IOException;
+ default ComponentStateEntity clearProcessorState(String processorId)
throws NiFiClientException, IOException {
+ return clearProcessorState(processorId, null);
+ }
+
+ ComponentStateEntity clearProcessorState(String processorId,
ComponentStateEntity componentStateEntity) throws NiFiClientException,
IOException;
+
+ ComponentStateEntity getProcessorState(String processorId) throws
NiFiClientException, IOException;
/**
* Indicates that mutable requests should indicate that the client has
diff --git
a/nifi-toolkit/nifi-toolkit-client/src/main/java/org/apache/nifi/toolkit/client/impl/JerseyProcessorClient.java
b/nifi-toolkit/nifi-toolkit-client/src/main/java/org/apache/nifi/toolkit/client/impl/JerseyProcessorClient.java
index c930b7c992..2b09be0314 100644
---
a/nifi-toolkit/nifi-toolkit-client/src/main/java/org/apache/nifi/toolkit/client/impl/JerseyProcessorClient.java
+++
b/nifi-toolkit/nifi-toolkit-client/src/main/java/org/apache/nifi/toolkit/client/impl/JerseyProcessorClient.java
@@ -287,7 +287,7 @@ public class JerseyProcessorClient extends
AbstractJerseyClient implements Proce
}
@Override
- public ComponentStateEntity clearProcessorState(String processorId) throws
NiFiClientException, IOException {
+ public ComponentStateEntity clearProcessorState(final String processorId,
final ComponentStateEntity componentStateEntity) throws NiFiClientException,
IOException {
Objects.requireNonNull(processorId, "Processor ID required");
return executeAction("Error clearing state of the Processor", () -> {
@@ -295,7 +295,20 @@ public class JerseyProcessorClient extends
AbstractJerseyClient implements Proce
.path("/state/clear-requests")
.resolveTemplate("id", processorId);
- return getRequestBuilder(target).post(null,
ComponentStateEntity.class);
+ return
getRequestBuilder(target).post(Entity.entity(componentStateEntity,
MediaType.APPLICATION_JSON_TYPE), ComponentStateEntity.class);
+ });
+ }
+
+ @Override
+ public ComponentStateEntity getProcessorState(String processorId) throws
NiFiClientException, IOException {
+ Objects.requireNonNull(processorId, "Processor ID required");
+
+ return executeAction("Error getting state of the Processor", () -> {
+ final WebTarget target = processorTarget
+ .path("/state")
+ .resolveTemplate("id", processorId);
+
+ return getRequestBuilder(target).get(ComponentStateEntity.class);
});
}
}