This is an automated email from the ASF dual-hosted git repository.

exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 9b7440e077 NIFI-14597 Adjusted Stateless Engine to avoid calling 
yielded Source Processors (#9972)
9b7440e077 is described below

commit 9b7440e077c435a738547a3e208803bd3feaa27d
Author: Mark Payne <[email protected]>
AuthorDate: Tue May 27 11:03:23 2025 -0400

    NIFI-14597 Adjusted Stateless Engine to avoid calling yielded Source 
Processors (#9972)
    
    Ensure that we do not keep calling source processors in the Stateless 
Engine if the the source processor has yielded. Added system test to validate 
functionality.
    
    Signed-off-by: David Handermann <[email protected]>
---
 .../repository/StandardProcessSession.java         |   2 +-
 .../flow/StandardStatelessFlowCurrent.java         |  29 +++++-
 .../basics/HonorsSourceProcessorYieldIT.java       |  80 ++++++++++++++++
 .../nifi/processors/tests/system/HoldInput.java    | 103 +++++++++++++++++++++
 .../nifi/processors/tests/system/YieldSource.java  |  81 ++++++++++++++++
 .../services/org.apache.nifi.processor.Processor   |   2 +
 6 files changed, 295 insertions(+), 2 deletions(-)

diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
 
b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
index 2856fad167..3a9632b2d3 100644
--- 
a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
+++ 
b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
@@ -1197,7 +1197,7 @@ public class StandardProcessSession implements 
ProcessSession, ProvenanceEventEn
 
     protected synchronized void rollback(final boolean penalize, final boolean 
rollbackCheckpoint) {
         if (LOG.isDebugEnabled()) {
-            LOG.debug("{} session rollback called, FlowFile records are {} {}",
+            LOG.debug("{} session rollback called, FlowFile records are {}",
                     this, loggableFlowfileInfo(), new Throwable("Stack Trace 
on rollback"));
         }
 
diff --git 
a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessFlowCurrent.java
 
b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessFlowCurrent.java
index 06d6dee4d3..3b0d998f19 100644
--- 
a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessFlowCurrent.java
+++ 
b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessFlowCurrent.java
@@ -148,14 +148,41 @@ public class StandardStatelessFlowCurrent implements 
StatelessFlowCurrent {
 
         // If we weren't able to pull in any FlowFiles from outside the 
Stateless Flow, go ahead and trigger the root connectables.
         if (!flowFileSupplied) {
+            boolean allYielded = true;
             for (final Connectable connectable : rootConnectables) {
                 if (!transactionThresholdsMet || 
connectable.hasIncomingConnection()) {
+                    if (isYielded(connectable)) {
+                        continue;
+                    }
+
+                    allYielded = false;
                     triggerRootConnectable(connectable);
                 }
             }
+
+            // If all source processors are yielded, sleep for 10 milliseconds 
before returning so that as this
+            // is called in a loop, we don't consume 100% CPU accomplishing 
nothing.
+            if (allYielded) {
+                try {
+                    Thread.sleep(10L);
+                } catch (final InterruptedException ie) {
+                    Thread.currentThread().interrupt();
+                }
+            }
         }
     }
 
+    private boolean isYielded(final Connectable connectable) {
+        final long yieldExpiration = connectable.getYieldExpiration();
+
+        // If yield expiration <= 0, don't bother making system call to get 
current time
+        if (yieldExpiration > 0) {
+            return yieldExpiration > System.currentTimeMillis();
+        }
+
+        return false;
+    }
+
     private boolean triggerFlowFileSupplier() {
         if (flowFileSupplier == null) {
             return false;
@@ -247,7 +274,7 @@ public class StandardStatelessFlowCurrent implements 
StatelessFlowCurrent {
                     return NextConnectable.NEXT_READY;
                 }
 
-                // Check if we've reached out threshold for how much data we 
are willing to bring into a single transaction. If so, we will not drop back to
+                // Check if we've reached our threshold for how much data we 
are willing to bring into a single transaction. If so, we will not drop back to
                 // triggering source components
                 final boolean thresholdMet = 
transactionThresholdMeter.isThresholdMet();
                 if (thresholdMet) {
diff --git 
a/nifi-system-tests/nifi-stateless-system-test-suite/src/test/java/org/apache/nifi/stateless/basics/HonorsSourceProcessorYieldIT.java
 
b/nifi-system-tests/nifi-stateless-system-test-suite/src/test/java/org/apache/nifi/stateless/basics/HonorsSourceProcessorYieldIT.java
new file mode 100644
index 0000000000..c0eb47120c
--- /dev/null
+++ 
b/nifi-system-tests/nifi-stateless-system-test-suite/src/test/java/org/apache/nifi/stateless/basics/HonorsSourceProcessorYieldIT.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.stateless.basics;
+
+import org.apache.nifi.flow.VersionedPort;
+import org.apache.nifi.flow.VersionedProcessor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.stateless.StatelessSystemIT;
+import org.apache.nifi.stateless.VersionedFlowBuilder;
+import org.apache.nifi.stateless.config.StatelessConfigurationException;
+import org.apache.nifi.stateless.flow.DataflowTrigger;
+import org.apache.nifi.stateless.flow.StatelessDataflow;
+import org.apache.nifi.stateless.flow.TransactionThresholds;
+import org.apache.nifi.stateless.flow.TriggerResult;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class HonorsSourceProcessorYieldIT extends StatelessSystemIT {
+
+    @Test
+    public void testSourceProcessorYieldIsHonored() throws 
StatelessConfigurationException, IOException, InterruptedException {
+        final VersionedFlowBuilder builder = new VersionedFlowBuilder();
+        final VersionedProcessor source = 
builder.createSimpleProcessor("YieldSource");
+        source.setProperties(Collections.singletonMap("Yield After", "5"));
+
+        final VersionedProcessor holdInput = 
builder.createSimpleProcessor("HoldInput");
+        holdInput.setProperties(Collections.singletonMap("Hold Time", "2 
secs"));
+        builder.createConnection(source, holdInput, "success");
+
+        final VersionedPort outputPort = builder.createOutputPort("Out");
+        builder.createConnection(holdInput, outputPort, "success");
+
+        final StatelessDataflow dataflow = 
loadDataflow(builder.getFlowSnapshot(), Collections.emptyList(), 
Collections.emptySet(), TransactionThresholds.UNLIMITED);
+        final DataflowTrigger trigger = dataflow.trigger();
+
+        // Ignore input for 3 seconds. During this time, the source processor 
should produce 5 FlowFiles and then start yielding.
+        // As a result, we should get 5 FlowFiles quickly and then 1 per 
second. This should result in 6-10 FlowFiles total,
+        // allowing for some variability in timing.
+        Thread.sleep(Duration.ofSeconds(3));
+
+        final Optional<TriggerResult> optionalResult = trigger.getResult(10, 
TimeUnit.SECONDS);
+        assertTrue(optionalResult.isPresent());
+
+        final Map<String, List<FlowFile>> outputMap = 
optionalResult.get().getOutputFlowFiles();
+        assertEquals(1, outputMap.size());
+
+        final List<FlowFile> flowFiles = outputMap.get("Out");
+        assertNotNull(flowFiles);
+        final int numFlowFiles = flowFiles.size();
+        assertTrue(numFlowFiles >= 6);
+        assertTrue(numFlowFiles <= 10);
+    }
+
+}
diff --git 
a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/HoldInput.java
 
b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/HoldInput.java
new file mode 100644
index 0000000000..e7dedfb783
--- /dev/null
+++ 
b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/HoldInput.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.tests.system;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessSessionFactory;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+@InputRequirement(Requirement.INPUT_REQUIRED)
+public class HoldInput extends AbstractSessionFactoryProcessor {
+
+    static final PropertyDescriptor IGNORE_TIME = new 
PropertyDescriptor.Builder()
+        .name("Hold Time")
+        .description("The amount of time to hold input FlowFiles without 
releasing them")
+        .required(true)
+        .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+        .defaultValue("3 sec")
+        .build();
+
+    static final Relationship REL_SUCCESS = new Relationship.Builder()
+        .name("success")
+        .description("All FlowFiles go to this Relationship")
+        .build();
+
+    private volatile long lastTransferTime;
+
+    private final Set<FlowFileSession> flowFileSessions = 
Collections.synchronizedSet(new HashSet<>());
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return List.of(IGNORE_TIME);
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return Set.of(REL_SUCCESS);
+    }
+
+    @OnScheduled
+    public void reset() {
+        lastTransferTime = System.nanoTime();
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final 
ProcessSessionFactory sessionFactory) throws ProcessException {
+        final ProcessSession session = sessionFactory.createSession();
+        final List<FlowFile> flowFiles = session.get(10_000);
+        if (flowFiles.isEmpty()) {
+            return;
+        }
+
+        flowFileSessions.add(new FlowFileSession(session, flowFiles));
+
+        final long nextTransferTime = lastTransferTime + 
context.getProperty(IGNORE_TIME).asTimePeriod(TimeUnit.NANOSECONDS);
+        if (System.nanoTime() < nextTransferTime) {
+            getLogger().debug("Ignoring input because {} is not yet reached", 
context.getProperty(IGNORE_TIME).getValue());
+            return;
+        }
+
+        for (final FlowFileSession flowFileSession : flowFileSessions) {
+            final ProcessSession processSession = flowFileSession.session();
+            processSession.transfer(flowFileSession.flowFiles(), REL_SUCCESS);
+            processSession.commitAsync();
+        }
+
+        getLogger().info("After ignoring input for {}, successfully 
transferred {} FlowFiles",
+            context.getProperty(IGNORE_TIME).getValue(), flowFiles.size());
+        lastTransferTime = System.nanoTime();
+    }
+
+    private record FlowFileSession(ProcessSession session, List<FlowFile> 
flowFiles) {
+    }
+}
diff --git 
a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/YieldSource.java
 
b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/YieldSource.java
new file mode 100644
index 0000000000..ff4185191a
--- /dev/null
+++ 
b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/YieldSource.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.tests.system;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+
+@InputRequirement(Requirement.INPUT_FORBIDDEN)
+public class YieldSource extends AbstractProcessor {
+    static final PropertyDescriptor YIELD_AFTER = new 
PropertyDescriptor.Builder()
+        .name("Yield After")
+        .description("The number of times to run before yielding")
+        .required(true)
+        .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
+        .defaultValue("1")
+        .build();
+
+    static final Relationship REL_SUCCESS = new Relationship.Builder()
+        .name("success")
+        .description("All FlowFiles go to this Relationship")
+        .build();
+
+    private final AtomicLong flowFileCount = new AtomicLong(0L);
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return List.of(YIELD_AFTER);
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return Set.of(REL_SUCCESS);
+    }
+
+    @OnScheduled
+    public void reset() {
+        flowFileCount.set(0L);
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession 
session) throws ProcessException {
+        final long alreadyCreated = flowFileCount.getAndIncrement();
+        if (alreadyCreated >= context.getProperty(YIELD_AFTER).asLong()) {
+            context.yield();
+            getLogger().info("Yielding after {} FlowFiles", alreadyCreated);
+        } else {
+            getLogger().info("Created FlowFile {} without yielding", 
alreadyCreated);
+        }
+
+        FlowFile flowFile = session.create();
+        session.transfer(flowFile, REL_SUCCESS);
+    }
+}
diff --git 
a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
 
b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index 1c68a41f36..91c2062c38 100644
--- 
a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ 
b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -28,6 +28,7 @@ 
org.apache.nifi.processors.tests.system.EvaluatePropertiesWithDifferentELScopes
 org.apache.nifi.processors.tests.system.FakeProcessor
 org.apache.nifi.processors.tests.system.FakeDynamicPropertiesProcessor
 org.apache.nifi.processors.tests.system.GenerateFlowFile
+org.apache.nifi.processors.tests.system.HoldInput
 org.apache.nifi.processors.tests.system.IngestFile
 org.apache.nifi.processors.tests.system.LoopFlowFile
 org.apache.nifi.processors.tests.system.MigrateProperties
@@ -56,3 +57,4 @@ org.apache.nifi.processors.tests.system.VerifyEvenThenOdd
 org.apache.nifi.processors.tests.system.WriteFlowFileCountToFile
 org.apache.nifi.processors.tests.system.WriteLifecycleEvents
 org.apache.nifi.processors.tests.system.WriteToFile
+org.apache.nifi.processors.tests.system.YieldSource

Reply via email to