This is an automated email from the ASF dual-hosted git repository.

exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 01c4ec6893 NIFI-14900 Added support for dropStateKeySupported Stateful 
flag (#10238)
01c4ec6893 is described below

commit 01c4ec68930ac11e91f0634a099ce5c3a276c0b8
Author: Pierre Villard <[email protected]>
AuthorDate: Fri Sep 12 06:56:10 2025 +0200

    NIFI-14900 Added support for dropStateKeySupported Stateful flag (#10238)
    
    Signed-off-by: David Handermann <[email protected]>
---
 .../state/provider/StandardStateMap.java           |   2 +-
 .../providers/zookeeper/StandardStateMap.java      |   2 +-
 .../apache/nifi/web/api/dto/ComponentStateDTO.java |  13 ++
 .../controller/state/StandardStateManager.java     |  15 +-
 .../manager/StandardStateManagerProvider.java      |  30 +++-
 .../apache/nifi/groups/StandardProcessGroup.java   |  13 +-
 .../components/state/StateManagerProvider.java     |  41 ++++-
 .../apache/nifi/controller/ExtensionBuilder.java   |  11 +-
 .../org/apache/nifi/controller/FlowController.java |   3 +-
 .../nifi/controller/StandardFlowSnippet.java       |   3 +-
 .../nifi/controller/StandardReloadComponent.java   |   3 +-
 .../nifi/controller/flow/StandardFlowManager.java  |   5 +
 .../StandardFlowAnalysisRuleContext.java           |   3 +-
 .../reporting/StandardReportingContext.java        |   3 +-
 .../scheduling/RepositoryContextFactory.java       |   6 +-
 .../scheduling/StandardProcessScheduler.java       |  18 ++-
 .../nifi/controller/tasks/ConnectableTask.java     |  11 +-
 .../TestStandardControllerServiceProvider.java     |   9 +-
 .../org/apache/nifi/web/NiFiServiceFacade.java     |  24 ++-
 .../apache/nifi/web/StandardNiFiServiceFacade.java |  25 +--
 .../apache/nifi/web/api/ControllerResource.java    |  16 +-
 .../nifi/web/api/ControllerServiceResource.java    |  16 +-
 .../nifi/web/api/ParameterProviderResource.java    |  16 +-
 .../org/apache/nifi/web/api/ProcessorResource.java |  18 ++-
 .../apache/nifi/web/api/ReportingTaskResource.java |  16 +-
 .../org/apache/nifi/web/api/dto/DtoFactory.java    |  17 +++
 .../org/apache/nifi/web/dao/ComponentStateDAO.java |  29 +++-
 .../apache/nifi/web/dao/ControllerServiceDAO.java  |   4 +-
 .../apache/nifi/web/dao/FlowAnalysisRuleDAO.java   |   4 +-
 .../apache/nifi/web/dao/ParameterProviderDAO.java  |   4 +-
 .../java/org/apache/nifi/web/dao/ProcessorDAO.java |   6 +-
 .../org/apache/nifi/web/dao/ReportingTaskDAO.java  |   6 +-
 .../web/dao/impl/StandardComponentStateDAO.java    | 113 ++++++++++++--
 .../web/dao/impl/StandardControllerServiceDAO.java |   9 +-
 .../web/dao/impl/StandardFlowAnalysisRuleDAO.java  |   9 +-
 .../web/dao/impl/StandardParameterProviderDAO.java |   9 +-
 .../nifi/web/dao/impl/StandardProcessorDAO.java    |  10 +-
 .../web/dao/impl/StandardReportingTaskDAO.java     |   9 +-
 .../apache/nifi/audit/TestProcessorAuditor.java    |   2 +-
 .../reporting/StatelessReportingContext.java       |   3 +-
 .../nifi/stateless/engine/ComponentBuilder.java    |   2 +-
 .../stateless/engine/StatelessFlowManager.java     |   5 +-
 .../engine/StatelessProcessContextFactory.java     |   5 +-
 .../stateless/engine/StatelessReloadComponent.java |   6 +-
 .../StatelessRepositoryContextFactory.java         |   6 +-
 .../processors/tests/system/GenerateFlowFile.java  |   2 +
 .../processors/tests/system/MultiKeyState.java     |  75 +++++++++
 .../tests/system/MultiKeyStateNotDroppable.java    |  75 +++++++++
 .../services/org.apache.nifi.processor.Processor   |   2 +
 .../tests/system/state/AbstractStateKeyDropIT.java | 109 +++++++++++++
 .../tests/system/state/ClusterStateKeyDropIT.java  | 169 +++++++++++++++++++++
 .../system/state/StandaloneStateKeyDropIT.java     | 117 ++++++++++++++
 .../nifi/toolkit/client/ProcessorClient.java       |   8 +-
 .../toolkit/client/impl/JerseyProcessorClient.java |  17 ++-
 54 files changed, 1022 insertions(+), 132 deletions(-)

diff --git 
a/nifi-framework-bundle/nifi-framework-extensions/nifi-framework-kubernetes-bundle/nifi-framework-kubernetes-state-provider/src/main/java/org/apache/nifi/kubernetes/state/provider/StandardStateMap.java
 
b/nifi-framework-bundle/nifi-framework-extensions/nifi-framework-kubernetes-bundle/nifi-framework-kubernetes-state-provider/src/main/java/org/apache/nifi/kubernetes/state/provider/StandardStateMap.java
index dae5ccb334..48529620b8 100644
--- 
a/nifi-framework-bundle/nifi-framework-extensions/nifi-framework-kubernetes-bundle/nifi-framework-kubernetes-state-provider/src/main/java/org/apache/nifi/kubernetes/state/provider/StandardStateMap.java
+++ 
b/nifi-framework-bundle/nifi-framework-extensions/nifi-framework-kubernetes-bundle/nifi-framework-kubernetes-state-provider/src/main/java/org/apache/nifi/kubernetes/state/provider/StandardStateMap.java
@@ -30,7 +30,7 @@ class StandardStateMap implements StateMap {
     private final Optional<String> version;
 
     StandardStateMap(final Map<String, String> data, final Optional<String> 
version) {
-        this.data = Collections.unmodifiableMap(data);
+        this.data = Collections.unmodifiableMap(data == null ? 
Collections.emptyMap() : data);
         this.version = version;
     }
 
diff --git 
a/nifi-framework-bundle/nifi-framework-extensions/nifi-framework-zookeeper-bundle/nifi-framework-zookeeper-state-provider/src/main/java/org/apache/nifi/controller/state/providers/zookeeper/StandardStateMap.java
 
b/nifi-framework-bundle/nifi-framework-extensions/nifi-framework-zookeeper-bundle/nifi-framework-zookeeper-state-provider/src/main/java/org/apache/nifi/controller/state/providers/zookeeper/StandardStateMap.java
index 560cb36340..612c4eacc7 100644
--- 
a/nifi-framework-bundle/nifi-framework-extensions/nifi-framework-zookeeper-bundle/nifi-framework-zookeeper-state-provider/src/main/java/org/apache/nifi/controller/state/providers/zookeeper/StandardStateMap.java
+++ 
b/nifi-framework-bundle/nifi-framework-extensions/nifi-framework-zookeeper-bundle/nifi-framework-zookeeper-state-provider/src/main/java/org/apache/nifi/controller/state/providers/zookeeper/StandardStateMap.java
@@ -31,7 +31,7 @@ class StandardStateMap implements StateMap {
     private final Optional<String> version;
 
     StandardStateMap(final Map<String, String> data, final Optional<String> 
version) {
-        this.data = Collections.unmodifiableMap(data);
+        this.data = Collections.unmodifiableMap(data == null ? 
Collections.emptyMap() : data);
         this.version = version;
     }
 
diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ComponentStateDTO.java
 
b/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ComponentStateDTO.java
index 37ba9d13f2..99858d378b 100644
--- 
a/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ComponentStateDTO.java
+++ 
b/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ComponentStateDTO.java
@@ -30,6 +30,7 @@ public class ComponentStateDTO {
     private String stateDescription;
     private StateMapDTO clusterState;
     private StateMapDTO localState;
+    private Boolean dropStateKeySupported;
 
     /**
      * @return The component identifier
@@ -82,4 +83,16 @@ public class ComponentStateDTO {
     public void setLocalState(StateMapDTO localState) {
         this.localState = localState;
     }
+
+    /**
+     * @return Whether dropping state by key is supported for this component.
+     */
+    @Schema(description = "Whether dropping state by key is supported for this 
component. Defaults to false when not specified by the component.")
+    public Boolean isDropStateKeySupported() {
+        return dropStateKeySupported;
+    }
+
+    public void setDropStateKeySupported(final Boolean dropStateKeySupported) {
+        this.dropStateKeySupported = dropStateKeySupported;
+    }
 }
diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/state/StandardStateManager.java
 
b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/state/StandardStateManager.java
index d3cf7c03df..2bd86d3489 100644
--- 
a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/state/StandardStateManager.java
+++ 
b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/state/StandardStateManager.java
@@ -24,21 +24,29 @@ import org.apache.nifi.components.state.StateProvider;
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.logging.LogRepository;
 import org.apache.nifi.logging.LogRepositoryFactory;
-import org.apache.nifi.processor.SimpleProcessLogger;
 import org.apache.nifi.logging.StandardLoggingContext;
+import org.apache.nifi.processor.SimpleProcessLogger;
 
 import java.io.IOException;
 import java.util.Map;
+import java.util.function.Supplier;
 
 public class StandardStateManager implements StateManager {
     private final StateProvider localProvider;
     private final StateProvider clusterProvider;
     private final String componentId;
+    private final Supplier<Boolean> dropStateKeySupportedSupplier;
 
     public StandardStateManager(final StateProvider localProvider, final 
StateProvider clusterProvider, final String componentId) {
+        this(localProvider, clusterProvider, componentId, () -> false);
+    }
+
+    public StandardStateManager(final StateProvider localProvider, final 
StateProvider clusterProvider,
+                                final String componentId, final 
Supplier<Boolean> dropStateKeySupportedSupplier) {
         this.localProvider = localProvider;
         this.clusterProvider = clusterProvider;
         this.componentId = componentId;
+        this.dropStateKeySupportedSupplier = dropStateKeySupportedSupplier;
     }
 
     private StateProvider getProvider(final Scope scope) {
@@ -86,6 +94,11 @@ public class StandardStateManager implements StateManager {
         getProvider(scope).clear(componentId);
     }
 
+    @Override
+    public boolean isStateKeyDropSupported() {
+        return dropStateKeySupportedSupplier.get();
+    }
+
     @Override
     public String toString() {
         return "StandardStateManager[componentId=" + componentId + "]";
diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/state/manager/StandardStateManagerProvider.java
 
b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/state/manager/StandardStateManagerProvider.java
index 6ddfd443d3..c55c761c36 100644
--- 
a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/state/manager/StandardStateManagerProvider.java
+++ 
b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/state/manager/StandardStateManagerProvider.java
@@ -30,6 +30,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicBoolean;
 import javax.net.ssl.SSLContext;
 import org.apache.commons.lang3.ArrayUtils;
 import org.apache.nifi.attribute.expression.language.Query;
@@ -81,6 +82,7 @@ public class StandardStateManagerProvider implements 
StateManagerProvider {
     private final StateProvider localStateProvider;
     private final StateProvider clusterStateProvider;
     private final StateProvider previousClusterStateProvider;
+    private final ConcurrentMap<String, AtomicBoolean> dropStateKeySupported = 
new ConcurrentHashMap<>();
 
     public StandardStateManagerProvider(final StateProvider 
localStateProvider, final StateProvider clusterStateProvider) {
         this.localStateProvider = localStateProvider;
@@ -554,6 +556,23 @@ public class StandardStateManagerProvider implements 
StateManagerProvider {
      *
      * @return the StateManager that can be used by the component with the 
given ID, or <code>null</code> if none exists
      */
+    @Override
+    public synchronized StateManager getStateManager(final String componentId, 
final boolean dropSupportedFlag) {
+        final AtomicBoolean dropSupported = 
dropStateKeySupported.computeIfAbsent(componentId, id -> new 
AtomicBoolean(false));
+        if (dropSupportedFlag) {
+            dropSupported.set(true);
+        }
+
+        StateManager stateManager = stateManagers.get(componentId);
+        if (stateManager != null) {
+            return stateManager;
+        }
+
+        stateManager = new StandardStateManager(localStateProvider, 
clusterStateProvider, componentId, dropSupported::get);
+        stateManagers.put(componentId, stateManager);
+        return stateManager;
+    }
+
     @Override
     public synchronized StateManager getStateManager(final String componentId) 
{
         StateManager stateManager = stateManagers.get(componentId);
@@ -561,7 +580,8 @@ public class StandardStateManagerProvider implements 
StateManagerProvider {
             return stateManager;
         }
 
-        stateManager = new StandardStateManager(localStateProvider, 
clusterStateProvider, componentId);
+        final AtomicBoolean dropSupported = 
dropStateKeySupported.computeIfAbsent(componentId, id -> new 
AtomicBoolean(false));
+        stateManager = new StandardStateManager(localStateProvider, 
clusterStateProvider, componentId, dropSupported::get);
         stateManagers.put(componentId, stateManager);
         return stateManager;
     }
@@ -623,5 +643,13 @@ public class StandardStateManagerProvider implements 
StateManagerProvider {
                 logger.warn("Component with ID {} was removed from NiFi 
instance but failed to cleanup resources used to maintain its clustered state", 
componentId, e);
             }
         }
+
+        dropStateKeySupported.remove(componentId);
     }
+
+    @Override
+    public boolean isClusterProviderEnabled() {
+        return nifiProperties.isClustered() && clusterStateProvider != null && 
clusterStateProvider.isEnabled();
+    }
+
 }
diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
 
b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
index 54436ada0d..333bb093c9 100644
--- 
a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
+++ 
b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
@@ -656,15 +656,16 @@ public final class StandardProcessGroup implements 
ProcessGroup {
         return this.scheduler.getActiveThreadCount(statelessGroupNode) > 0;
     }
 
-    private StateManager getStateManager(final String componentId) {
-        return stateManagerProvider.getStateManager(componentId);
+    private StateManager getStateManager(final ProcessorNode processorNode) {
+        final Class<?> componentClass = processorNode.getProcessor() == null ? 
null : processorNode.getProcessor().getClass();
+        return 
stateManagerProvider.getStateManager(processorNode.getIdentifier(), 
componentClass);
     }
 
     private void shutdown(final ProcessGroup procGroup) {
         for (final ProcessorNode node : procGroup.getProcessors()) {
             try (final NarCloseable ignored = 
NarCloseable.withComponentNarLoader(extensionManager, 
node.getProcessor().getClass(), node.getIdentifier())) {
                 final StandardProcessContext processContext = new 
StandardProcessContext(node, controllerServiceProvider,
-                    getStateManager(node.getIdentifier()), () -> false, 
nodeTypeProvider);
+                    getStateManager(node), () -> false, nodeTypeProvider);
                 
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnShutdown.class, 
node.getProcessor(), processContext);
             }
         }
@@ -1230,7 +1231,7 @@ public final class StandardProcessGroup implements 
ProcessGroup {
 
             try (final NarCloseable ignored = 
NarCloseable.withComponentNarLoader(extensionManager, 
processor.getProcessor().getClass(), processor.getIdentifier())) {
                 final StandardProcessContext processContext = new 
StandardProcessContext(processor, controllerServiceProvider,
-                    getStateManager(processor.getIdentifier()), () -> false, 
nodeTypeProvider);
+                    getStateManager(processor), () -> false, nodeTypeProvider);
                 
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnRemoved.class, 
processor.getProcessor(), processContext);
             } catch (final Exception e) {
                 throw new ComponentLifeCycleException("Failed to invoke 
'OnRemoved' methods of processor with id " + processor.getIdentifier(), e);
@@ -3866,8 +3867,10 @@ public final class StandardProcessGroup implements 
ProcessGroup {
     }
 
     private ProcessContext createProcessContext(final ProcessorNode 
processorNode) {
+        final org.apache.nifi.processor.Processor processor = 
processorNode.getProcessor();
+        final Class<?> componentClass = processor == null ? null : 
processor.getClass();
         return new StandardProcessContext(processorNode, 
controllerServiceProvider,
-            
stateManagerProvider.getStateManager(processorNode.getIdentifier()), () -> 
false, nodeTypeProvider);
+            
stateManagerProvider.getStateManager(processorNode.getIdentifier(), 
componentClass), () -> false, nodeTypeProvider);
     }
 
     private ConfigurationContext createConfigurationContext(final 
ComponentNode component) {
diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/state/StateManagerProvider.java
 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/state/StateManagerProvider.java
index 0b1ab8d52e..5a8808f890 100644
--- 
a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/state/StateManagerProvider.java
+++ 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/state/StateManagerProvider.java
@@ -17,6 +17,8 @@
 
 package org.apache.nifi.components.state;
 
+import org.apache.nifi.annotation.behavior.Stateful;
+
 /**
  * <p>
  * Interface that provides a mechanism for obtaining the {@link StateManager} 
for a particular component
@@ -32,7 +34,37 @@ public interface StateManagerProvider {
      * @return the StateManager for the component with the given ID, or 
<code>null</code> if no State Manager
      *         exists for the component with the given ID
      */
-    StateManager getStateManager(String componentId);
+    default StateManager getStateManager(String componentId) {
+        return getStateManager(componentId, false);
+    }
+
+    /**
+     * Returns the StateManager for the component with the given ID with the 
capability of dropping individual
+     * state keys if supported
+     *
+     * @param componentId the id of the component for which the StateManager 
should be returned
+     * @param dropStateKeySupported whether the component supports dropping 
specific state keys
+     * @return the StateManager for the component with the given ID, or 
<code>null</code> if no State Manager
+     *         exists for the component with the given ID
+     */
+    StateManager getStateManager(String componentId, boolean 
dropStateKeySupported);
+
+    /**
+     * Returns the StateManager for the given component identifier, using the 
provided component class to
+     * determine whether dropping individual state keys is supported based on 
the {@link Stateful} annotation.
+     *
+     * @param componentId the id of the component for which the StateManager 
should be returned
+     * @param componentClass the component class if known; may be null
+     * @return the StateManager for the component with the given ID, or null 
if none exists
+     */
+    default StateManager getStateManager(final String componentId, final 
Class<?> componentClass) {
+        boolean dropSupported = false;
+        if (componentClass != null) {
+            final Stateful stateful = 
componentClass.getAnnotation(Stateful.class);
+            dropSupported = stateful != null && 
stateful.dropStateKeySupported();
+        }
+        return getStateManager(componentId, dropSupported);
+    }
 
     /**
      * Notifies the State Manager Provider that the component with the given 
ID has been removed from the NiFi instance
@@ -57,4 +89,11 @@ public interface StateManagerProvider {
      * state, even when components request a clustered provider
      */
     void disableClusterProvider();
+
+    /**
+     * Returns whether the Cluster State Provider is enabled.
+     *
+     * @return true if the Cluster State Provider is enabled, false otherwise
+     */
+    boolean isClusterProviderEnabled();
 }
diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/ExtensionBuilder.java
 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/ExtensionBuilder.java
index 4f2d5050b0..6157486cfd 100644
--- 
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/ExtensionBuilder.java
+++ 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/ExtensionBuilder.java
@@ -514,6 +514,7 @@ public class ExtensionBuilder {
        final ValidationContextFactory validationContextFactory = 
createValidationContextFactory(serviceProvider);
        final ReportingTaskNode taskNode;
        if (creationSuccessful) {
+           stateManagerProvider.getStateManager(identifier, 
reportingTask.getComponent().getClass());
            taskNode = new StandardReportingTaskNode(reportingTask, identifier, 
flowController, processScheduler,
                    validationContextFactory, reloadComponent, 
extensionManager, validationTrigger);
            
taskNode.setName(taskNode.getReportingTask().getClass().getSimpleName());
@@ -534,9 +535,10 @@ public class ExtensionBuilder {
    }
 
    private ParameterProviderNode createParameterProviderNode(final 
LoggableComponent<ParameterProvider> parameterProvider, final boolean 
creationSuccessful) {
-       final ValidationContextFactory validationContextFactory = new 
StandardValidationContextFactory(serviceProvider, ruleViolationsManager, 
flowAnalyzer);
+       final ValidationContextFactory validationContextFactory = 
createValidationContextFactory(serviceProvider);
        final ParameterProviderNode parameterProviderNode;
        if (creationSuccessful) {
+           stateManagerProvider.getStateManager(identifier, 
parameterProvider.getComponent().getClass());
            parameterProviderNode = new 
StandardParameterProviderNode(parameterProvider, identifier, flowController,
                    flowController.getControllerServiceProvider(), 
validationContextFactory, reloadComponent, extensionManager,
                    validationTrigger);
@@ -555,7 +557,7 @@ public class ExtensionBuilder {
    }
 
    private FlowRegistryClientNode createFlowRegistryClientNode(final 
LoggableComponent<FlowRegistryClient> client, final boolean creationSuccessful) 
{
-       final ValidationContextFactory validationContextFactory = new 
StandardValidationContextFactory(serviceProvider, ruleViolationsManager, 
flowAnalyzer);
+       final ValidationContextFactory validationContextFactory = 
createValidationContextFactory(serviceProvider);
        final StandardFlowRegistryClientNode clientNode;
 
        if (creationSuccessful) {
@@ -668,7 +670,7 @@ public class ExtensionBuilder {
            final ComponentLog serviceLogger = new 
SimpleProcessLogger(identifier, serviceImpl, new StandardLoggingContext(null));
            final TerminationAwareLogger terminationAwareLogger = new 
TerminationAwareLogger(serviceLogger);
 
-           final StateManager stateManager = 
stateManagerProvider.getStateManager(identifier);
+           final StateManager stateManager = 
stateManagerProvider.getStateManager(identifier, serviceImpl.getClass());
            final ControllerServiceInitializationContext initContext = new 
StandardControllerServiceInitializationContext(identifier, 
terminationAwareLogger,
                    serviceProvider, stateManager, kerberosConfig, 
nodeTypeProvider);
            serviceImpl.initialize(initContext);
@@ -834,6 +836,7 @@ public class ExtensionBuilder {
        final ValidationContextFactory validationContextFactory = 
createValidationContextFactory(serviceProvider);
        final FlowAnalysisRuleNode ruleNode;
        if (creationSuccessful) {
+           stateManagerProvider.getStateManager(identifier, 
flowAnalysisRule.getComponent().getClass());
            ruleNode = new StandardFlowAnalysisRuleNode(flowAnalysisRule, 
identifier, flowController,
                validationContextFactory, ruleViolationsManager, 
reloadComponent, extensionManager, validationTrigger);
            
ruleNode.setName(ruleNode.getFlowAnalysisRule().getClass().getSimpleName());
@@ -941,4 +944,4 @@ public class ExtensionBuilder {
            }
        }
    }
-}
\ No newline at end of file
+}
diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index cc50486c09..8607578257 100644
--- 
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -1104,8 +1104,9 @@ public class FlowController implements 
ReportingTaskProvider, FlowAnalysisRulePr
         for (final ProcessorNode procNode : 
flowManager.getRootGroup().findAllProcessors()) {
             final Processor processor = procNode.getProcessor();
             try (final NarCloseable ignored = 
NarCloseable.withComponentNarLoader(extensionManager, processor.getClass(), 
processor.getIdentifier())) {
+                final Class<?> componentClass = processor == null ? null : 
processor.getClass();
                 final StandardProcessContext processContext = new 
StandardProcessContext(procNode, controllerServiceProvider,
-                        
getStateManagerProvider().getStateManager(processor.getIdentifier()), () -> 
false, this);
+                        
getStateManagerProvider().getStateManager(processor.getIdentifier(), 
componentClass), () -> false, this);
                 
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnConfigurationRestored.class,
 processor, processContext);
             }
         }
diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSnippet.java
 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSnippet.java
index 2c0a82da56..fff602fd68 100644
--- 
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSnippet.java
+++ 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSnippet.java
@@ -426,8 +426,9 @@ public class StandardFlowSnippet implements FlowSnippet {
                 }
 
                 // Notify the processor node that the configuration 
(properties, e.g.) has been restored
+                final Class<?> componentClass = procNode.getProcessor() == 
null ? null : procNode.getProcessor().getClass();
                 final StandardProcessContext processContext = new 
StandardProcessContext(procNode, flowController.getControllerServiceProvider(),
-                        
flowController.getStateManagerProvider().getStateManager(procNode.getProcessor().getIdentifier()),
 () -> false, flowController);
+                        
flowController.getStateManagerProvider().getStateManager(procNode.getProcessor().getIdentifier(),
 componentClass), () -> false, flowController);
                 procNode.onConfigurationRestored(processContext);
             } finally {
                 procNode.resumeValidationTrigger();
diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardReloadComponent.java
 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardReloadComponent.java
index 5ed1829232..cec3a9ade2 100644
--- 
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardReloadComponent.java
+++ 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardReloadComponent.java
@@ -76,7 +76,8 @@ public class StandardReloadComponent implements 
ReloadComponent {
         // save the instance class loader to use it for calling OnRemoved on 
the existing processor
         final ClassLoader existingInstanceClassLoader = 
extensionManager.getInstanceClassLoader(id);
 
-        final StateManager stateManager = 
flowController.getStateManagerProvider().getStateManager(id);
+        final Class<?> componentClass = existingNode.getProcessor() == null ? 
null : existingNode.getProcessor().getClass();
+        final StateManager stateManager = 
flowController.getStateManagerProvider().getStateManager(id, componentClass);
         final StandardProcessContext processContext = new 
StandardProcessContext(existingNode, 
flowController.getControllerServiceProvider(),
             stateManager, () -> false, flowController);
 
diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/flow/StandardFlowManager.java
 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/flow/StandardFlowManager.java
index bdbce0d905..d592b42e0a 100644
--- 
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/flow/StandardFlowManager.java
+++ 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/flow/StandardFlowManager.java
@@ -91,6 +91,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.net.ssl.SSLContext;
+
 import java.lang.reflect.InvocationTargetException;
 import java.net.URL;
 import java.util.ArrayList;
@@ -360,6 +361,7 @@ public class StandardFlowManager extends 
AbstractFlowManager implements FlowMana
             .reloadComponent(flowController.getReloadComponent())
             .addClasspathUrls(additionalUrls)
             
.kerberosConfig(flowController.createKerberosConfig(nifiProperties))
+            .stateManagerProvider(flowController.getStateManagerProvider())
             .extensionManager(extensionManager)
             .flowAnalyzer(getFlowAnalyzer().orElse(null))
             .ruleViolationsManager(getRuleViolationsManager().orElse(null))
@@ -498,6 +500,7 @@ public class StandardFlowManager extends 
AbstractFlowManager implements FlowMana
             .reloadComponent(flowController.getReloadComponent())
             .addClasspathUrls(additionalUrls)
             
.kerberosConfig(flowController.createKerberosConfig(nifiProperties))
+            .stateManagerProvider(flowController.getStateManagerProvider())
             .flowController(flowController)
             .extensionManager(extensionManager)
             .classloaderIsolationKey(classloaderIsolationKey)
@@ -561,6 +564,7 @@ public class StandardFlowManager extends 
AbstractFlowManager implements FlowMana
             .reloadComponent(flowController.getReloadComponent())
             .addClasspathUrls(additionalUrls)
             
.kerberosConfig(flowController.createKerberosConfig(nifiProperties))
+            .stateManagerProvider(flowController.getStateManagerProvider())
             .flowController(flowController)
             .extensionManager(extensionManager)
             .flowAnalyzer(getFlowAnalyzer().orElse(null))
@@ -619,6 +623,7 @@ public class StandardFlowManager extends 
AbstractFlowManager implements FlowMana
                 .reloadComponent(flowController.getReloadComponent())
                 .addClasspathUrls(additionalUrls)
                 
.kerberosConfig(flowController.createKerberosConfig(nifiProperties))
+                .stateManagerProvider(flowController.getStateManagerProvider())
                 .flowController(flowController)
                 .extensionManager(extensionManager)
                 .buildParameterProvider();
diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/flowanalysis/StandardFlowAnalysisRuleContext.java
 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/flowanalysis/StandardFlowAnalysisRuleContext.java
index a78920b7b3..cd4e037854 100644
--- 
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/flowanalysis/StandardFlowAnalysisRuleContext.java
+++ 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/flowanalysis/StandardFlowAnalysisRuleContext.java
@@ -50,7 +50,8 @@ public class StandardFlowAnalysisRuleContext extends 
AbstractFlowAnalysisRuleCon
 
     @Override
     public StateManager getStateManager() {
-        return 
flowController.getStateManagerProvider().getStateManager(getFlowAnalysisRule().getIdentifier());
+        final Class<?> componentClass = getFlowAnalysisRule() == null ? null : 
getFlowAnalysisRule().getClass();
+        return 
flowController.getStateManagerProvider().getStateManager(getFlowAnalysisRule().getIdentifier(),
 componentClass);
     }
 
     @Override
diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingContext.java
 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingContext.java
index 645b5ee289..a20715a65a 100644
--- 
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingContext.java
+++ 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingContext.java
@@ -51,7 +51,8 @@ public class StandardReportingContext extends 
AbstractReportingContext implement
 
     @Override
     public StateManager getStateManager() {
-        return 
flowController.getStateManagerProvider().getStateManager(getReportingTask().getIdentifier());
+        final Class<?> componentClass = getReportingTask() == null ? null : 
getReportingTask().getClass();
+        return 
flowController.getStateManagerProvider().getStateManager(getReportingTask().getIdentifier(),
 componentClass);
     }
 
     @Override
diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/RepositoryContextFactory.java
 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/RepositoryContextFactory.java
index e2594fc905..a640ca72e5 100644
--- 
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/RepositoryContextFactory.java
+++ 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/RepositoryContextFactory.java
@@ -19,6 +19,7 @@ package org.apache.nifi.controller.scheduling;
 import org.apache.nifi.components.state.StateManager;
 import org.apache.nifi.components.state.StateManagerProvider;
 import org.apache.nifi.connectable.Connectable;
+import org.apache.nifi.controller.ProcessorNode;
 import org.apache.nifi.controller.repository.ContentRepository;
 import org.apache.nifi.controller.repository.CounterRepository;
 import org.apache.nifi.controller.repository.FlowFileEventRepository;
@@ -54,7 +55,10 @@ public class RepositoryContextFactory {
     }
 
     public RepositoryContext newProcessContext(final Connectable connectable, 
final AtomicLong connectionIndex) {
-        final StateManager stateManager = 
stateManagerProvider.getStateManager(connectable.getIdentifier());
+        final Class<?> componentClass = (connectable instanceof ProcessorNode 
&& ((ProcessorNode) connectable).getProcessor() != null)
+                ? ((ProcessorNode) connectable).getProcessor().getClass()
+                : null;
+        final StateManager stateManager = 
stateManagerProvider.getStateManager(connectable.getIdentifier(), 
componentClass);
         return new StandardRepositoryContext(connectable, connectionIndex, 
contentRepo, flowFileRepo, flowFileEventRepo, counterRepo, provenanceRepo, 
stateManager, maxAppendableClaimBytes);
     }
 
diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
index f7f156c350..789ca85a96 100644
--- 
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
+++ 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
@@ -126,6 +126,15 @@ public final class StandardProcessScheduler implements 
ProcessScheduler {
         return stateManagerProvider.getStateManager(componentId);
     }
 
+    private StateManager getStateManager(final String componentId, final 
Class<?> componentClass) {
+        return stateManagerProvider.getStateManager(componentId, 
componentClass);
+    }
+
+    private StateManager getStateManager(final ProcessorNode procNode) {
+        final Class<?> componentClass = procNode.getProcessor() == null ? null 
: procNode.getProcessor().getClass();
+        return getStateManager(procNode.getIdentifier(), componentClass);
+    }
+
     public void scheduleFrameworkTask(final Runnable task, final String 
taskName, final long initialDelay, final long delay, final TimeUnit timeUnit) {
         Thread.ofVirtual()
             .name(taskName)
@@ -382,7 +391,7 @@ public final class StandardProcessScheduler implements 
ProcessScheduler {
         final LifecycleState lifecycleState = 
getLifecycleState(requireNonNull(procNode), true, false);
 
         final Supplier<ProcessContext> processContextFactory = () -> new 
StandardProcessContext(procNode, getControllerServiceProvider(),
-            getStateManager(procNode.getIdentifier()), 
lifecycleState::isTerminated, flowController);
+            getStateManager(procNode), lifecycleState::isTerminated, 
flowController);
 
         final boolean scheduleActions = 
procNode.getProcessGroup().resolveExecutionEngine() != 
ExecutionEngine.STATELESS;
 
@@ -515,7 +524,7 @@ public final class StandardProcessScheduler implements 
ProcessScheduler {
         final LifecycleState lifecycleState = 
getLifecycleState(requireNonNull(procNode), true, false);
 
         final Supplier<ProcessContext> processContextFactory = () -> new 
StandardProcessContext(procNode, getControllerServiceProvider(),
-            getStateManager(procNode.getIdentifier()), 
lifecycleState::isTerminated, flowController);
+            getStateManager(procNode), lifecycleState::isTerminated, 
flowController);
 
         final CompletableFuture<Void> future = new CompletableFuture<>();
         final SchedulingAgentCallback callback = new SchedulingAgentCallback() 
{
@@ -556,7 +565,7 @@ public final class StandardProcessScheduler implements 
ProcessScheduler {
         final LifecycleState lifecycleState = getLifecycleState(procNode, 
false, false);
 
         final StandardProcessContext processContext = new 
StandardProcessContext(procNode, getControllerServiceProvider(),
-            getStateManager(procNode.getIdentifier()), 
lifecycleState::isTerminated, flowController);
+            getStateManager(procNode), lifecycleState::isTerminated, 
flowController);
 
         LOG.info("Stopping {}", procNode);
         return procNode.stop(this, this.componentLifeCycleThreadPool, 
processContext, getSchedulingAgent(procNode), lifecycleState, lifecycleMethods);
@@ -732,7 +741,8 @@ public final class StandardProcessScheduler implements 
ProcessScheduler {
         getSchedulingAgent(connectable).unschedule(connectable, state);
 
         if (!state.isScheduled() && state.getActiveThreadCount() == 0 && 
state.mustCallOnStoppedMethods()) {
-            final ConnectableProcessContext processContext = new 
ConnectableProcessContext(connectable, 
getStateManager(connectable.getIdentifier()));
+            final StateManager stateManager = (connectable instanceof 
ProcessorNode) ? getStateManager((ProcessorNode) connectable) : 
getStateManager(connectable.getIdentifier());
+            final ConnectableProcessContext processContext = new 
ConnectableProcessContext(connectable, stateManager);
             try (final NarCloseable ignored = 
NarCloseable.withComponentNarLoader(flowController.getExtensionManager(), 
connectable.getClass(), connectable.getIdentifier())) {
                 
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnStopped.class, 
connectable, processContext);
             }
diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ConnectableTask.java
 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ConnectableTask.java
index 2453415ec1..434c22a2cb 100644
--- 
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ConnectableTask.java
+++ 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ConnectableTask.java
@@ -44,6 +44,7 @@ import org.apache.nifi.logging.StandardLoggingContext;
 import org.apache.nifi.nar.NarCloseable;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSessionFactory;
+import org.apache.nifi.processor.Processor;
 import org.apache.nifi.processor.SimpleProcessLogger;
 import org.apache.nifi.processor.StandardProcessContext;
 import org.apache.nifi.processor.exception.ProcessException;
@@ -82,7 +83,15 @@ public class ConnectableTask {
         this.numRelationships = connectable.getRelationships().size();
         this.flowController = flowController;
 
-        final StateManager stateManager = new 
TaskTerminationAwareStateManager(flowController.getStateManagerProvider().getStateManager(connectable.getIdentifier()),
 lifecycleState::isTerminated);
+        final StateManager baseStateManager;
+        if (connectable instanceof ProcessorNode processorNode) {
+            final Processor processor = processorNode.getProcessor();
+            final Class<?> componentClass = processor == null ? null : 
processor.getClass();
+            baseStateManager = 
flowController.getStateManagerProvider().getStateManager(connectable.getIdentifier(),
 componentClass);
+        } else {
+            baseStateManager = 
flowController.getStateManagerProvider().getStateManager(connectable.getIdentifier());
+        }
+        final StateManager stateManager = new 
TaskTerminationAwareStateManager(baseStateManager, 
lifecycleState::isTerminated);
         if (connectable instanceof ProcessorNode) {
             processContext = new StandardProcessContext(
                     (ProcessorNode) connectable, 
flowController.getControllerServiceProvider(), stateManager, 
lifecycleState::isTerminated, flowController);
diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java
 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java
index c284c70d97..8e5b6497a9 100644
--- 
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java
+++ 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java
@@ -55,8 +55,8 @@ import org.apache.nifi.util.MockProcessContext;
 import org.apache.nifi.util.MockProcessorInitializationContext;
 import org.apache.nifi.util.NiFiProperties;
 import org.apache.nifi.util.SynchronousValidationTrigger;
-import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
 import org.mockito.Mockito;
@@ -84,7 +84,7 @@ public class TestStandardControllerServiceProvider {
 
     private static StateManagerProvider stateManagerProvider = new 
StateManagerProvider() {
         @Override
-        public StateManager getStateManager(final String componentId) {
+        public StateManager getStateManager(final String componentId, final 
boolean dropStateKeySupported) {
             final StateManager stateManager = Mockito.mock(StateManager.class);
             final StateMap emptyStateMap = new 
StandardStateMap(Collections.emptyMap(), Optional.empty());
             try {
@@ -111,6 +111,11 @@ public class TestStandardControllerServiceProvider {
         @Override
         public void onComponentRemoved(final String componentId) {
         }
+
+        @Override
+        public boolean isClusterProviderEnabled() {
+            return false;
+        }
     };
 
     private static NiFiProperties niFiProperties;
diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
 
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
index 480e7383f1..9218dba029 100644
--- 
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
+++ 
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
@@ -1872,9 +1872,11 @@ public interface NiFiServiceFacade {
     /**
      * Clears the state for the specified processor.
      *
-     * @param processorId the processor id
+     * @param processorId       processor id
+     * @param componentStateDTO state of the processor
+     * @return the cleared component state
      */
-    void clearProcessorState(String processorId);
+    ComponentStateDTO clearProcessorState(final String processorId, final 
ComponentStateDTO componentStateDTO);
 
     /**
      * Gets the state for the specified controller service.
@@ -1895,8 +1897,10 @@ public interface NiFiServiceFacade {
      * Clears the state for the specified controller service.
      *
      * @param controllerServiceId the controller service id
+     * @param componentStateDTO   state of the controller service
+     * @return the cleared component state
      */
-    void clearControllerServiceState(String controllerServiceId);
+    ComponentStateDTO clearControllerServiceState(String controllerServiceId, 
final ComponentStateDTO componentStateDTO);
 
     /**
      * Gets the state for the specified reporting task.
@@ -1916,9 +1920,11 @@ public interface NiFiServiceFacade {
     /**
      * Clears the state for the specified reporting task.
      *
-     * @param reportingTaskId the reporting task id
+     * @param reportingTaskId   the reporting task id
+     * @param componentStateDTO the component state of the reporting task
+     * @return the cleared component state
      */
-    void clearReportingTaskState(String reportingTaskId);
+    ComponentStateDTO clearReportingTaskState(String reportingTaskId, final 
ComponentStateDTO componentStateDTO);
 
     /**
      * Gets the state for the specified parameter provider.
@@ -1939,8 +1945,10 @@ public interface NiFiServiceFacade {
      * Clears the state for the specified parameter provider.
      *
      * @param parameterProviderId the parameter provider id
+     * @param componentStateDTO   the component state of the parameter provider
+     * @return the cleared component state
      */
-    void clearParameterProviderState(String parameterProviderId);
+    ComponentStateDTO clearParameterProviderState(String parameterProviderId, 
final ComponentStateDTO componentStateDTO);
 
     /**
      * Gets the state for the specified RemoteProcessGroup.
@@ -2937,8 +2945,10 @@ public interface NiFiServiceFacade {
      * Clears the state for the flow analysis rule with the specified id.
      *
      * @param flowAnalysisRuleId the flow analysis rule id
+     * @param componentStateDTO  the state of the flow analysis rule
+     * @return the cleared component state
      */
-    void clearFlowAnalysisRuleState(String flowAnalysisRuleId);
+    ComponentStateDTO clearFlowAnalysisRuleState(String flowAnalysisRuleId, 
final ComponentStateDTO componentStateDTO);
 
     /**
      * Updates the specified flow analysis rule.
diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
 
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
index 16f83e1f9b..7129508775 100644
--- 
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
+++ 
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
@@ -875,8 +875,9 @@ public class StandardNiFiServiceFacade implements 
NiFiServiceFacade {
     }
 
     @Override
-    public void clearFlowAnalysisRuleState(String flowAnalysisRuleId) {
-        flowAnalysisRuleDAO.clearState(flowAnalysisRuleId);
+    public ComponentStateDTO clearFlowAnalysisRuleState(final String 
flowAnalysisRuleId, final ComponentStateDTO componentStateDTO) {
+        flowAnalysisRuleDAO.clearState(flowAnalysisRuleId, componentStateDTO);
+        return this.getFlowAnalysisRuleState(flowAnalysisRuleId);
     }
 
     @Override
@@ -2003,8 +2004,9 @@ public class StandardNiFiServiceFacade implements 
NiFiServiceFacade {
     }
 
     @Override
-    public void clearProcessorState(final String processorId) {
-        processorDAO.clearState(processorId);
+    public ComponentStateDTO clearProcessorState(final String processorId, 
final ComponentStateDTO componentStateDTO) {
+        processorDAO.clearState(processorId, componentStateDTO);
+        return this.getProcessorState(processorId);
     }
 
     @Override
@@ -2013,8 +2015,9 @@ public class StandardNiFiServiceFacade implements 
NiFiServiceFacade {
     }
 
     @Override
-    public void clearControllerServiceState(final String controllerServiceId) {
-        controllerServiceDAO.clearState(controllerServiceId);
+    public ComponentStateDTO clearControllerServiceState(final String 
controllerServiceId, final ComponentStateDTO componentStateDTO) {
+        controllerServiceDAO.clearState(controllerServiceId, 
componentStateDTO);
+        return this.getControllerServiceState(controllerServiceId);
     }
 
     @Override
@@ -2023,8 +2026,9 @@ public class StandardNiFiServiceFacade implements 
NiFiServiceFacade {
     }
 
     @Override
-    public void clearReportingTaskState(final String reportingTaskId) {
-        reportingTaskDAO.clearState(reportingTaskId);
+    public ComponentStateDTO clearReportingTaskState(final String 
reportingTaskId, final ComponentStateDTO componentStateDTO) {
+        reportingTaskDAO.clearState(reportingTaskId, componentStateDTO);
+        return this.getReportingTaskState(reportingTaskId);
     }
 
     @Override
@@ -2033,8 +2037,9 @@ public class StandardNiFiServiceFacade implements 
NiFiServiceFacade {
     }
 
     @Override
-    public void clearParameterProviderState(final String parameterProviderId) {
-        parameterProviderDAO.clearState(parameterProviderId);
+    public ComponentStateDTO clearParameterProviderState(final String 
parameterProviderId, final ComponentStateDTO componentStateDTO) {
+        parameterProviderDAO.clearState(parameterProviderId, 
componentStateDTO);
+        return this.getParameterProviderState(parameterProviderId);
     }
 
     @Override
diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ControllerResource.java
 
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ControllerResource.java
index f2afba9242..f507fbb5ad 100644
--- 
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ControllerResource.java
+++ 
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ControllerResource.java
@@ -624,7 +624,7 @@ public class ControllerResource extends ApplicationResource 
{
      * @return a componentStateEntity
      */
     @POST
-    @Consumes(MediaType.WILDCARD)
+    @Consumes({MediaType.APPLICATION_JSON, MediaType.WILDCARD})
     @Produces(MediaType.APPLICATION_JSON)
     @Path("flow-analysis-rules/{id}/state/clear-requests")
     @Operation(
@@ -646,10 +646,14 @@ public class ControllerResource extends 
ApplicationResource {
                     description = "The flow analysis rule id.",
                     required = true
             )
-            @PathParam("id") final String id) {
+            @PathParam("id") final String id,
+            @Parameter(
+                    description = "Optional component state to perform a 
selective key removal. If omitted, clears all state.",
+                    required = false
+            ) final ComponentStateEntity componentStateEntity) {
 
         if (isReplicateRequest()) {
-            return replicate(HttpMethod.POST);
+            return replicate(HttpMethod.POST, componentStateEntity);
         }
 
         final FlowAnalysisRuleEntity requestFlowAnalysisRuleEntity = new 
FlowAnalysisRuleEntity();
@@ -661,11 +665,13 @@ public class ControllerResource extends 
ApplicationResource {
                 lookup -> authorizeController(RequestAction.WRITE),
                 () -> serviceFacade.verifyCanClearFlowAnalysisRuleState(id),
                 (flowAnalysisRuleEntity) -> {
-                    // get the component state
-                    
serviceFacade.clearFlowAnalysisRuleState(flowAnalysisRuleEntity.getId());
+                    // clear state
+                    final ComponentStateDTO expectedState = 
componentStateEntity == null ? null : componentStateEntity.getComponentState();
+                    final ComponentStateDTO state = 
serviceFacade.clearFlowAnalysisRuleState(flowAnalysisRuleEntity.getId(), 
expectedState);
 
                     // generate the response entity
                     final ComponentStateEntity entity = new 
ComponentStateEntity();
+                    entity.setComponentState(state);
 
                     // generate the response
                     return generateOkResponse(entity).build();
diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ControllerServiceResource.java
 
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ControllerServiceResource.java
index 904f6054f7..3e6fd20347 100644
--- 
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ControllerServiceResource.java
+++ 
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ControllerServiceResource.java
@@ -346,7 +346,7 @@ public class ControllerServiceResource extends 
ApplicationResource {
      * @return a componentStateEntity
      */
     @POST
-    @Consumes(MediaType.WILDCARD)
+    @Consumes({MediaType.APPLICATION_JSON, MediaType.WILDCARD})
     @Produces(MediaType.APPLICATION_JSON)
     @Path("{id}/state/clear-requests")
     @Operation(
@@ -368,10 +368,14 @@ public class ControllerServiceResource extends 
ApplicationResource {
                     description = "The controller service id.",
                     required = true
             )
-            @PathParam("id") final String id) {
+            @PathParam("id") final String id,
+            @Parameter(
+                    description = "Optional component state to perform a 
selective key removal. If omitted, clears all state.",
+                    required = false
+            ) final ComponentStateEntity componentStateEntity) {
 
         if (isReplicateRequest()) {
-            return replicate(HttpMethod.POST);
+            return replicate(HttpMethod.POST, componentStateEntity);
         }
 
         final ControllerServiceEntity requestControllerServiceEntity = new 
ControllerServiceEntity();
@@ -386,11 +390,13 @@ public class ControllerServiceResource extends 
ApplicationResource {
                 },
                 () -> serviceFacade.verifyCanClearControllerServiceState(id),
                 (controllerServiceEntity) -> {
-                    // get the component state
-                    
serviceFacade.clearControllerServiceState(controllerServiceEntity.getId());
+                    // clear state
+                    final ComponentStateDTO expectedState = 
componentStateEntity == null ? null : componentStateEntity.getComponentState();
+                    final ComponentStateDTO state = 
serviceFacade.clearControllerServiceState(controllerServiceEntity.getId(), 
expectedState);
 
                     // generate the response entity
                     final ComponentStateEntity entity = new 
ComponentStateEntity();
+                    entity.setComponentState(state);
 
                     // generate the response
                     return generateOkResponse(entity).build();
diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ParameterProviderResource.java
 
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ParameterProviderResource.java
index 5e91fef656..da78992a9b 100644
--- 
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ParameterProviderResource.java
+++ 
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ParameterProviderResource.java
@@ -440,7 +440,7 @@ public class ParameterProviderResource extends 
AbstractParameterResource {
      * @return a componentStateEntity
      */
     @POST
-    @Consumes(MediaType.WILDCARD)
+    @Consumes({MediaType.APPLICATION_JSON, MediaType.WILDCARD})
     @Produces(MediaType.APPLICATION_JSON)
     @Path("{id}/state/clear-requests")
     @Operation(
@@ -462,10 +462,14 @@ public class ParameterProviderResource extends 
AbstractParameterResource {
                     description = "The parameter provider id.",
                     required = true
             )
-            @PathParam("id") final String id) {
+            @PathParam("id") final String id,
+            @Parameter(
+                    description = "Optional component state to perform a 
selective key removal. If omitted, clears all state.",
+                    required = false
+            ) final ComponentStateEntity componentStateEntity) {
 
         if (isReplicateRequest()) {
-            return replicate(HttpMethod.POST);
+            return replicate(HttpMethod.POST, componentStateEntity);
         }
 
         final ParameterProviderEntity requestParameterProviderEntity = new 
ParameterProviderEntity();
@@ -480,11 +484,13 @@ public class ParameterProviderResource extends 
AbstractParameterResource {
                 },
                 () -> serviceFacade.verifyCanClearParameterProviderState(id),
                 (parameterProviderEntity) -> {
-                    // get the component state
-                    
serviceFacade.clearParameterProviderState(parameterProviderEntity.getId());
+                    // clear state
+                    final ComponentStateDTO expectedState = 
componentStateEntity == null ? null : componentStateEntity.getComponentState();
+                    final ComponentStateDTO state = 
serviceFacade.clearParameterProviderState(parameterProviderEntity.getId(), 
expectedState);
 
                     // generate the response entity
                     final ComponentStateEntity entity = new 
ComponentStateEntity();
+                    entity.setComponentState(state);
 
                     // generate the response
                     return generateOkResponse(entity).build();
diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessorResource.java
 
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessorResource.java
index 9493faa923..07d8103fe0 100644
--- 
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessorResource.java
+++ 
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessorResource.java
@@ -233,7 +233,7 @@ public class ProcessorResource extends ApplicationResource {
 
 
     @POST
-    @Consumes(MediaType.APPLICATION_JSON)
+    @Consumes({MediaType.APPLICATION_JSON, MediaType.WILDCARD})
     @Produces(MediaType.APPLICATION_JSON)
     @Path("/run-status-details/queries")
     @Operation(
@@ -489,7 +489,7 @@ public class ProcessorResource extends ApplicationResource {
      * @throws InterruptedException if interrupted
      */
     @POST
-    @Consumes(MediaType.WILDCARD)
+    @Consumes({MediaType.APPLICATION_JSON, MediaType.WILDCARD})
     @Produces(MediaType.APPLICATION_JSON)
     @Path("{id}/state/clear-requests")
     @Operation(
@@ -511,10 +511,14 @@ public class ProcessorResource extends 
ApplicationResource {
                     description = "The processor id.",
                     required = true
             )
-            @PathParam("id") final String id) throws InterruptedException {
+            @PathParam("id") final String id,
+            @Parameter(
+                    description = "Optional component state to perform a 
selective key removal. If omitted, clears all state.",
+                    required = false
+            ) final ComponentStateEntity componentStateEntity) throws 
InterruptedException {
 
         if (isReplicateRequest()) {
-            return replicate(HttpMethod.POST);
+            return replicate(HttpMethod.POST, componentStateEntity);
         }
 
         final ProcessorEntity requestProcessorEntity = new ProcessorEntity();
@@ -529,11 +533,13 @@ public class ProcessorResource extends 
ApplicationResource {
                 },
                 () -> serviceFacade.verifyCanClearProcessorState(id),
                 (processorEntity) -> {
-                    // get the component state
-                    serviceFacade.clearProcessorState(processorEntity.getId());
+                    // clear state
+                    final ComponentStateDTO expectedState = 
componentStateEntity == null ? null : componentStateEntity.getComponentState();
+                    final ComponentStateDTO state = 
serviceFacade.clearProcessorState(processorEntity.getId(), expectedState);
 
                     // generate the response entity
                     final ComponentStateEntity entity = new 
ComponentStateEntity();
+                    entity.setComponentState(state);
 
                     // generate the response
                     return generateOkResponse(entity).build();
diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ReportingTaskResource.java
 
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ReportingTaskResource.java
index 995c6cf748..7b98fa8a36 100644
--- 
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ReportingTaskResource.java
+++ 
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ReportingTaskResource.java
@@ -331,7 +331,7 @@ public class ReportingTaskResource extends 
ApplicationResource {
      * @return a componentStateEntity
      */
     @POST
-    @Consumes(MediaType.WILDCARD)
+    @Consumes({MediaType.APPLICATION_JSON, MediaType.WILDCARD})
     @Produces(MediaType.APPLICATION_JSON)
     @Path("{id}/state/clear-requests")
     @Operation(
@@ -353,10 +353,14 @@ public class ReportingTaskResource extends 
ApplicationResource {
                     description = "The reporting task id.",
                     required = true
             )
-            @PathParam("id") final String id) {
+            @PathParam("id") final String id,
+            @Parameter(
+                    description = "Optional component state to perform a 
selective key removal. If omitted, clears all state.",
+                    required = false
+            ) final ComponentStateEntity componentStateEntity) {
 
         if (isReplicateRequest()) {
-            return replicate(HttpMethod.POST);
+            return replicate(HttpMethod.POST, componentStateEntity);
         }
 
         final ReportingTaskEntity requestReportTaskEntity = new 
ReportingTaskEntity();
@@ -371,11 +375,13 @@ public class ReportingTaskResource extends 
ApplicationResource {
                 },
                 () -> serviceFacade.verifyCanClearReportingTaskState(id),
                 (reportingTaskEntity) -> {
-                    // get the component state
-                    
serviceFacade.clearReportingTaskState(reportingTaskEntity.getId());
+                    // clear state
+                    final ComponentStateDTO expectedState = 
componentStateEntity == null ? null : componentStateEntity.getComponentState();
+                    final ComponentStateDTO state = 
serviceFacade.clearReportingTaskState(reportingTaskEntity.getId(), 
expectedState);
 
                     // generate the response entity
                     final ComponentStateEntity entity = new 
ComponentStateEntity();
+                    entity.setComponentState(state);
 
                     // generate the response
                     return generateOkResponse(entity).build();
diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
 
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
index 6b6b66fade..51a87885e8 100644
--- 
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
+++ 
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
@@ -439,6 +439,7 @@ public final class DtoFactory {
        final ComponentStateDTO dto = new ComponentStateDTO();
        dto.setComponentId(componentId);
        dto.setStateDescription(getStateDescription(componentClass));
+       dto.setDropStateKeySupported(getDropStateKeySupported(componentClass));
        dto.setLocalState(createStateMapDTO(Scope.LOCAL, localState));
        dto.setClusterState(createStateMapDTO(Scope.CLUSTER, clusterState));
        return dto;
@@ -459,6 +460,22 @@ public final class DtoFactory {
        }
    }
 
+   /**
+    * Gets whether dropping state by key is supported for the component.
+    * Defaults to false when annotation not present or field not specified.
+    *
+    * @param componentClass the component class
+    * @return true if dropping state by key is supported; false otherwise
+    */
+   private boolean getDropStateKeySupported(final Class<?> componentClass) {
+       final Stateful statefulAnnotation = 
componentClass.getAnnotation(Stateful.class);
+       boolean result = false;
+       if (statefulAnnotation != null) {
+           result = statefulAnnotation.dropStateKeySupported();
+       }
+       return result;
+   }
+
    /**
     * Creates a StateMapDTO for the given scope and state map.
     *
diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ComponentStateDAO.java
 
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ComponentStateDAO.java
index 3090a49410..90664c324b 100644
--- 
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ComponentStateDAO.java
+++ 
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ComponentStateDAO.java
@@ -24,6 +24,7 @@ import org.apache.nifi.controller.ProcessorNode;
 import org.apache.nifi.controller.ReportingTaskNode;
 import org.apache.nifi.controller.service.ControllerServiceNode;
 import org.apache.nifi.groups.RemoteProcessGroup;
+import org.apache.nifi.web.api.dto.ComponentStateDTO;
 
 public interface ComponentStateDAO {
 
@@ -41,7 +42,17 @@ public interface ComponentStateDAO {
      *
      * @param processor processor
      */
-    void clearState(ProcessorNode processor);
+    default void clearState(ProcessorNode processor) {
+        clearState(processor, null);
+    }
+
+    /**
+     * Clears the state for the specified processor.
+     *
+     * @param processor         processor
+     * @param componentStateDTO component state DTO
+     */
+    void clearState(ProcessorNode processor, final ComponentStateDTO 
componentStateDTO);
 
     /**
      * Gets the state map for the specified controller service.
@@ -56,8 +67,9 @@ public interface ComponentStateDAO {
      * Clears the state for the specified controller service.
      *
      * @param controllerService controller service
+     * @param componentStateDTO component state DTO
      */
-    void clearState(ControllerServiceNode controllerService);
+    void clearState(ControllerServiceNode controllerService, final 
ComponentStateDTO componentStateDTO);
 
     /**
      * Gets the state for the specified reporting task.
@@ -71,9 +83,10 @@ public interface ComponentStateDAO {
     /**
      * Clears the state for the specified reporting task.
      *
-     * @param reportingTask reporting task
+     * @param reportingTask     reporting task
+     * @param componentStateDTO component state DTO
      */
-    void clearState(ReportingTaskNode reportingTask);
+    void clearState(ReportingTaskNode reportingTask, final ComponentStateDTO 
componentStateDTO);
 
     /**
      * Gets the state for the specified flow analysis rule.
@@ -88,9 +101,10 @@ public interface ComponentStateDAO {
     /**
      * Clears the state for the specified flow analysis rule.
      *
-     * @param flowAnalysisRule flow analysis rule
+     * @param flowAnalysisRule  flow analysis rule
+     * @param componentStateDTO component state DTO
      */
-    void clearState(FlowAnalysisRuleNode flowAnalysisRule);
+    void clearState(FlowAnalysisRuleNode flowAnalysisRule, final 
ComponentStateDTO componentStateDTO);
 
     /**
      * Gets the state for the specified parameter provider.
@@ -105,8 +119,9 @@ public interface ComponentStateDAO {
      * Clears the state for the specified parameter provider.
      *
      * @param parameterProvider parameter provider
+     * @param componentStateDTO component state DTO
      */
-    void clearState(ParameterProviderNode parameterProvider);
+    void clearState(ParameterProviderNode parameterProvider, final 
ComponentStateDTO componentStateDTO);
 
     /**
      * Gets the state map for the specified RemoteProcessGroup.
diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ControllerServiceDAO.java
 
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ControllerServiceDAO.java
index 8b9f0ea607..6dc41df450 100644
--- 
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ControllerServiceDAO.java
+++ 
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ControllerServiceDAO.java
@@ -22,6 +22,7 @@ import org.apache.nifi.controller.ComponentNode;
 import org.apache.nifi.controller.ScheduledState;
 import org.apache.nifi.controller.service.ControllerServiceNode;
 import org.apache.nifi.controller.service.ControllerServiceState;
+import org.apache.nifi.web.api.dto.ComponentStateDTO;
 import org.apache.nifi.web.api.dto.ConfigVerificationResultDTO;
 import org.apache.nifi.web.api.dto.ControllerServiceDTO;
 
@@ -150,6 +151,7 @@ public interface ControllerServiceDAO {
      * Clears the state of the specified controller service.
      *
      * @param controllerServiceId controller service id
+     * @param componentStateDTO   state of the controller service
      */
-    void clearState(String controllerServiceId);
+    void clearState(String controllerServiceId, ComponentStateDTO 
componentStateDTO);
 }
diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/FlowAnalysisRuleDAO.java
 
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/FlowAnalysisRuleDAO.java
index e4b4d94012..ad70f3fc27 100644
--- 
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/FlowAnalysisRuleDAO.java
+++ 
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/FlowAnalysisRuleDAO.java
@@ -19,6 +19,7 @@ package org.apache.nifi.web.dao;
 import org.apache.nifi.components.state.Scope;
 import org.apache.nifi.components.state.StateMap;
 import org.apache.nifi.controller.FlowAnalysisRuleNode;
+import org.apache.nifi.web.api.dto.ComponentStateDTO;
 import org.apache.nifi.web.api.dto.ConfigVerificationResultDTO;
 import org.apache.nifi.web.api.dto.FlowAnalysisRuleDTO;
 
@@ -127,6 +128,7 @@ public interface FlowAnalysisRuleDAO {
      * Clears the state of the specified flow analysis rule.
      *
      * @param flowAnalysisRuleId flow analysis rule id
+     * @param componentStateDTO  state of the flow analysis rule
      */
-    void clearState(String flowAnalysisRuleId);
+    void clearState(final String flowAnalysisRuleId, final ComponentStateDTO 
componentStateDTO);
 }
diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ParameterProviderDAO.java
 
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ParameterProviderDAO.java
index 7a12bf920a..20865e5ca9 100644
--- 
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ParameterProviderDAO.java
+++ 
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ParameterProviderDAO.java
@@ -22,6 +22,7 @@ import org.apache.nifi.controller.ParameterProviderNode;
 import org.apache.nifi.controller.ParametersApplication;
 import org.apache.nifi.controller.parameter.ParameterProviderLookup;
 import org.apache.nifi.parameter.ParameterGroupConfiguration;
+import org.apache.nifi.web.api.dto.ComponentStateDTO;
 import org.apache.nifi.web.api.dto.ConfigVerificationResultDTO;
 import org.apache.nifi.web.api.dto.ParameterProviderDTO;
 
@@ -156,7 +157,8 @@ public interface ParameterProviderDAO extends 
ParameterProviderLookup {
      * Clears the state of the specified parameter provider.
      *
      * @param parameterProviderId parameter provider id
+     * @param componentStateDTO   state of the parameter provider
      */
-    void clearState(String parameterProviderId);
+    void clearState(final String parameterProviderId, final ComponentStateDTO 
componentStateDTO);
 
 }
diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ProcessorDAO.java
 
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ProcessorDAO.java
index 679289f5b6..72f7caee87 100644
--- 
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ProcessorDAO.java
+++ 
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ProcessorDAO.java
@@ -19,6 +19,7 @@ package org.apache.nifi.web.dao;
 import org.apache.nifi.components.state.Scope;
 import org.apache.nifi.components.state.StateMap;
 import org.apache.nifi.controller.ProcessorNode;
+import org.apache.nifi.web.api.dto.ComponentStateDTO;
 import org.apache.nifi.web.api.dto.ConfigVerificationResultDTO;
 import org.apache.nifi.web.api.dto.ProcessorDTO;
 
@@ -143,7 +144,8 @@ public interface ProcessorDAO {
     /**
      * Clears the state of the specified processor.
      *
-     * @param processorId processor id
+     * @param processorId       processor id
+     * @param componentStateDTO state of the processor
      */
-    void clearState(String processorId);
+    void clearState(final String processorId, final ComponentStateDTO 
componentStateDTO);
 }
diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ReportingTaskDAO.java
 
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ReportingTaskDAO.java
index 4c06c6278b..61f4a4e597 100644
--- 
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ReportingTaskDAO.java
+++ 
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ReportingTaskDAO.java
@@ -19,6 +19,7 @@ package org.apache.nifi.web.dao;
 import org.apache.nifi.components.state.Scope;
 import org.apache.nifi.components.state.StateMap;
 import org.apache.nifi.controller.ReportingTaskNode;
+import org.apache.nifi.web.api.dto.ComponentStateDTO;
 import org.apache.nifi.web.api.dto.ConfigVerificationResultDTO;
 import org.apache.nifi.web.api.dto.ReportingTaskDTO;
 
@@ -127,7 +128,8 @@ public interface ReportingTaskDAO {
     /**
      * Clears the state of the specified reporting task.
      *
-     * @param reportingTaskId reporting task id
+     * @param reportingTaskId   reporting task id
+     * @param componentStateDTO state of the reporting task
      */
-    void clearState(String reportingTaskId);
+    void clearState(final String reportingTaskId, final ComponentStateDTO 
componentStateDTO);
 }
diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardComponentStateDAO.java
 
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardComponentStateDAO.java
index f7c8ab7062..d636e7923a 100644
--- 
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardComponentStateDAO.java
+++ 
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardComponentStateDAO.java
@@ -27,11 +27,15 @@ import org.apache.nifi.controller.ReportingTaskNode;
 import org.apache.nifi.controller.service.ControllerServiceNode;
 import org.apache.nifi.groups.RemoteProcessGroup;
 import org.apache.nifi.web.ResourceNotFoundException;
+import org.apache.nifi.web.api.dto.ComponentStateDTO;
+import org.apache.nifi.web.api.dto.StateMapDTO;
 import org.apache.nifi.web.dao.ComponentStateDAO;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Repository;
 
 import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
 
 @Repository
 public class StandardComponentStateDAO implements ComponentStateDAO {
@@ -42,28 +46,105 @@ public class StandardComponentStateDAO implements 
ComponentStateDAO {
         try {
             final StateManager manager = 
stateManagerProvider.getStateManager(componentId);
             if (manager == null) {
-                throw new ResourceNotFoundException(String.format("State for 
the specified component %s could not be found.", componentId));
+                throw new ResourceNotFoundException("State for the specified 
component %s could not be found.".formatted(componentId));
             }
 
             return manager.getState(scope);
         } catch (final IOException ioe) {
-            throw new IllegalStateException(String.format("Unable to get the 
state for the specified component %s: %s", componentId, ioe), ioe);
+            throw new IllegalStateException("Unable to get the state for the 
specified component %s: %s".formatted(componentId, ioe), ioe);
         }
     }
 
-    private void clearState(final String componentId) {
+    private void clearState(final String componentId, final ComponentStateDTO 
componentStateDTO) {
         try {
             final StateManager manager = 
stateManagerProvider.getStateManager(componentId);
             if (manager == null) {
-                throw new ResourceNotFoundException(String.format("State for 
the specified component %s could not be found.", componentId));
+                throw new ResourceNotFoundException("State for the specified 
component %s could not be found.".formatted(componentId));
             }
 
-            // clear both state's at the same time
+            // No DTO provided: clear both scopes
+            if (componentStateDTO == null) {
+                manager.clear(Scope.CLUSTER);
+                manager.clear(Scope.LOCAL);
+                return;
+            }
+
+            // Determine if there is existing local state
+            final StateMap localStateMap = manager.getState(Scope.LOCAL);
+            final boolean hasLocalState = localStateMap != null && 
!localStateMap.toMap().isEmpty();
+
+            if (hasLocalState) {
+                // Local state exists
+                if (!stateManagerProvider.isClusterProviderEnabled()) {
+                    // Standalone mode: allow selective local key removal
+                    if (!manager.isStateKeyDropSupported()) {
+                        throw new IllegalStateException("Selective state key 
removal is not supported for component %s with local 
state.".formatted(componentId));
+                    }
+
+                    final Map<String, String> newLocalState = 
toStateMap(componentStateDTO.getLocalState());
+                    final Map<String, String> currentLocalState = 
localStateMap.toMap();
+
+                    if (hasExactlyOneKeyRemoved(currentLocalState, 
newLocalState)) {
+                        manager.setState(newLocalState, Scope.LOCAL);
+                    } else {
+                        throw new IllegalStateException("Unable to remove a 
state key for component %s. Exactly one key removal is 
supported.".formatted(componentId));
+                    }
+                } else {
+                    // Cluster mode with existing local state: do not allow 
selective removal
+                    throw new IllegalStateException("Selective state key 
removal is not supported for component %s with local 
state.".formatted(componentId));
+                }
+                return;
+            }
+
+            // No local state present, check for cluster state selective 
removal
+            final StateMapDTO clusterStateMapDto = 
componentStateDTO.getClusterState();
+            if (clusterStateMapDto != null && clusterStateMapDto.getState() != 
null && !clusterStateMapDto.getState().isEmpty()) {
+                if (!manager.isStateKeyDropSupported()) {
+                    throw new IllegalStateException("Selective state key 
removal is not supported for component %s with cluster 
state.".formatted(componentId));
+                }
+
+                final Map<String, String> newClusterState = 
toStateMap(clusterStateMapDto);
+                final Map<String, String> currentClusterState = 
manager.getState(Scope.CLUSTER).toMap();
+
+                if (hasExactlyOneKeyRemoved(currentClusterState, 
newClusterState)) {
+                    manager.setState(newClusterState, Scope.CLUSTER);
+                } else {
+                    throw new IllegalStateException("Unable to remove a state 
key for component %s. Exactly one key removal is 
supported.".formatted(componentId));
+                }
+
+                // Ensure local state is cleared
+                manager.clear(Scope.LOCAL);
+                return;
+            }
+
+            // Default: clear both scopes
             manager.clear(Scope.CLUSTER);
             manager.clear(Scope.LOCAL);
         } catch (final IOException ioe) {
-            throw new IllegalStateException(String.format("Unable to clear the 
state for the specified component %s: %s", componentId, ioe), ioe);
+            throw new IllegalStateException("Unable to clear the state for the 
specified component %s: %s".formatted(componentId, ioe), ioe);
+        }
+    }
+
+    private boolean hasExactlyOneKeyRemoved(Map<String, String> currentState, 
Map<String, String> newState) {
+        // Check if newState has exactly one less key
+        if (currentState.size() - newState.size() != 1) {
+            return false;
         }
+
+        // Check that newState is a subset of currentState
+        return newState.entrySet()
+                .stream()
+                .allMatch(entry -> 
entry.getValue().equals(currentState.get(entry.getKey())));
+    }
+
+    private static Map<String, String> toStateMap(final StateMapDTO 
stateMapDTO) {
+        final Map<String, String> map = new HashMap<>();
+        if (stateMapDTO == null || stateMapDTO.getState() == null) {
+            return map;
+        }
+
+        stateMapDTO.getState().forEach(entry -> map.put(entry.getKey(), 
entry.getValue()));
+        return map;
     }
 
     @Override
@@ -72,8 +153,8 @@ public class StandardComponentStateDAO implements 
ComponentStateDAO {
     }
 
     @Override
-    public void clearState(final ProcessorNode processor) {
-        clearState(processor.getIdentifier());
+    public void clearState(final ProcessorNode processor, final 
ComponentStateDTO componentStateDTO) {
+        clearState(processor.getIdentifier(), componentStateDTO);
     }
 
     @Override
@@ -82,8 +163,8 @@ public class StandardComponentStateDAO implements 
ComponentStateDAO {
     }
 
     @Override
-    public void clearState(final ControllerServiceNode controllerService) {
-        clearState(controllerService.getIdentifier());
+    public void clearState(final ControllerServiceNode controllerService, 
final ComponentStateDTO componentStateDTO) {
+        clearState(controllerService.getIdentifier(), componentStateDTO);
     }
 
     @Override
@@ -92,8 +173,8 @@ public class StandardComponentStateDAO implements 
ComponentStateDAO {
     }
 
     @Override
-    public void clearState(final ReportingTaskNode reportingTask) {
-        clearState(reportingTask.getIdentifier());
+    public void clearState(final ReportingTaskNode reportingTask, final 
ComponentStateDTO componentStateDTO) {
+        clearState(reportingTask.getIdentifier(), componentStateDTO);
     }
 
     @Override
@@ -102,8 +183,8 @@ public class StandardComponentStateDAO implements 
ComponentStateDAO {
     }
 
     @Override
-    public void clearState(final FlowAnalysisRuleNode flowAnalysisRule) {
-        clearState(flowAnalysisRule.getIdentifier());
+    public void clearState(final FlowAnalysisRuleNode flowAnalysisRule, final 
ComponentStateDTO componentStateDTO) {
+        clearState(flowAnalysisRule.getIdentifier(), componentStateDTO);
     }
 
     @Override
@@ -112,8 +193,8 @@ public class StandardComponentStateDAO implements 
ComponentStateDAO {
     }
 
     @Override
-    public void clearState(final ParameterProviderNode parameterProvider) {
-        clearState(parameterProvider.getIdentifier());
+    public void clearState(final ParameterProviderNode parameterProvider, 
final ComponentStateDTO componentStateDTO) {
+        clearState(parameterProvider.getIdentifier(), componentStateDTO);
     }
 
     @Override
diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardControllerServiceDAO.java
 
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardControllerServiceDAO.java
index 2359af9612..c89ac1b0f3 100644
--- 
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardControllerServiceDAO.java
+++ 
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardControllerServiceDAO.java
@@ -17,6 +17,7 @@
 package org.apache.nifi.web.dao.impl;
 
 import org.apache.nifi.bundle.BundleCoordinate;
+import org.apache.nifi.components.ConfigVerificationResult;
 import org.apache.nifi.components.ConfigurableComponent;
 import org.apache.nifi.components.state.Scope;
 import org.apache.nifi.components.state.StateMap;
@@ -35,16 +36,16 @@ import org.apache.nifi.groups.ProcessGroup;
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.logging.LogLevel;
 import org.apache.nifi.logging.LogRepository;
+import org.apache.nifi.logging.StandardLoggingContext;
 import org.apache.nifi.logging.repository.NopLogRepository;
 import org.apache.nifi.nar.ExtensionManager;
 import org.apache.nifi.parameter.ParameterLookup;
-import org.apache.nifi.components.ConfigVerificationResult;
 import org.apache.nifi.processor.SimpleProcessLogger;
-import org.apache.nifi.logging.StandardLoggingContext;
 import org.apache.nifi.util.BundleUtils;
 import org.apache.nifi.web.NiFiCoreException;
 import org.apache.nifi.web.ResourceNotFoundException;
 import org.apache.nifi.web.api.dto.BundleDTO;
+import org.apache.nifi.web.api.dto.ComponentStateDTO;
 import org.apache.nifi.web.api.dto.ConfigVerificationResultDTO;
 import org.apache.nifi.web.api.dto.ControllerServiceDTO;
 import org.apache.nifi.web.dao.ComponentStateDAO;
@@ -450,9 +451,9 @@ public class StandardControllerServiceDAO extends 
ComponentDAO implements Contro
     }
 
     @Override
-    public void clearState(final String controllerServiceId) {
+    public void clearState(final String controllerServiceId, final 
ComponentStateDTO componentStateDTO) {
         final ControllerServiceNode controllerService = 
locateControllerService(controllerServiceId);
-        componentStateDAO.clearState(controllerService);
+        componentStateDAO.clearState(controllerService, componentStateDTO);
     }
 
     @Autowired
diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardFlowAnalysisRuleDAO.java
 
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardFlowAnalysisRuleDAO.java
index b1bce275c2..a4d9ef051b 100644
--- 
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardFlowAnalysisRuleDAO.java
+++ 
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardFlowAnalysisRuleDAO.java
@@ -22,16 +22,16 @@ import org.apache.nifi.components.ConfigurableComponent;
 import org.apache.nifi.components.state.Scope;
 import org.apache.nifi.components.state.StateMap;
 import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.controller.FlowAnalysisRuleNode;
 import org.apache.nifi.controller.FlowController;
 import org.apache.nifi.controller.ReloadComponent;
-import org.apache.nifi.controller.FlowAnalysisRuleNode;
 import org.apache.nifi.controller.exception.ComponentLifeCycleException;
 import org.apache.nifi.controller.exception.ValidationException;
 import 
org.apache.nifi.controller.flowanalysis.FlowAnalysisRuleInstantiationException;
 import org.apache.nifi.controller.flowanalysis.FlowAnalysisRuleProvider;
 import org.apache.nifi.controller.service.StandardConfigurationContext;
-import org.apache.nifi.flowanalysis.FlowAnalysisRuleState;
 import org.apache.nifi.flowanalysis.EnforcementPolicy;
+import org.apache.nifi.flowanalysis.FlowAnalysisRuleState;
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.logging.LogRepository;
 import org.apache.nifi.logging.StandardLoggingContext;
@@ -43,6 +43,7 @@ import org.apache.nifi.util.BundleUtils;
 import org.apache.nifi.web.NiFiCoreException;
 import org.apache.nifi.web.ResourceNotFoundException;
 import org.apache.nifi.web.api.dto.BundleDTO;
+import org.apache.nifi.web.api.dto.ComponentStateDTO;
 import org.apache.nifi.web.api.dto.ConfigVerificationResultDTO;
 import org.apache.nifi.web.api.dto.FlowAnalysisRuleDTO;
 import org.apache.nifi.web.dao.ComponentStateDAO;
@@ -356,9 +357,9 @@ public class StandardFlowAnalysisRuleDAO extends 
ComponentDAO implements FlowAna
     }
 
     @Override
-    public void clearState(String flowAnalysisRuleId) {
+    public void clearState(final String flowAnalysisRuleId, final 
ComponentStateDTO componentStateDTO) {
         final FlowAnalysisRuleNode flowAnalysisRule = 
locateFlowAnalysisRule(flowAnalysisRuleId);
-        componentStateDAO.clearState(flowAnalysisRule);
+        componentStateDAO.clearState(flowAnalysisRule, componentStateDTO);
     }
 
     @Autowired
diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardParameterProviderDAO.java
 
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardParameterProviderDAO.java
index f301d79ff9..958769ab31 100644
--- 
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardParameterProviderDAO.java
+++ 
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardParameterProviderDAO.java
@@ -31,16 +31,17 @@ import 
org.apache.nifi.controller.parameter.ParameterProviderInstantiationExcept
 import org.apache.nifi.controller.service.StandardConfigurationContext;
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.logging.LogRepository;
+import org.apache.nifi.logging.StandardLoggingContext;
 import org.apache.nifi.logging.repository.NopLogRepository;
 import org.apache.nifi.nar.ExtensionManager;
-import org.apache.nifi.parameter.ParameterLookup;
 import org.apache.nifi.parameter.ParameterGroupConfiguration;
+import org.apache.nifi.parameter.ParameterLookup;
 import org.apache.nifi.processor.SimpleProcessLogger;
-import org.apache.nifi.logging.StandardLoggingContext;
 import org.apache.nifi.util.BundleUtils;
 import org.apache.nifi.web.NiFiCoreException;
 import org.apache.nifi.web.ResourceNotFoundException;
 import org.apache.nifi.web.api.dto.BundleDTO;
+import org.apache.nifi.web.api.dto.ComponentStateDTO;
 import org.apache.nifi.web.api.dto.ConfigVerificationResultDTO;
 import org.apache.nifi.web.api.dto.ParameterProviderDTO;
 import org.apache.nifi.web.dao.ComponentStateDAO;
@@ -298,9 +299,9 @@ public class StandardParameterProviderDAO extends 
ComponentDAO implements Parame
     }
 
     @Override
-    public void clearState(final String parameterProviderId) {
+    public void clearState(final String parameterProviderId, final 
ComponentStateDTO componentStateDTO) {
         final ParameterProviderNode parameterProvider = 
locateParameterProvider(parameterProviderId);
-        componentStateDAO.clearState(parameterProvider);
+        componentStateDAO.clearState(parameterProvider, componentStateDTO);
     }
 
     @Override
diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessorDAO.java
 
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessorDAO.java
index e751e75aa1..132c864702 100644
--- 
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessorDAO.java
+++ 
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessorDAO.java
@@ -18,6 +18,7 @@ package org.apache.nifi.web.dao.impl;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.bundle.BundleCoordinate;
+import org.apache.nifi.components.ConfigVerificationResult;
 import org.apache.nifi.components.ConfigurableComponent;
 import org.apache.nifi.components.state.Scope;
 import org.apache.nifi.components.state.StateMap;
@@ -38,7 +39,6 @@ import org.apache.nifi.logging.LogRepository;
 import org.apache.nifi.logging.StandardLoggingContext;
 import org.apache.nifi.logging.repository.NopLogRepository;
 import org.apache.nifi.nar.ExtensionManager;
-import org.apache.nifi.components.ConfigVerificationResult;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.SimpleProcessLogger;
@@ -50,6 +50,7 @@ import org.apache.nifi.util.FormatUtils;
 import org.apache.nifi.web.NiFiCoreException;
 import org.apache.nifi.web.ResourceNotFoundException;
 import org.apache.nifi.web.api.dto.BundleDTO;
+import org.apache.nifi.web.api.dto.ComponentStateDTO;
 import org.apache.nifi.web.api.dto.ConfigVerificationResultDTO;
 import org.apache.nifi.web.api.dto.ProcessorConfigDTO;
 import org.apache.nifi.web.api.dto.ProcessorDTO;
@@ -128,8 +129,9 @@ public class StandardProcessorDAO extends ComponentDAO 
implements ProcessorDAO {
             configureProcessor(processor, processorDTO);
 
             // Notify the processor node that the configuration (properties, 
e.g.) has been restored
+            final Class<?> componentClass = processor.getProcessor() == null ? 
null : processor.getProcessor().getClass();
             final StandardProcessContext processContext = new 
StandardProcessContext(processor, flowController.getControllerServiceProvider(),
-                    
flowController.getStateManagerProvider().getStateManager(processor.getProcessor().getIdentifier()),
 () -> false, flowController);
+                    
flowController.getStateManagerProvider().getStateManager(processor.getProcessor().getIdentifier(),
 componentClass), () -> false, flowController);
             processor.onConfigurationRestored(processContext);
 
             return processor;
@@ -608,9 +610,9 @@ public class StandardProcessorDAO extends ComponentDAO 
implements ProcessorDAO {
     }
 
     @Override
-    public void clearState(String processorId) {
+    public void clearState(final String processorId, final ComponentStateDTO 
componentStateDTO) {
         final ProcessorNode processor = locateProcessor(processorId);
-        componentStateDAO.clearState(processor);
+        componentStateDAO.clearState(processor, componentStateDTO);
     }
 
     @Autowired
diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardReportingTaskDAO.java
 
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardReportingTaskDAO.java
index 287946e746..c57dc456e6 100644
--- 
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardReportingTaskDAO.java
+++ 
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardReportingTaskDAO.java
@@ -18,6 +18,7 @@ package org.apache.nifi.web.dao.impl;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.bundle.BundleCoordinate;
+import org.apache.nifi.components.ConfigVerificationResult;
 import org.apache.nifi.components.ConfigurableComponent;
 import org.apache.nifi.components.state.Scope;
 import org.apache.nifi.components.state.StateMap;
@@ -33,18 +34,18 @@ import 
org.apache.nifi.controller.reporting.ReportingTaskProvider;
 import org.apache.nifi.controller.service.StandardConfigurationContext;
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.logging.LogRepository;
+import org.apache.nifi.logging.StandardLoggingContext;
 import org.apache.nifi.logging.repository.NopLogRepository;
 import org.apache.nifi.nar.ExtensionManager;
 import org.apache.nifi.parameter.ParameterLookup;
-import org.apache.nifi.components.ConfigVerificationResult;
 import org.apache.nifi.processor.SimpleProcessLogger;
-import org.apache.nifi.logging.StandardLoggingContext;
 import org.apache.nifi.scheduling.SchedulingStrategy;
 import org.apache.nifi.util.BundleUtils;
 import org.apache.nifi.util.FormatUtils;
 import org.apache.nifi.web.NiFiCoreException;
 import org.apache.nifi.web.ResourceNotFoundException;
 import org.apache.nifi.web.api.dto.BundleDTO;
+import org.apache.nifi.web.api.dto.ComponentStateDTO;
 import org.apache.nifi.web.api.dto.ConfigVerificationResultDTO;
 import org.apache.nifi.web.api.dto.ReportingTaskDTO;
 import org.apache.nifi.web.dao.ComponentStateDAO;
@@ -409,9 +410,9 @@ public class StandardReportingTaskDAO extends ComponentDAO 
implements ReportingT
     }
 
     @Override
-    public void clearState(String reportingTaskId) {
+    public void clearState(final String reportingTaskId, final 
ComponentStateDTO componentStateDTO) {
         final ReportingTaskNode reportingTask = 
locateReportingTask(reportingTaskId);
-        componentStateDAO.clearState(reportingTask);
+        componentStateDAO.clearState(reportingTask, componentStateDTO);
     }
 
     @Autowired
diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/audit/TestProcessorAuditor.java
 
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/audit/TestProcessorAuditor.java
index 1f80737109..ce253a5c6c 100644
--- 
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/audit/TestProcessorAuditor.java
+++ 
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/audit/TestProcessorAuditor.java
@@ -157,7 +157,7 @@ class TestProcessorAuditor {
         
when(extensionManager.getBundles(anyString())).thenReturn(Collections.singletonList(bundle));
 
 
-        
when(mockStateManagerProvider.getStateManager(PN_ID)).thenReturn(mockStateManager);
+        when(mockStateManagerProvider.getStateManager(anyString(), 
any())).thenReturn(mockStateManager);
 
         final ProcessorNode processor = processorDao.createProcessor(GROUP_ID, 
processorDto);
 
diff --git 
a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/controller/reporting/StatelessReportingContext.java
 
b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/controller/reporting/StatelessReportingContext.java
index 65475f7b00..f3cfe5dc68 100644
--- 
a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/controller/reporting/StatelessReportingContext.java
+++ 
b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/controller/reporting/StatelessReportingContext.java
@@ -52,7 +52,8 @@ public class StatelessReportingContext extends 
AbstractReportingContext implemen
 
     @Override
     public StateManager getStateManager() {
-        return 
statelessEngine.getStateManagerProvider().getStateManager(getReportingTask().getIdentifier());
+        final Class<?> componentClass = getReportingTask() == null ? null : 
getReportingTask().getClass();
+        return 
statelessEngine.getStateManagerProvider().getStateManager(getReportingTask().getIdentifier(),
 componentClass);
     }
 
     @Override
diff --git 
a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/ComponentBuilder.java
 
b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/ComponentBuilder.java
index 97c5af5b74..729609f309 100644
--- 
a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/ComponentBuilder.java
+++ 
b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/ComponentBuilder.java
@@ -283,7 +283,7 @@ public class ComponentBuilder {
             final ComponentLog serviceLogger = new 
SimpleProcessLogger(identifier, serviceImpl, new StandardLoggingContext(null));
             final TerminationAwareLogger terminationAwareLogger = new 
TerminationAwareLogger(serviceLogger);
 
-            final StateManager stateManager = 
stateManagerProvider.getStateManager(identifier);
+            final StateManager stateManager = 
stateManagerProvider.getStateManager(identifier, rawClass);
             final ControllerServiceInitializationContext initContext = new 
StandardControllerServiceInitializationContext(identifier, 
terminationAwareLogger,
                 serviceProvider, stateManager, kerberosConfig, 
nodeTypeProvider);
             serviceImpl.initialize(initContext);
diff --git 
a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StatelessFlowManager.java
 
b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StatelessFlowManager.java
index 8010d5cafb..09010dd7df 100644
--- 
a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StatelessFlowManager.java
+++ 
b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StatelessFlowManager.java
@@ -139,7 +139,7 @@ public class StatelessFlowManager extends 
AbstractFlowManager implements FlowMan
     @Override
     public RemoteProcessGroup createRemoteProcessGroup(final String id, final 
String uris) {
         return new StandardRemoteProcessGroup(id, uris, null, 
statelessEngine.getProcessScheduler(), statelessEngine.getBulletinRepository(), 
sslContext,
-            statelessEngine.getStateManagerProvider().getStateManager(id), 
TimeUnit.SECONDS.toMillis(30));
+            statelessEngine.getStateManagerProvider().getStateManager(id, 
StandardRemoteProcessGroup.class), TimeUnit.SECONDS.toMillis(30));
     }
 
     @Override
@@ -179,7 +179,8 @@ public class StatelessFlowManager extends 
AbstractFlowManager implements FlowMan
             }
 
             try (final NarCloseable ignored = 
NarCloseable.withComponentNarLoader(extensionManager, 
procNode.getProcessor().getClass(), procNode.getProcessor().getIdentifier())) {
-                final StateManager stateManager = 
statelessEngine.getStateManagerProvider().getStateManager(id);
+                final Class<?> componentClass = procNode.getProcessor() == 
null ? null : procNode.getProcessor().getClass();
+                final StateManager stateManager = 
statelessEngine.getStateManagerProvider().getStateManager(id, componentClass);
                 final StandardProcessContext processContext = new 
StandardProcessContext(procNode, statelessEngine.getControllerServiceProvider(),
                         stateManager, () -> false, new 
StatelessNodeTypeProvider());
                 
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnConfigurationRestored.class,
 procNode.getProcessor(), processContext);
diff --git 
a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StatelessProcessContextFactory.java
 
b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StatelessProcessContextFactory.java
index 8d8d1dd1c1..76cd674abd 100644
--- 
a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StatelessProcessContextFactory.java
+++ 
b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StatelessProcessContextFactory.java
@@ -39,7 +39,10 @@ public class StatelessProcessContextFactory implements 
ProcessContextFactory {
 
     @Override
     public ProcessContext createProcessContext(final Connectable connectable) {
-        final StateManager stateManager = 
stateManagerProvider.getStateManager(connectable.getIdentifier());
+        final Class<?> componentClass = (connectable instanceof ProcessorNode 
&& ((ProcessorNode) connectable).getProcessor() != null)
+            ? ((ProcessorNode) connectable).getProcessor().getClass()
+            : null;
+        final StateManager stateManager = 
stateManagerProvider.getStateManager(connectable.getIdentifier(), 
componentClass);
 
         if (connectable instanceof ProcessorNode) {
             final ProcessorNode processor = (ProcessorNode) connectable;
diff --git 
a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StatelessReloadComponent.java
 
b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StatelessReloadComponent.java
index 1228a4a089..60633a2fd4 100644
--- 
a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StatelessReloadComponent.java
+++ 
b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StatelessReloadComponent.java
@@ -87,7 +87,8 @@ public class StatelessReloadComponent implements 
ReloadComponent {
 
         // call OnRemoved for the existing processor using the previous 
instance class loader
         try (final NarCloseable ignored = 
NarCloseable.withComponentNarLoader(existingInstanceClassLoader)) {
-            final StateManager stateManager = 
statelessEngine.getStateManagerProvider().getStateManager(id);
+            final Class<?> componentClass = existingNode.getProcessor() == 
null ? null : existingNode.getProcessor().getClass();
+            final StateManager stateManager = 
statelessEngine.getStateManagerProvider().getStateManager(id, componentClass);
             final StandardProcessContext processContext = new 
StandardProcessContext(existingNode, 
statelessEngine.getControllerServiceProvider(),
                 stateManager, () -> false, new StatelessNodeTypeProvider());
 
@@ -109,8 +110,9 @@ public class StatelessReloadComponent implements 
ReloadComponent {
         existingNode.refreshProperties();
 
         // Notify the processor node that the configuration (properties, e.g.) 
has been restored
+        final Class<?> componentClass = existingNode.getProcessor() == null ? 
null : existingNode.getProcessor().getClass();
         final StandardProcessContext processContext = new 
StandardProcessContext(existingNode, 
statelessEngine.getControllerServiceProvider(),
-                statelessEngine.getStateManagerProvider().getStateManager(id), 
() -> false, new StatelessNodeTypeProvider());
+                statelessEngine.getStateManagerProvider().getStateManager(id, 
componentClass), () -> false, new StatelessNodeTypeProvider());
         existingNode.onConfigurationRestored(processContext);
 
         logger.debug("Successfully reloaded {}", existingNode);
diff --git 
a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/repository/StatelessRepositoryContextFactory.java
 
b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/repository/StatelessRepositoryContextFactory.java
index 161af4033b..2a3e135d0e 100644
--- 
a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/repository/StatelessRepositoryContextFactory.java
+++ 
b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/repository/StatelessRepositoryContextFactory.java
@@ -20,6 +20,7 @@ package org.apache.nifi.stateless.repository;
 import org.apache.nifi.components.state.StateManager;
 import org.apache.nifi.components.state.StateManagerProvider;
 import org.apache.nifi.connectable.Connectable;
+import org.apache.nifi.controller.ProcessorNode;
 import org.apache.nifi.controller.repository.ContentRepository;
 import org.apache.nifi.controller.repository.CounterRepository;
 import org.apache.nifi.controller.repository.FlowFileEventRepository;
@@ -52,7 +53,10 @@ public class StatelessRepositoryContextFactory implements 
RepositoryContextFacto
 
     @Override
     public RepositoryContext createRepositoryContext(final Connectable 
connectable, final ProvenanceEventRepository provenanceEventRepository) {
-        final StateManager stateManager = 
stateManagerProvider.getStateManager(connectable.getIdentifier());
+        final Class<?> componentClass = (connectable instanceof ProcessorNode 
&& ((ProcessorNode) connectable).getProcessor() != null)
+                ? ((ProcessorNode) connectable).getProcessor().getClass()
+                : null;
+        final StateManager stateManager = 
stateManagerProvider.getStateManager(connectable.getIdentifier(), 
componentClass);
         return new StatelessRepositoryContext(connectable, new AtomicLong(0L), 
contentRepository, flowFileRepository,
             flowFileEventRepository, counterRepository, 
provenanceEventRepository, stateManager);
     }
diff --git 
a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/GenerateFlowFile.java
 
b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/GenerateFlowFile.java
index 2dc303c56d..6e7adac274 100644
--- 
a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/GenerateFlowFile.java
+++ 
b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/GenerateFlowFile.java
@@ -16,6 +16,7 @@
  */
 package org.apache.nifi.processors.tests.system;
 
+import org.apache.nifi.annotation.behavior.Stateful;
 import org.apache.nifi.annotation.configuration.DefaultSchedule;
 import org.apache.nifi.annotation.lifecycle.OnScheduled;
 import org.apache.nifi.components.PropertyDescriptor;
@@ -51,6 +52,7 @@ import static 
org.apache.nifi.processor.util.StandardValidators.NON_EMPTY_VALIDA
 import static 
org.apache.nifi.processor.util.StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR;
 
 @DefaultSchedule(period = "10 mins")
+@Stateful(scopes = { Scope.LOCAL, Scope.CLUSTER }, description = "Stores the 
number of generated FlowFiles", dropStateKeySupported = true)
 public class GenerateFlowFile extends AbstractProcessor {
     public static final PropertyDescriptor FILE_SIZE = new Builder()
         .name("File Size")
diff --git 
a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/MultiKeyState.java
 
b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/MultiKeyState.java
new file mode 100644
index 0000000000..34ea12014d
--- /dev/null
+++ 
b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/MultiKeyState.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.tests.system;
+
+import org.apache.nifi.annotation.behavior.PrimaryNodeOnly;
+import org.apache.nifi.annotation.behavior.Stateful;
+import org.apache.nifi.annotation.configuration.DefaultSchedule;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+@PrimaryNodeOnly
+@DefaultSchedule(period = "10 mins")
+@Stateful(scopes = Scope.CLUSTER, description = "Stores three counters in 
state", dropStateKeySupported = true)
+public class MultiKeyState extends AbstractProcessor {
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .autoTerminateDefault(true)
+            .build();
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return Set.of(REL_SUCCESS);
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession 
session) throws ProcessException {
+        final Map<String, String> currentState;
+        try {
+            currentState = session.getState(Scope.CLUSTER).toMap();
+        } catch (IOException e) {
+            throw new ProcessException(e);
+        }
+
+        final int a = Integer.parseInt(currentState.getOrDefault("a", "0")) + 
1;
+        final int b = Integer.parseInt(currentState.getOrDefault("b", "0")) + 
1;
+        final int c = Integer.parseInt(currentState.getOrDefault("c", "0")) + 
1;
+
+        final Map<String, String> newState = new HashMap<>();
+        newState.put("a", String.valueOf(a));
+        newState.put("b", String.valueOf(b));
+        newState.put("c", String.valueOf(c));
+        try {
+            session.setState(newState, Scope.CLUSTER);
+        } catch (IOException e) {
+            throw new ProcessException(e);
+        }
+
+        FlowFile flowFile = session.create();
+        session.transfer(flowFile, REL_SUCCESS);
+    }
+}
diff --git 
a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/MultiKeyStateNotDroppable.java
 
b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/MultiKeyStateNotDroppable.java
new file mode 100644
index 0000000000..3acd253066
--- /dev/null
+++ 
b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/MultiKeyStateNotDroppable.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.tests.system;
+
+import org.apache.nifi.annotation.behavior.PrimaryNodeOnly;
+import org.apache.nifi.annotation.behavior.Stateful;
+import org.apache.nifi.annotation.configuration.DefaultSchedule;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+@PrimaryNodeOnly
+@DefaultSchedule(period = "10 mins")
+@Stateful(scopes = Scope.CLUSTER, description = "Stores three counters in 
state", dropStateKeySupported = false)
+public class MultiKeyStateNotDroppable extends AbstractProcessor {
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .autoTerminateDefault(true)
+            .build();
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return Set.of(REL_SUCCESS);
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession 
session) throws ProcessException {
+        final Map<String, String> currentState;
+        try {
+            currentState = session.getState(Scope.CLUSTER).toMap();
+        } catch (IOException e) {
+            throw new ProcessException(e);
+        }
+
+        final int a = Integer.parseInt(currentState.getOrDefault("a", "0")) + 
1;
+        final int b = Integer.parseInt(currentState.getOrDefault("b", "0")) + 
1;
+        final int c = Integer.parseInt(currentState.getOrDefault("c", "0")) + 
1;
+
+        final Map<String, String> newState = new HashMap<>();
+        newState.put("a", String.valueOf(a));
+        newState.put("b", String.valueOf(b));
+        newState.put("c", String.valueOf(c));
+        try {
+            session.setState(newState, Scope.CLUSTER);
+        } catch (IOException e) {
+            throw new ProcessException(e);
+        }
+
+        FlowFile flowFile = session.create();
+        session.transfer(flowFile, REL_SUCCESS);
+    }
+}
diff --git 
a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
 
b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index 9df0ad1a4d..9b6ba1cbd5 100644
--- 
a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ 
b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -33,6 +33,8 @@ org.apache.nifi.processors.tests.system.HoldInput
 org.apache.nifi.processors.tests.system.IngestFile
 org.apache.nifi.processors.tests.system.LoopFlowFile
 org.apache.nifi.processors.tests.system.MigrateProperties
+org.apache.nifi.processors.tests.system.MultiKeyState
+org.apache.nifi.processors.tests.system.MultiKeyStateNotDroppable
 org.apache.nifi.processors.tests.system.PartitionText
 org.apache.nifi.processors.tests.system.PassThrough
 org.apache.nifi.processors.tests.system.PassThroughRequiresInstanceClassLoading
diff --git 
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/state/AbstractStateKeyDropIT.java
 
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/state/AbstractStateKeyDropIT.java
new file mode 100644
index 0000000000..78e585202e
--- /dev/null
+++ 
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/state/AbstractStateKeyDropIT.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.tests.system.state;
+
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.tests.system.NiFiSystemIT;
+import org.apache.nifi.toolkit.client.NiFiClientException;
+import org.apache.nifi.web.api.dto.ComponentStateDTO;
+import org.apache.nifi.web.api.dto.StateEntryDTO;
+import org.apache.nifi.web.api.dto.StateMapDTO;
+import org.apache.nifi.web.api.entity.ComponentStateEntity;
+import org.apache.nifi.web.api.entity.ProcessorEntity;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public abstract class AbstractStateKeyDropIT extends NiFiSystemIT {
+
+    /**
+     * Retrieves the state for a given processor.
+     *
+     * @param processorId the ID of the processor
+     * @param scope       the scope of the state to retrieve (LOCAL or CLUSTER)
+     * @return a map containing the cluster state key-value pairs
+     * @throws IOException         IO exception
+     * @throws NiFiClientException NiFi Client Exception
+     */
+    protected Map<String, String> getProcessorState(final String processorId, 
final Scope scope) throws NiFiClientException, IOException {
+        final ComponentStateDTO componentState = 
getNifiClient().getProcessorClient().getProcessorState(processorId).getComponentState();
+        final Map<String, String> state = new HashMap<>();
+
+        switch (scope) {
+            case LOCAL:
+                if (componentState != null && componentState.getLocalState() 
!= null && componentState.getLocalState().getState() != null) {
+                    componentState.getLocalState().getState().forEach(entry -> 
state.put(entry.getKey(), entry.getValue()));
+                }
+                break;
+            case CLUSTER:
+                if (componentState != null && componentState.getClusterState() 
!= null && componentState.getClusterState().getState() != null) {
+                    componentState.getClusterState().getState().forEach(entry 
-> state.put(entry.getKey(), entry.getValue()));
+                }
+                break;
+        }
+
+        return state;
+    }
+
+    /**
+     * Drops the state for a given processor, optionally setting a new state.
+     *
+     * @param processorId the ID of the processor
+     * @param newState    a map containing the new state key-value pairs, or 
null to
+     *                    drop the state without setting a new one
+     * @return the response from the NiFi API
+     * @throws IOException         IO exception
+     * @throws NiFiClientException NiFi Client Exception
+     */
+    protected ComponentStateEntity dropProcessorState(final String 
processorId, final Map<String, String> newState) throws NiFiClientException, 
IOException {
+        final ComponentStateEntity entity = new ComponentStateEntity();
+
+        if (newState != null) {
+            final ComponentStateDTO stateDto = new ComponentStateDTO();
+            final StateMapDTO stateMapDTO = new StateMapDTO();
+            final List<StateEntryDTO> entries = new ArrayList<>();
+            newState.forEach((k, v) -> {
+                final StateEntryDTO entry = new StateEntryDTO();
+                entry.setKey(k);
+                entry.setValue(v);
+                entries.add(entry);
+            });
+            stateMapDTO.setState(entries);
+            stateDto.setClusterState(stateMapDTO);
+
+            entity.setComponentState(stateDto);
+        }
+
+        return 
getNifiClient().getProcessorClient().clearProcessorState(processorId, entity);
+    }
+
+    /**
+     * Runs a processor once and waits for it to stop.
+     *
+     * @param processor the processor entity to run
+     * @throws NiFiClientException  if there is an error with the NiFi client
+     * @throws IOException          if there is an IO error
+     * @throws InterruptedException if the thread is interrupted while waiting
+     */
+    protected void runProcessorOnce(final ProcessorEntity processor) throws 
NiFiClientException, IOException, InterruptedException {
+        getNifiClient().getProcessorClient().runProcessorOnce(processor);
+        getClientUtil().waitForStoppedProcessor(processor.getId());
+    }
+}
diff --git 
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/state/ClusterStateKeyDropIT.java
 
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/state/ClusterStateKeyDropIT.java
new file mode 100644
index 0000000000..76f09e0346
--- /dev/null
+++ 
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/state/ClusterStateKeyDropIT.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.tests.system.state;
+
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.tests.system.NiFiInstanceFactory;
+import org.apache.nifi.toolkit.client.NiFiClientException;
+import org.apache.nifi.web.api.entity.ComponentStateEntity;
+import org.apache.nifi.web.api.entity.ProcessorEntity;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class ClusterStateKeyDropIT extends AbstractStateKeyDropIT {
+
+    @Override
+    public NiFiInstanceFactory getInstanceFactory() {
+        return createTwoNodeInstanceFactory();
+    }
+
+    @Test
+    public void testCannotDropStateKeyWithLocalAndClusterState() throws 
NiFiClientException, IOException, InterruptedException {
+        final ProcessorEntity generate = 
getClientUtil().createProcessor("GenerateFlowFile");
+        getClientUtil().updateProcessorProperties(generate, 
Collections.singletonMap("State Scope", "LOCAL"));
+        final ProcessorEntity terminate = 
getClientUtil().createProcessor("TerminateFlowFile");
+        getClientUtil().createConnection(generate, terminate, "success");
+
+        runProcessorOnce(generate);
+
+        getClientUtil().updateProcessorProperties(generate, 
Collections.singletonMap("State Scope", "CLUSTER"));
+        runProcessorOnce(generate);
+
+        // GenerateFlowFile has both local and cluster state, so dropping 
state should
+        // fail
+        assertThrows(NiFiClientException.class, () -> {
+            dropProcessorState(generate.getId(), Collections.emptyMap());
+        });
+    }
+
+    @Test
+    public void testCannotDropStateKeyIfFlagNotTrue() throws 
NiFiClientException, IOException, InterruptedException {
+        final ProcessorEntity processor = 
getClientUtil().createProcessor("MultiKeyStateNotDroppable");
+        final String processorId = processor.getId();
+
+        
assertFalse(getNifiClient().getProcessorClient().getProcessorState(processorId).getComponentState().isDropStateKeySupported());
+
+        runProcessorOnce(processor);
+
+        final Map<String, String> currentState = 
getProcessorState(processorId, Scope.CLUSTER);
+        assertEquals(Map.of("a", "1", "b", "1", "c", "1"), currentState);
+
+        // trying to remove key a
+        final Map<String, String> newState = Map.of("b", "1", "c", "1");
+
+        // MultiKeyStateNotDroppable processor has state but has 
dropStateKeySupported =
+        // false so it should also fail
+        assertThrows(NiFiClientException.class, () -> {
+            dropProcessorState(processorId, newState);
+        });
+    }
+
+    @Test
+    public void testCannotDropStateKeyWithMismatchedState() throws 
NiFiClientException, IOException, InterruptedException {
+        final ProcessorEntity processor = 
getClientUtil().createProcessor("MultiKeyState");
+        final String processorId = processor.getId();
+
+        
assertTrue(getNifiClient().getProcessorClient().getProcessorState(processorId).getComponentState().isDropStateKeySupported());
+
+        runProcessorOnce(processor);
+
+        final Map<String, String> currentState = 
getProcessorState(processorId, Scope.CLUSTER);
+        assertEquals(Map.of("a", "1", "b", "1", "c", "1"), currentState);
+
+        // trying to remove key "a" but with wrong value for "b"
+        assertThrows(NiFiClientException.class, () -> {
+            dropProcessorState(processorId, Map.of("b", "2", "c", "1"));
+        });
+    }
+
+    @Test
+    public void testCannotDropMultipleStateKeys() throws NiFiClientException, 
IOException, InterruptedException {
+        final ProcessorEntity processor = 
getClientUtil().createProcessor("MultiKeyState");
+        final String processorId = processor.getId();
+        runProcessorOnce(processor);
+
+        final Map<String, String> currentState = 
getProcessorState(processorId, Scope.CLUSTER);
+        assertEquals(Map.of("a", "1", "b", "1", "c", "1"), currentState);
+
+        // trying to remove two keys
+        assertThrows(NiFiClientException.class, () -> {
+            dropProcessorState(processorId, Map.of("c", "1"));
+        });
+    }
+
+    @Test
+    public void testCanDropSpecificStateKey() throws NiFiClientException, 
IOException, InterruptedException {
+        final ProcessorEntity processor = 
getClientUtil().createProcessor("MultiKeyState");
+        final String processorId = processor.getId();
+        runProcessorOnce(processor);
+
+        final Map<String, String> currentState = 
getProcessorState(processorId, Scope.CLUSTER);
+        assertEquals(Map.of("a", "1", "b", "1", "c", "1"), currentState);
+
+        // trying to remove key a
+        final Map<String, String> newState = Map.of("b", "1", "c", "1");
+        final ComponentStateEntity response = dropProcessorState(processorId, 
newState);
+
+        final Map<String, String> updatedState = new HashMap<>();
+        
response.getComponentState().getClusterState().getState().forEach(entry -> 
updatedState.put(entry.getKey(), entry.getValue()));
+        assertEquals(newState, updatedState);
+
+        final Map<String, String> state = getProcessorState(processorId, 
Scope.CLUSTER);
+        assertEquals(newState, state);
+    }
+
+    @Test
+    public void testClearAllStateWithNullPayload() throws NiFiClientException, 
IOException, InterruptedException {
+        final ProcessorEntity processor = 
getClientUtil().createProcessor("MultiKeyState");
+        final String processorId = processor.getId();
+        runProcessorOnce(processor);
+
+        final Map<String, String> currentState = 
getProcessorState(processorId, Scope.CLUSTER);
+        assertEquals(Map.of("a", "1", "b", "1", "c", "1"), currentState);
+
+        final ComponentStateEntity response = dropProcessorState(processorId, 
null);
+        
assertTrue(response.getComponentState().getClusterState().getState().isEmpty());
+
+        final Map<String, String> state = getProcessorState(processorId, 
Scope.CLUSTER);
+        assertTrue(state.isEmpty());
+    }
+
+    @Test
+    public void testClearAllStateWithEmptyPayload() throws 
NiFiClientException, IOException, InterruptedException {
+        final ProcessorEntity processor = 
getClientUtil().createProcessor("MultiKeyState");
+        final String processorId = processor.getId();
+        runProcessorOnce(processor);
+
+        final Map<String, String> currentState = 
getProcessorState(processorId, Scope.CLUSTER);
+        assertEquals(Map.of("a", "1", "b", "1", "c", "1"), currentState);
+
+        final ComponentStateEntity response = dropProcessorState(processorId, 
Map.of());
+        
assertTrue(response.getComponentState().getClusterState().getState().isEmpty());
+
+        final Map<String, String> state = getProcessorState(processorId, 
Scope.CLUSTER);
+        assertTrue(state.isEmpty());
+    }
+}
\ No newline at end of file
diff --git 
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/state/StandaloneStateKeyDropIT.java
 
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/state/StandaloneStateKeyDropIT.java
new file mode 100644
index 0000000000..04d03d8da8
--- /dev/null
+++ 
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/state/StandaloneStateKeyDropIT.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.tests.system.state;
+
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.toolkit.client.NiFiClientException;
+import org.apache.nifi.web.api.entity.ComponentStateEntity;
+import org.apache.nifi.web.api.entity.ProcessorEntity;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class StandaloneStateKeyDropIT extends AbstractStateKeyDropIT {
+
+    @Test
+    public void testDeleteStateKeyOnStandalone() throws NiFiClientException, 
IOException, InterruptedException {
+        final ProcessorEntity generate = 
getClientUtil().createProcessor("GenerateFlowFile");
+        getClientUtil().updateProcessorProperties(generate, 
Collections.singletonMap("State Scope", "CLUSTER"));
+        final ProcessorEntity terminate = 
getClientUtil().createProcessor("TerminateFlowFile");
+        getClientUtil().createConnection(generate, terminate, "success");
+
+        runProcessorOnce(generate);
+
+        // even if the processor is configured to use CLUSTER scope, it will 
still have
+        // a local state because this is a standalone instance
+        final Map<String, String> state = getProcessorState(generate.getId(), 
Scope.CLUSTER);
+        assertNull(state.get("count"));
+
+        final Map<String, String> localState = 
getProcessorState(generate.getId(), Scope.LOCAL);
+        assertEquals("1", localState.get("count"));
+
+        // drop specific keys with LOCAL state
+        ComponentStateEntity newState = dropProcessorState(generate.getId(), 
Collections.emptyMap());
+        assertNull(newState.getComponentState().getClusterState());
+        
assertTrue(newState.getComponentState().getLocalState().getState().isEmpty());
+
+        final Map<String, String> after = getProcessorState(generate.getId(), 
Scope.LOCAL);
+        assertTrue(after.isEmpty());
+
+        runProcessorOnce(generate);
+
+        // can drop full state
+        dropProcessorState(generate.getId(), null);
+        assertTrue(getProcessorState(generate.getId(), Scope.LOCAL).isEmpty());
+    }
+
+    @Test
+    public void testCannotDropStateKeyIfFlagNotTrue() throws 
NiFiClientException, IOException, InterruptedException {
+        final ProcessorEntity multi = 
getClientUtil().createProcessor("MultiKeyStateNotDroppable");
+
+        
assertFalse(getNifiClient().getProcessorClient().getProcessorState(multi.getId()).getComponentState().isDropStateKeySupported());
+
+        runProcessorOnce(multi);
+
+        final Map<String, String> currentState = 
getProcessorState(multi.getId(), Scope.LOCAL);
+        assertEquals(Map.of("a", "1", "b", "1", "c", "1"), currentState);
+
+        // trying to remove key a
+        final Map<String, String> newState = Map.of("b", "1", "c", "1");
+
+        // MultiKeyStateNotDroppable processor has state but has 
dropStateKeySupported =
+        // false so it should also fail
+        assertThrows(NiFiClientException.class, () -> {
+            dropProcessorState(multi.getId(), newState);
+        });
+    }
+
+    @Test
+    public void testCannotDropStateKeyWithMismatchedState() throws 
NiFiClientException, IOException, InterruptedException {
+        final ProcessorEntity multi = 
getClientUtil().createProcessor("MultiKeyState");
+        runProcessorOnce(multi);
+
+        final Map<String, String> currentState = 
getProcessorState(multi.getId(), Scope.LOCAL);
+        assertEquals(Map.of("a", "1", "b", "1", "c", "1"), currentState);
+
+        // trying to remove key "a" but with wrong value for "b"
+        assertThrows(NiFiClientException.class, () -> {
+            dropProcessorState(multi.getId(), Map.of("b", "2", "c", "1"));
+        });
+    }
+
+    @Test
+    public void testCannotDropMultipleStateKeys() throws NiFiClientException, 
IOException, InterruptedException {
+        final ProcessorEntity multi = 
getClientUtil().createProcessor("MultiKeyState");
+        runProcessorOnce(multi);
+
+        final Map<String, String> currentState = 
getProcessorState(multi.getId(), Scope.LOCAL);
+        assertEquals(Map.of("a", "1", "b", "1", "c", "1"), currentState);
+
+        // trying to remove two keys
+        assertThrows(NiFiClientException.class, () -> {
+            dropProcessorState(multi.getId(), Map.of("c", "1"));
+        });
+    }
+}
\ No newline at end of file
diff --git 
a/nifi-toolkit/nifi-toolkit-client/src/main/java/org/apache/nifi/toolkit/client/ProcessorClient.java
 
b/nifi-toolkit/nifi-toolkit-client/src/main/java/org/apache/nifi/toolkit/client/ProcessorClient.java
index 173b79128b..247065b313 100644
--- 
a/nifi-toolkit/nifi-toolkit-client/src/main/java/org/apache/nifi/toolkit/client/ProcessorClient.java
+++ 
b/nifi-toolkit/nifi-toolkit-client/src/main/java/org/apache/nifi/toolkit/client/ProcessorClient.java
@@ -60,7 +60,13 @@ public interface ProcessorClient {
 
     ProcessorEntity terminateProcessor(String processorId) throws 
NiFiClientException, IOException;
 
-    ComponentStateEntity clearProcessorState(String processorId) throws 
NiFiClientException, IOException;
+    default ComponentStateEntity clearProcessorState(String processorId) 
throws NiFiClientException, IOException {
+        return clearProcessorState(processorId, null);
+    }
+
+    ComponentStateEntity clearProcessorState(String processorId, 
ComponentStateEntity componentStateEntity) throws NiFiClientException, 
IOException;
+
+    ComponentStateEntity getProcessorState(String processorId) throws 
NiFiClientException, IOException;
 
     /**
      * Indicates that mutable requests should indicate that the client has
diff --git 
a/nifi-toolkit/nifi-toolkit-client/src/main/java/org/apache/nifi/toolkit/client/impl/JerseyProcessorClient.java
 
b/nifi-toolkit/nifi-toolkit-client/src/main/java/org/apache/nifi/toolkit/client/impl/JerseyProcessorClient.java
index c930b7c992..2b09be0314 100644
--- 
a/nifi-toolkit/nifi-toolkit-client/src/main/java/org/apache/nifi/toolkit/client/impl/JerseyProcessorClient.java
+++ 
b/nifi-toolkit/nifi-toolkit-client/src/main/java/org/apache/nifi/toolkit/client/impl/JerseyProcessorClient.java
@@ -287,7 +287,7 @@ public class JerseyProcessorClient extends 
AbstractJerseyClient implements Proce
     }
 
     @Override
-    public ComponentStateEntity clearProcessorState(String processorId) throws 
NiFiClientException, IOException {
+    public ComponentStateEntity clearProcessorState(final String processorId, 
final ComponentStateEntity componentStateEntity) throws NiFiClientException, 
IOException {
         Objects.requireNonNull(processorId, "Processor ID required");
 
         return executeAction("Error clearing state of the Processor", () -> {
@@ -295,7 +295,20 @@ public class JerseyProcessorClient extends 
AbstractJerseyClient implements Proce
                 .path("/state/clear-requests")
                 .resolveTemplate("id", processorId);
 
-            return getRequestBuilder(target).post(null, 
ComponentStateEntity.class);
+            return 
getRequestBuilder(target).post(Entity.entity(componentStateEntity, 
MediaType.APPLICATION_JSON_TYPE), ComponentStateEntity.class);
+        });
+    }
+
+    @Override
+    public ComponentStateEntity getProcessorState(String processorId) throws 
NiFiClientException, IOException {
+        Objects.requireNonNull(processorId, "Processor ID required");
+
+        return executeAction("Error getting state of the Processor", () -> {
+            final WebTarget target = processorTarget
+                    .path("/state")
+                    .resolveTemplate("id", processorId);
+
+            return getRequestBuilder(target).get(ComponentStateEntity.class);
         });
     }
 }


Reply via email to