markap14 commented on code in PR #10986:
URL: https://github.com/apache/nifi/pull/10986#discussion_r3202616581


##########
nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java:
##########
@@ -323,6 +347,160 @@ public Response exportProcessGroup(
         return 
generateOkResponse(currentVersionedFlowSnapshot).header(HttpHeaders.CONTENT_DISPOSITION,
 String.format("attachment; filename=\"%s\"", filename)).build();
     }
 
+    /**
+     * Replicates the flow export request to all cluster nodes and merges the 
LOCAL state from each node
+     * into a single response. Each node returns a RegisteredFlowSnapshot 
containing only its own LOCAL
+     * state (keyed by its ordinal). This method collects all node responses 
and combines the localNodeStates
+     * maps so the exported flow contains LOCAL state from every node.
+     *
+     * @return the merged response containing LOCAL state from all nodes
+     */
+    private Response replicateAndMergeFlowExport() {

Review Comment:
   Perhaps I'm missing something, but I think we should register an 
`EndpointResponseMerger` for `GET /process-groups/<uuid>/download`. This is the 
typical pattern that we follow to handle merging the responses. A merger would 
also let the framework drain unread response streams consistently.
   



##########
nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/pg/ClusterFlowDefinitionExportImportStateIT.java:
##########
@@ -0,0 +1,350 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.tests.system.pg;
+
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.flow.VersionedComponentState;
+import org.apache.nifi.flow.VersionedProcessGroup;
+import org.apache.nifi.flow.VersionedProcessor;
+import org.apache.nifi.registry.flow.RegisteredFlowSnapshot;
+import org.apache.nifi.tests.system.NiFiInstanceFactory;
+import org.apache.nifi.tests.system.NiFiSystemIT;
+import org.apache.nifi.toolkit.client.NiFiClientException;
+import org.apache.nifi.web.api.dto.ComponentStateDTO;
+import org.apache.nifi.web.api.dto.StateEntryDTO;
+import org.apache.nifi.web.api.entity.ComponentStateEntity;
+import org.apache.nifi.web.api.entity.ProcessGroupEntity;
+import org.apache.nifi.web.api.entity.ProcessGroupImportEntity;
+import org.apache.nifi.web.api.entity.ProcessGroupReplaceRequestEntity;
+import org.apache.nifi.web.api.entity.ProcessorEntity;
+import org.junit.jupiter.api.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class ClusterFlowDefinitionExportImportStateIT extends NiFiSystemIT {
+
+    private static final ObjectMapper MAPPER = new ObjectMapper()
+            .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, 
false);
+
+    @Override
+    public NiFiInstanceFactory getInstanceFactory() {
+        return createTwoNodeInstanceFactory();
+    }
+
+    @Test
+    public void testClusterExportCapturesClusterState() throws 
NiFiClientException, IOException, InterruptedException {
+        final ProcessGroupEntity pg = 
getClientUtil().createProcessGroup("TestGroup", "root");
+        final ProcessorEntity stateful = 
getClientUtil().createProcessor("GenerateFlowFile", pg.getId());
+        getClientUtil().updateProcessorProperties(stateful, 
Collections.singletonMap("State Scope", "CLUSTER"));
+        final ProcessorEntity terminate = 
getClientUtil().createProcessor("TerminateFlowFile", pg.getId());
+        getClientUtil().createConnection(stateful, terminate, "success");
+
+        getClientUtil().startProcessor(stateful);
+        waitForStatePopulated(stateful.getId(), Scope.CLUSTER);
+        getClientUtil().stopProcessor(stateful);
+        getClientUtil().waitForStoppedProcessor(stateful.getId());
+
+        final File exportFile = new File("target/st13-export.json");
+        getNifiClient().getProcessGroupClient().exportProcessGroup(pg.getId(), 
true, true, exportFile);
+
+        final RegisteredFlowSnapshot snapshot = MAPPER.readValue(exportFile, 
RegisteredFlowSnapshot.class);
+        final VersionedProcessor proc = 
findProcessorByType(snapshot.getFlowContents(), "GenerateFlowFile");
+        assertNotNull(proc);
+        assertNotNull(proc.getComponentState());
+        assertNotNull(proc.getComponentState().getClusterState());
+        assertNotNull(proc.getComponentState().getClusterState().get("count"));
+    }
+
+    @Test
+    public void testClusterExportCapturesLocalStateFromBothNodes() throws 
NiFiClientException, IOException, InterruptedException {
+        final ProcessGroupEntity pg = 
getClientUtil().createProcessGroup("TestGroup", "root");
+        final ProcessorEntity stateful = 
getClientUtil().createProcessor("GenerateFlowFile", pg.getId());
+        final ProcessorEntity terminate = 
getClientUtil().createProcessor("TerminateFlowFile", pg.getId());
+        getClientUtil().createConnection(stateful, terminate, "success");
+
+        getClientUtil().startProcessor(stateful);
+        waitForStatePopulated(stateful.getId(), Scope.LOCAL);
+        getClientUtil().stopProcessor(stateful);
+        getClientUtil().waitForStoppedProcessor(stateful.getId());
+
+        final File exportFile = new File("target/st14-export.json");
+        getNifiClient().getProcessGroupClient().exportProcessGroup(pg.getId(), 
true, true, exportFile);
+
+        final RegisteredFlowSnapshot snapshot = MAPPER.readValue(exportFile, 
RegisteredFlowSnapshot.class);
+        final VersionedProcessor proc = 
findProcessorByType(snapshot.getFlowContents(), "GenerateFlowFile");
+        assertNotNull(proc);
+        assertNotNull(proc.getComponentState());
+        assertNotNull(proc.getComponentState().getLocalNodeStates());
+        assertEquals(2, proc.getComponentState().getLocalNodeStates().size(),
+                "Should have local state from both nodes");
+        assertNotNull(proc.getComponentState().getLocalNodeStates().get(0));
+        assertNotNull(proc.getComponentState().getLocalNodeStates().get(1));
+    }
+
+    @Test
+    public void testClusterExportCapturesBothScopes() throws 
NiFiClientException, IOException, InterruptedException {
+        final ProcessGroupEntity pg = 
getClientUtil().createProcessGroup("TestGroup", "root");
+        final ProcessorEntity stateful = 
getClientUtil().createProcessor("StatefulCountProcessor", pg.getId());
+        final ProcessorEntity terminate = 
getClientUtil().createProcessor("TerminateFlowFile", pg.getId());
+        getClientUtil().createConnection(stateful, terminate, "success");
+
+        getClientUtil().startProcessor(stateful);
+        waitForStatePopulated(stateful.getId(), Scope.CLUSTER);
+        waitForStatePopulated(stateful.getId(), Scope.LOCAL);
+        getClientUtil().stopProcessor(stateful);
+        getClientUtil().waitForStoppedProcessor(stateful.getId());
+
+        final File exportFile = new File("target/st15-export.json");
+        getNifiClient().getProcessGroupClient().exportProcessGroup(pg.getId(), 
true, true, exportFile);
+
+        final RegisteredFlowSnapshot snapshot = MAPPER.readValue(exportFile, 
RegisteredFlowSnapshot.class);
+        final VersionedProcessor proc = 
findProcessorByType(snapshot.getFlowContents(), "StatefulCountProcessor");
+        assertNotNull(proc);
+
+        final VersionedComponentState state = proc.getComponentState();
+        assertNotNull(state);
+        assertNotNull(state.getClusterState(), "Cluster state should be 
present");
+        assertNotNull(state.getClusterState().get("count"), "Cluster state 
should contain count");
+        assertNotNull(state.getLocalNodeStates(), "Local node states should be 
present");
+        assertEquals(2, state.getLocalNodeStates().size());
+        
assertNotNull(state.getLocalNodeStates().get(0).getState().get("count"));
+        
assertNotNull(state.getLocalNodeStates().get(1).getState().get("count"));
+    }
+
+    @Test
+    public void testClusterRoundTripSameTopologyBothScopes() throws 
NiFiClientException, IOException, InterruptedException {
+        final ProcessGroupEntity pg = 
getClientUtil().createProcessGroup("TestGroup", "root");
+        final ProcessorEntity stateful = 
getClientUtil().createProcessor("StatefulCountProcessor", pg.getId());
+        final ProcessorEntity terminate = 
getClientUtil().createProcessor("TerminateFlowFile", pg.getId());
+        getClientUtil().createConnection(stateful, terminate, "success");
+
+        getClientUtil().startProcessor(stateful);
+        waitForStatePopulated(stateful.getId(), Scope.CLUSTER);
+        waitForStatePopulated(stateful.getId(), Scope.LOCAL);
+        getClientUtil().stopProcessor(stateful);
+        getClientUtil().waitForStoppedProcessor(stateful.getId());
+
+        final Map<String, String> originalClusterState = 
getProcessorState(stateful.getId(), Scope.CLUSTER);
+        final Map<String, Map<String, String>> originalLocalStatesByNode = 
getProcessorLocalStatesByNode(stateful.getId());
+        assertEquals(2, originalLocalStatesByNode.size(),
+                "Local state should be populated on both cluster nodes prior 
to export");
+
+        final File exportFile = new File("target/st16-export.json");
+        getNifiClient().getProcessGroupClient().exportProcessGroup(pg.getId(), 
true, true, exportFile);
+
+        emptyQueuesAndDeleteProcessGroup(pg);
+
+        final ProcessGroupEntity uploaded = 
getNifiClient().getProcessGroupClient().upload("root", exportFile, 
"ImportedGroup", 0.0, 0.0);
+        final ProcessorEntity importedProcessor = 
findProcessorByTypeInGroup(uploaded.getId(), "StatefulCountProcessor");
+        assertNotNull(importedProcessor);
+
+        final Map<String, String> importedClusterState = 
getProcessorState(importedProcessor.getId(), Scope.CLUSTER);
+        assertEquals(originalClusterState.get("count"), 
importedClusterState.get("count"),
+                "Cluster state should be restored after round-trip");
+
+        final Map<String, Map<String, String>> importedLocalStatesByNode = 
getProcessorLocalStatesByNode(importedProcessor.getId());
+        assertEquals(originalLocalStatesByNode, importedLocalStatesByNode,
+                "Local state should be restored to the same node it was 
exported from after round-trip");
+    }
+
+    @Test
+    public void testClusterExportRunningProcessorReturns409() throws 
NiFiClientException, IOException, InterruptedException {
+        final ProcessGroupEntity pg = 
getClientUtil().createProcessGroup("TestGroup", "root");
+        final ProcessorEntity stateful = 
getClientUtil().createProcessor("StatefulCountProcessor", pg.getId());
+        final ProcessorEntity terminate = 
getClientUtil().createProcessor("TerminateFlowFile", pg.getId());
+        getClientUtil().createConnection(stateful, terminate, "success");
+
+        getClientUtil().startProcessor(stateful);
+        waitForStatePopulated(stateful.getId(), Scope.LOCAL);
+
+        try {
+            final File exportFile = new File("target/st20-export.json");
+            
getNifiClient().getProcessGroupClient().exportProcessGroup(pg.getId(), false, 
true, exportFile);
+            throw new AssertionError("Expected export to fail when processors 
are running");
+        } catch (final NiFiClientException e) {
+            assertNotNull(e.getMessage());
+        } finally {
+            getClientUtil().stopProcessor(stateful);
+            getClientUtil().waitForStoppedProcessor(stateful.getId());
+        }
+    }
+
+    @Test
+    public void testClusterReplaceRejectsFlowDefinitionWithComponentState() 
throws NiFiClientException, IOException, InterruptedException {
+        final ProcessGroupEntity pg = 
getClientUtil().createProcessGroup("TestGroup", "root");
+        final ProcessorEntity stateful = 
getClientUtil().createProcessor("StatefulCountProcessor", pg.getId());
+        final ProcessorEntity terminate = 
getClientUtil().createProcessor("TerminateFlowFile", pg.getId());
+        getClientUtil().createConnection(stateful, terminate, "success");
+
+        getClientUtil().startProcessor(stateful);
+        waitForStatePopulated(stateful.getId(), Scope.CLUSTER);
+        waitForStatePopulated(stateful.getId(), Scope.LOCAL);
+        getClientUtil().stopProcessor(stateful);
+        getClientUtil().waitForStoppedProcessor(stateful.getId());
+
+        final File exportFile = new File("target/st21-export.json");
+        getNifiClient().getProcessGroupClient().exportProcessGroup(pg.getId(), 
true, true, exportFile);
+
+        final RegisteredFlowSnapshot snapshot = MAPPER.readValue(exportFile, 
RegisteredFlowSnapshot.class);
+        final NiFiClientException exception = 
assertThrows(NiFiClientException.class, () -> replaceProcessGroup(pg, 
snapshot));
+        assertTrue(exception.getMessage().contains("component state"),
+                "Expected rejection message about component state but got: " + 
exception.getMessage());
+    }
+
+    @Test
+    public void testClusterImportWithoutComponentStateHasNoState() throws 
NiFiClientException, IOException, InterruptedException {
+        final ProcessGroupEntity pg = 
getClientUtil().createProcessGroup("TestGroup", "root");
+        final ProcessorEntity stateful = 
getClientUtil().createProcessor("StatefulCountProcessor", pg.getId());
+        final ProcessorEntity terminate = 
getClientUtil().createProcessor("TerminateFlowFile", pg.getId());
+        getClientUtil().createConnection(stateful, terminate, "success");
+
+        getClientUtil().startProcessor(stateful);
+        waitForStatePopulated(stateful.getId(), Scope.CLUSTER);
+        getClientUtil().stopProcessor(stateful);
+        getClientUtil().waitForStoppedProcessor(stateful.getId());
+
+        final File exportFile = new File("target/st22-export.json");
+        getNifiClient().getProcessGroupClient().exportProcessGroup(pg.getId(), 
true, false, exportFile);
+
+        emptyQueuesAndDeleteProcessGroup(pg);
+
+        final ProcessGroupEntity uploaded = 
getNifiClient().getProcessGroupClient().upload("root", exportFile, 
"ImportedGroup", 0.0, 0.0);
+        final ProcessorEntity importedProcessor = 
findProcessorByTypeInGroup(uploaded.getId(), "StatefulCountProcessor");
+        assertNotNull(importedProcessor);
+
+        final Map<String, String> importedClusterState = 
getProcessorState(importedProcessor.getId(), Scope.CLUSTER);
+        assertTrue(importedClusterState.isEmpty(), "State should be empty when 
imported without componentState");
+    }
+
+    private void waitForStatePopulated(final String processorId, final Scope 
scope) throws InterruptedException {
+        waitFor(() -> {
+            try {
+                final Map<String, String> state = 
getProcessorState(processorId, scope);
+                return state.get("count") != null;
+            } catch (final Exception e) {
+                return false;
+            }
+        });
+    }
+
+    private Map<String, Map<String, String>> 
getProcessorLocalStatesByNode(final String processorId) throws 
NiFiClientException, IOException {
+        final ComponentStateEntity stateEntity = 
getNifiClient().getProcessorClient().getProcessorState(processorId);
+        final ComponentStateDTO componentState = 
stateEntity.getComponentState();
+        final Map<String, Map<String, String>> byNode = new HashMap<>();
+        if (componentState != null && componentState.getLocalState() != null 
&& componentState.getLocalState().getState() != null) {
+            for (final StateEntryDTO entry : 
componentState.getLocalState().getState()) {
+                final String nodeId = entry.getClusterNodeId();
+                byNode.computeIfAbsent(nodeId, id -> new 
HashMap<>()).put(entry.getKey(), entry.getValue());
+            }
+        }
+        return byNode;
+    }
+
+    private Map<String, String> getProcessorState(final String processorId, 
final Scope scope) throws NiFiClientException, IOException {

Review Comment:
   `waitForStatePopulated` and `getProcessorState` — the wait polls a flattened 
map and returns as soon as either node has populated count. The tests then 
assert two local-state slices are present at lines 104 
(testClusterExportCapturesLocalStateFromBothNodes), 135 
(testClusterExportCapturesBothScopes), and 155 / 172 
(testClusterRoundTripSameTopologyBothScopes). If the second node has not yet 
executed onTrigger by the time the wait returns, those assertions will flake. 
The wait should explicitly verify both ordinals have state (e.g., 
getProcessorLocalStatesByNode(...).size() == 2) for the round-trip case.



##########
nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java:
##########
@@ -5935,24 +5982,58 @@ private Set<String> getAllSubGroups(ProcessGroup 
processGroup) {
         return result;
     }
 
+    /**
+     * Computes the ordinal index of the local node among connected cluster 
nodes.
+     * Nodes are sorted deterministically by apiAddress. In standalone mode, 
returns 0.
+     *
+     * @return the ordinal index of the local node, or 0 if not clustered
+     */
+    private int computeLocalNodeOrdinal() {

Review Comment:
   This duplicates the logic added in FlowController.getLocalNodeOrdinal() in 
the same PR. The new ClusterTopologyProvider interface was introduced for 
exactly this purpose; the service facade should depend on it rather than 
reimplementing the ordinal calculation.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to