bobpaulin commented on code in PR #10986:
URL: https://github.com/apache/nifi/pull/10986#discussion_r3120580898


##########
nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java:
##########
@@ -4080,6 +4096,102 @@ private Map<String, String> getPropertyValues(final 
ComponentNode componentNode)
         return propertyValues;
     }
 
+    private void validateLocalStateTopology(final VersionedProcessGroup 
proposed) {
+        final int connectedNodeCount = context.getConnectedNodeCount();
+        if (connectedNodeCount <= 0) {
+            return;
+        }
+
+        final int maxSourceNodes = findMaxLocalStateNodeCount(proposed);
+        if (maxSourceNodes > connectedNodeCount) {
+            throw new IllegalStateException(
+                    "Cannot import flow with component state: the flow 
definition contains local state from %d source node(s) but the destination 
cluster has only %d connected node(s). "
+                            .formatted(maxSourceNodes, connectedNodeCount)
+                    + "Import into a cluster with at least %d node(s), or 
export without component state.".formatted(maxSourceNodes));
+        }
+    }
+
+    private int findMaxLocalStateNodeCount(final VersionedProcessGroup group) {
+        int max = 0;
+        for (final VersionedConfigurableExtension ext : 
getStatefulExtensions(group)) {
+            final VersionedComponentState state = ext.getComponentState();
+            if (state != null && state.getLocalNodeStates() != null) {
+                max = Math.max(max, state.getLocalNodeStates().size());
+            }
+        }
+        if (group.getProcessGroups() != null) {
+            for (final VersionedProcessGroup child : group.getProcessGroups()) 
{
+                max = Math.max(max, findMaxLocalStateNodeCount(child));
+            }
+        }
+        return max;
+    }
+
+    private List<VersionedConfigurableExtension> getStatefulExtensions(final 
VersionedProcessGroup group) {
+        final List<VersionedConfigurableExtension> extensions = new 
ArrayList<>();
+        if (group.getProcessors() != null) {
+            extensions.addAll(group.getProcessors());
+        }
+        if (group.getControllerServices() != null) {
+            extensions.addAll(group.getControllerServices());
+        }
+        return extensions;
+    }
+
+    private void restoreComponentState(final String componentId, final 
VersionedComponentState componentState, final ComponentNode componentNode) {
+        if (componentState == null) {
+            return;
+        }
+
+        final StateManagerProvider stateManagerProvider = 
context.getStateManagerProvider();
+        if (stateManagerProvider == null) {
+            LOG.debug("StateManagerProvider is not available; skipping state 
restoration for component {}", componentId);

Review Comment:
   It makes sense that not having the state manager provider will prevent us 
from being able to import state.  Not blocking the import makes sense, since 
the user may want to proceed without it. However, I would challenge whether 
this should be logged only at debug.  Shouldn't the user be notified if they 
were expecting state to be imported but it was not?  This seems like it would 
very likely impact the behavior after the flow is started.  I'd consider making 
this a WARN level event.



##########
nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java:
##########
@@ -4080,6 +4096,102 @@ private Map<String, String> getPropertyValues(final 
ComponentNode componentNode)
         return propertyValues;
     }
 
+    private void validateLocalStateTopology(final VersionedProcessGroup 
proposed) {
+        final int connectedNodeCount = context.getConnectedNodeCount();
+        if (connectedNodeCount <= 0) {
+            return;
+        }
+
+        final int maxSourceNodes = findMaxLocalStateNodeCount(proposed);
+        if (maxSourceNodes > connectedNodeCount) {
+            throw new IllegalStateException(
+                    "Cannot import flow with component state: the flow 
definition contains local state from %d source node(s) but the destination 
cluster has only %d connected node(s). "
+                            .formatted(maxSourceNodes, connectedNodeCount)
+                    + "Import into a cluster with at least %d node(s), or 
export without component state.".formatted(maxSourceNodes));
+        }
+    }
+
+    private int findMaxLocalStateNodeCount(final VersionedProcessGroup group) {
+        int max = 0;
+        for (final VersionedConfigurableExtension ext : 
getStatefulExtensions(group)) {
+            final VersionedComponentState state = ext.getComponentState();
+            if (state != null && state.getLocalNodeStates() != null) {
+                max = Math.max(max, state.getLocalNodeStates().size());
+            }
+        }
+        if (group.getProcessGroups() != null) {
+            for (final VersionedProcessGroup child : group.getProcessGroups()) 
{
+                max = Math.max(max, findMaxLocalStateNodeCount(child));
+            }
+        }
+        return max;
+    }
+
+    private List<VersionedConfigurableExtension> getStatefulExtensions(final 
VersionedProcessGroup group) {
+        final List<VersionedConfigurableExtension> extensions = new 
ArrayList<>();
+        if (group.getProcessors() != null) {
+            extensions.addAll(group.getProcessors());
+        }
+        if (group.getControllerServices() != null) {
+            extensions.addAll(group.getControllerServices());
+        }
+        return extensions;
+    }
+
+    private void restoreComponentState(final String componentId, final 
VersionedComponentState componentState, final ComponentNode componentNode) {
+        if (componentState == null) {
+            return;
+        }
+
+        final StateManagerProvider stateManagerProvider = 
context.getStateManagerProvider();
+        if (stateManagerProvider == null) {
+            LOG.debug("StateManagerProvider is not available; skipping state 
restoration for component {}", componentId);
+            return;
+        }
+
+        final ConfigurableComponent component = componentNode.getComponent();
+        if (component == null) {
+            LOG.debug("Component {} is not available; skipping state 
restoration", componentId);
+            return;
+        }
+
+        final Stateful stateful = 
component.getClass().getAnnotation(Stateful.class);
+        if (stateful == null) {
+            LOG.debug("Component {} ({}) is not annotated with @Stateful; 
skipping state restoration", componentId, component.getClass().getSimpleName());
+            return;
+        }
+
+        final Set<Scope> supportedScopes = Set.of(stateful.scopes());
+        final StateManager stateManager = 
stateManagerProvider.getStateManager(componentId);
+
+        try {
+            if (supportedScopes.contains(Scope.CLUSTER) && 
componentState.getClusterState() != null && 
!componentState.getClusterState().isEmpty()) {
+                stateManager.setState(componentState.getClusterState(), 
Scope.CLUSTER);
+                LOG.debug("Restored cluster state for component {}", 
componentId);
+            }
+
+            if (supportedScopes.contains(Scope.LOCAL) && 
componentState.getLocalNodeStates() != null && 
!componentState.getLocalNodeStates().isEmpty()) {
+                final int localNodeOrdinal = context.getLocalNodeOrdinal();
+                if (localNodeOrdinal < 0) {
+                    LOG.debug("Local node ordinal is not set; skipping local 
state restoration for component {}", componentId);

Review Comment:
   WARN level event?



##########
nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java:
##########
@@ -4080,6 +4096,102 @@ private Map<String, String> getPropertyValues(final 
ComponentNode componentNode)
         return propertyValues;
     }
 
+    private void validateLocalStateTopology(final VersionedProcessGroup 
proposed) {
+        final int connectedNodeCount = context.getConnectedNodeCount();
+        if (connectedNodeCount <= 0) {
+            return;
+        }
+
+        final int maxSourceNodes = findMaxLocalStateNodeCount(proposed);
+        if (maxSourceNodes > connectedNodeCount) {
+            throw new IllegalStateException(
+                    "Cannot import flow with component state: the flow 
definition contains local state from %d source node(s) but the destination 
cluster has only %d connected node(s). "
+                            .formatted(maxSourceNodes, connectedNodeCount)
+                    + "Import into a cluster with at least %d node(s), or 
export without component state.".formatted(maxSourceNodes));
+        }
+    }
+
+    private int findMaxLocalStateNodeCount(final VersionedProcessGroup group) {
+        int max = 0;
+        for (final VersionedConfigurableExtension ext : 
getStatefulExtensions(group)) {
+            final VersionedComponentState state = ext.getComponentState();
+            if (state != null && state.getLocalNodeStates() != null) {
+                max = Math.max(max, state.getLocalNodeStates().size());
+            }
+        }
+        if (group.getProcessGroups() != null) {
+            for (final VersionedProcessGroup child : group.getProcessGroups()) 
{
+                max = Math.max(max, findMaxLocalStateNodeCount(child));
+            }
+        }
+        return max;
+    }
+
+    private List<VersionedConfigurableExtension> getStatefulExtensions(final 
VersionedProcessGroup group) {
+        final List<VersionedConfigurableExtension> extensions = new 
ArrayList<>();
+        if (group.getProcessors() != null) {
+            extensions.addAll(group.getProcessors());
+        }
+        if (group.getControllerServices() != null) {
+            extensions.addAll(group.getControllerServices());
+        }
+        return extensions;
+    }
+
+    private void restoreComponentState(final String componentId, final 
VersionedComponentState componentState, final ComponentNode componentNode) {
+        if (componentState == null) {
+            return;
+        }
+
+        final StateManagerProvider stateManagerProvider = 
context.getStateManagerProvider();
+        if (stateManagerProvider == null) {
+            LOG.debug("StateManagerProvider is not available; skipping state 
restoration for component {}", componentId);
+            return;
+        }
+
+        final ConfigurableComponent component = componentNode.getComponent();
+        if (component == null) {
+            LOG.debug("Component {} is not available; skipping state 
restoration", componentId);

Review Comment:
   WARN level event?



##########
nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java:
##########
@@ -4080,6 +4096,102 @@ private Map<String, String> getPropertyValues(final 
ComponentNode componentNode)
         return propertyValues;
     }
 
+    private void validateLocalStateTopology(final VersionedProcessGroup 
proposed) {
+        final int connectedNodeCount = context.getConnectedNodeCount();
+        if (connectedNodeCount <= 0) {
+            return;
+        }
+
+        final int maxSourceNodes = findMaxLocalStateNodeCount(proposed);
+        if (maxSourceNodes > connectedNodeCount) {
+            throw new IllegalStateException(
+                    "Cannot import flow with component state: the flow 
definition contains local state from %d source node(s) but the destination 
cluster has only %d connected node(s). "
+                            .formatted(maxSourceNodes, connectedNodeCount)
+                    + "Import into a cluster with at least %d node(s), or 
export without component state.".formatted(maxSourceNodes));
+        }
+    }
+
+    private int findMaxLocalStateNodeCount(final VersionedProcessGroup group) {
+        int max = 0;
+        for (final VersionedConfigurableExtension ext : 
getStatefulExtensions(group)) {
+            final VersionedComponentState state = ext.getComponentState();
+            if (state != null && state.getLocalNodeStates() != null) {
+                max = Math.max(max, state.getLocalNodeStates().size());
+            }
+        }
+        if (group.getProcessGroups() != null) {
+            for (final VersionedProcessGroup child : group.getProcessGroups()) 
{
+                max = Math.max(max, findMaxLocalStateNodeCount(child));
+            }
+        }
+        return max;
+    }
+
+    private List<VersionedConfigurableExtension> getStatefulExtensions(final 
VersionedProcessGroup group) {
+        final List<VersionedConfigurableExtension> extensions = new 
ArrayList<>();
+        if (group.getProcessors() != null) {
+            extensions.addAll(group.getProcessors());
+        }
+        if (group.getControllerServices() != null) {
+            extensions.addAll(group.getControllerServices());
+        }
+        return extensions;
+    }
+
+    private void restoreComponentState(final String componentId, final 
VersionedComponentState componentState, final ComponentNode componentNode) {
+        if (componentState == null) {
+            return;
+        }
+
+        final StateManagerProvider stateManagerProvider = 
context.getStateManagerProvider();
+        if (stateManagerProvider == null) {
+            LOG.debug("StateManagerProvider is not available; skipping state 
restoration for component {}", componentId);
+            return;
+        }
+
+        final ConfigurableComponent component = componentNode.getComponent();
+        if (component == null) {
+            LOG.debug("Component {} is not available; skipping state 
restoration", componentId);
+            return;
+        }
+
+        final Stateful stateful = 
component.getClass().getAnnotation(Stateful.class);
+        if (stateful == null) {
+            LOG.debug("Component {} ({}) is not annotated with @Stateful; 
skipping state restoration", componentId, component.getClass().getSimpleName());

Review Comment:
   WARN level event?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to