bobpaulin commented on code in PR #10986:
URL: https://github.com/apache/nifi/pull/10986#discussion_r3120580898
##########
nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java:
##########
@@ -4080,6 +4096,102 @@ private Map<String, String> getPropertyValues(final
ComponentNode componentNode)
return propertyValues;
}
+ private void validateLocalStateTopology(final VersionedProcessGroup
proposed) {
+ final int connectedNodeCount = context.getConnectedNodeCount();
+ if (connectedNodeCount <= 0) {
+ return;
+ }
+
+ final int maxSourceNodes = findMaxLocalStateNodeCount(proposed);
+ if (maxSourceNodes > connectedNodeCount) {
+ throw new IllegalStateException(
+ "Cannot import flow with component state: the flow
definition contains local state from %d source node(s) but the destination
cluster has only %d connected node(s). "
+ .formatted(maxSourceNodes, connectedNodeCount)
+ + "Import into a cluster with at least %d node(s), or
export without component state.".formatted(maxSourceNodes));
+ }
+ }
+
+ private int findMaxLocalStateNodeCount(final VersionedProcessGroup group) {
+ int max = 0;
+ for (final VersionedConfigurableExtension ext :
getStatefulExtensions(group)) {
+ final VersionedComponentState state = ext.getComponentState();
+ if (state != null && state.getLocalNodeStates() != null) {
+ max = Math.max(max, state.getLocalNodeStates().size());
+ }
+ }
+ if (group.getProcessGroups() != null) {
+ for (final VersionedProcessGroup child : group.getProcessGroups())
{
+ max = Math.max(max, findMaxLocalStateNodeCount(child));
+ }
+ }
+ return max;
+ }
+
+ private List<VersionedConfigurableExtension> getStatefulExtensions(final
VersionedProcessGroup group) {
+ final List<VersionedConfigurableExtension> extensions = new
ArrayList<>();
+ if (group.getProcessors() != null) {
+ extensions.addAll(group.getProcessors());
+ }
+ if (group.getControllerServices() != null) {
+ extensions.addAll(group.getControllerServices());
+ }
+ return extensions;
+ }
+
+ private void restoreComponentState(final String componentId, final
VersionedComponentState componentState, final ComponentNode componentNode) {
+ if (componentState == null) {
+ return;
+ }
+
+ final StateManagerProvider stateManagerProvider =
context.getStateManagerProvider();
+ if (stateManagerProvider == null) {
+ LOG.debug("StateManagerProvider is not available; skipping state
restoration for component {}", componentId);
Review Comment:
It makes sense that not having the state manager provider will prevent us
from being able to import state. Not blocking the import also makes sense, since
the user may want to proceed without it. However, I would challenge whether this
should be logged only at debug. Shouldn't the user be notified if they were
expecting state to be imported but it was not? This seems like it would very
likely impact the behavior after the flow is started. I'd consider making this a
WARN level event.
##########
nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java:
##########
@@ -4080,6 +4096,102 @@ private Map<String, String> getPropertyValues(final
ComponentNode componentNode)
return propertyValues;
}
+ private void validateLocalStateTopology(final VersionedProcessGroup
proposed) {
+ final int connectedNodeCount = context.getConnectedNodeCount();
+ if (connectedNodeCount <= 0) {
+ return;
+ }
+
+ final int maxSourceNodes = findMaxLocalStateNodeCount(proposed);
+ if (maxSourceNodes > connectedNodeCount) {
+ throw new IllegalStateException(
+ "Cannot import flow with component state: the flow
definition contains local state from %d source node(s) but the destination
cluster has only %d connected node(s). "
+ .formatted(maxSourceNodes, connectedNodeCount)
+ + "Import into a cluster with at least %d node(s), or
export without component state.".formatted(maxSourceNodes));
+ }
+ }
+
+ private int findMaxLocalStateNodeCount(final VersionedProcessGroup group) {
+ int max = 0;
+ for (final VersionedConfigurableExtension ext :
getStatefulExtensions(group)) {
+ final VersionedComponentState state = ext.getComponentState();
+ if (state != null && state.getLocalNodeStates() != null) {
+ max = Math.max(max, state.getLocalNodeStates().size());
+ }
+ }
+ if (group.getProcessGroups() != null) {
+ for (final VersionedProcessGroup child : group.getProcessGroups())
{
+ max = Math.max(max, findMaxLocalStateNodeCount(child));
+ }
+ }
+ return max;
+ }
+
+ private List<VersionedConfigurableExtension> getStatefulExtensions(final
VersionedProcessGroup group) {
+ final List<VersionedConfigurableExtension> extensions = new
ArrayList<>();
+ if (group.getProcessors() != null) {
+ extensions.addAll(group.getProcessors());
+ }
+ if (group.getControllerServices() != null) {
+ extensions.addAll(group.getControllerServices());
+ }
+ return extensions;
+ }
+
+ private void restoreComponentState(final String componentId, final
VersionedComponentState componentState, final ComponentNode componentNode) {
+ if (componentState == null) {
+ return;
+ }
+
+ final StateManagerProvider stateManagerProvider =
context.getStateManagerProvider();
+ if (stateManagerProvider == null) {
+ LOG.debug("StateManagerProvider is not available; skipping state
restoration for component {}", componentId);
+ return;
+ }
+
+ final ConfigurableComponent component = componentNode.getComponent();
+ if (component == null) {
+ LOG.debug("Component {} is not available; skipping state
restoration", componentId);
+ return;
+ }
+
+ final Stateful stateful =
component.getClass().getAnnotation(Stateful.class);
+ if (stateful == null) {
+ LOG.debug("Component {} ({}) is not annotated with @Stateful;
skipping state restoration", componentId, component.getClass().getSimpleName());
+ return;
+ }
+
+ final Set<Scope> supportedScopes = Set.of(stateful.scopes());
+ final StateManager stateManager =
stateManagerProvider.getStateManager(componentId);
+
+ try {
+ if (supportedScopes.contains(Scope.CLUSTER) &&
componentState.getClusterState() != null &&
!componentState.getClusterState().isEmpty()) {
+ stateManager.setState(componentState.getClusterState(),
Scope.CLUSTER);
+ LOG.debug("Restored cluster state for component {}",
componentId);
+ }
+
+ if (supportedScopes.contains(Scope.LOCAL) &&
componentState.getLocalNodeStates() != null &&
!componentState.getLocalNodeStates().isEmpty()) {
+ final int localNodeOrdinal = context.getLocalNodeOrdinal();
+ if (localNodeOrdinal < 0) {
+ LOG.debug("Local node ordinal is not set; skipping local
state restoration for component {}", componentId);
Review Comment:
WARN level event?
##########
nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java:
##########
@@ -4080,6 +4096,102 @@ private Map<String, String> getPropertyValues(final
ComponentNode componentNode)
return propertyValues;
}
+ private void validateLocalStateTopology(final VersionedProcessGroup
proposed) {
+ final int connectedNodeCount = context.getConnectedNodeCount();
+ if (connectedNodeCount <= 0) {
+ return;
+ }
+
+ final int maxSourceNodes = findMaxLocalStateNodeCount(proposed);
+ if (maxSourceNodes > connectedNodeCount) {
+ throw new IllegalStateException(
+ "Cannot import flow with component state: the flow
definition contains local state from %d source node(s) but the destination
cluster has only %d connected node(s). "
+ .formatted(maxSourceNodes, connectedNodeCount)
+ + "Import into a cluster with at least %d node(s), or
export without component state.".formatted(maxSourceNodes));
+ }
+ }
+
+ private int findMaxLocalStateNodeCount(final VersionedProcessGroup group) {
+ int max = 0;
+ for (final VersionedConfigurableExtension ext :
getStatefulExtensions(group)) {
+ final VersionedComponentState state = ext.getComponentState();
+ if (state != null && state.getLocalNodeStates() != null) {
+ max = Math.max(max, state.getLocalNodeStates().size());
+ }
+ }
+ if (group.getProcessGroups() != null) {
+ for (final VersionedProcessGroup child : group.getProcessGroups())
{
+ max = Math.max(max, findMaxLocalStateNodeCount(child));
+ }
+ }
+ return max;
+ }
+
+ private List<VersionedConfigurableExtension> getStatefulExtensions(final
VersionedProcessGroup group) {
+ final List<VersionedConfigurableExtension> extensions = new
ArrayList<>();
+ if (group.getProcessors() != null) {
+ extensions.addAll(group.getProcessors());
+ }
+ if (group.getControllerServices() != null) {
+ extensions.addAll(group.getControllerServices());
+ }
+ return extensions;
+ }
+
+ private void restoreComponentState(final String componentId, final
VersionedComponentState componentState, final ComponentNode componentNode) {
+ if (componentState == null) {
+ return;
+ }
+
+ final StateManagerProvider stateManagerProvider =
context.getStateManagerProvider();
+ if (stateManagerProvider == null) {
+ LOG.debug("StateManagerProvider is not available; skipping state
restoration for component {}", componentId);
+ return;
+ }
+
+ final ConfigurableComponent component = componentNode.getComponent();
+ if (component == null) {
+ LOG.debug("Component {} is not available; skipping state
restoration", componentId);
Review Comment:
WARN level event?
##########
nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java:
##########
@@ -4080,6 +4096,102 @@ private Map<String, String> getPropertyValues(final
ComponentNode componentNode)
return propertyValues;
}
+ private void validateLocalStateTopology(final VersionedProcessGroup
proposed) {
+ final int connectedNodeCount = context.getConnectedNodeCount();
+ if (connectedNodeCount <= 0) {
+ return;
+ }
+
+ final int maxSourceNodes = findMaxLocalStateNodeCount(proposed);
+ if (maxSourceNodes > connectedNodeCount) {
+ throw new IllegalStateException(
+ "Cannot import flow with component state: the flow
definition contains local state from %d source node(s) but the destination
cluster has only %d connected node(s). "
+ .formatted(maxSourceNodes, connectedNodeCount)
+ + "Import into a cluster with at least %d node(s), or
export without component state.".formatted(maxSourceNodes));
+ }
+ }
+
+ private int findMaxLocalStateNodeCount(final VersionedProcessGroup group) {
+ int max = 0;
+ for (final VersionedConfigurableExtension ext :
getStatefulExtensions(group)) {
+ final VersionedComponentState state = ext.getComponentState();
+ if (state != null && state.getLocalNodeStates() != null) {
+ max = Math.max(max, state.getLocalNodeStates().size());
+ }
+ }
+ if (group.getProcessGroups() != null) {
+ for (final VersionedProcessGroup child : group.getProcessGroups())
{
+ max = Math.max(max, findMaxLocalStateNodeCount(child));
+ }
+ }
+ return max;
+ }
+
+ private List<VersionedConfigurableExtension> getStatefulExtensions(final
VersionedProcessGroup group) {
+ final List<VersionedConfigurableExtension> extensions = new
ArrayList<>();
+ if (group.getProcessors() != null) {
+ extensions.addAll(group.getProcessors());
+ }
+ if (group.getControllerServices() != null) {
+ extensions.addAll(group.getControllerServices());
+ }
+ return extensions;
+ }
+
+ private void restoreComponentState(final String componentId, final
VersionedComponentState componentState, final ComponentNode componentNode) {
+ if (componentState == null) {
+ return;
+ }
+
+ final StateManagerProvider stateManagerProvider =
context.getStateManagerProvider();
+ if (stateManagerProvider == null) {
+ LOG.debug("StateManagerProvider is not available; skipping state
restoration for component {}", componentId);
+ return;
+ }
+
+ final ConfigurableComponent component = componentNode.getComponent();
+ if (component == null) {
+ LOG.debug("Component {} is not available; skipping state
restoration", componentId);
+ return;
+ }
+
+ final Stateful stateful =
component.getClass().getAnnotation(Stateful.class);
+ if (stateful == null) {
+ LOG.debug("Component {} ({}) is not annotated with @Stateful;
skipping state restoration", componentId, component.getClass().getSimpleName());
Review Comment:
WARN level event?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]