This is an automated email from the ASF dual-hosted git repository.
exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 9b7440e077 NIFI-14597 Adjusted Stateless Engine to avoid calling
yielded Source Processors (#9972)
9b7440e077 is described below
commit 9b7440e077c435a738547a3e208803bd3feaa27d
Author: Mark Payne <[email protected]>
AuthorDate: Tue May 27 11:03:23 2025 -0400
NIFI-14597 Adjusted Stateless Engine to avoid calling yielded Source
Processors (#9972)
Ensure that we do not keep calling source processors in the Stateless
Engine if the source processor has yielded. Added system test to validate
functionality.
Signed-off-by: David Handermann <[email protected]>
---
.../repository/StandardProcessSession.java | 2 +-
.../flow/StandardStatelessFlowCurrent.java | 29 +++++-
.../basics/HonorsSourceProcessorYieldIT.java | 80 ++++++++++++++++
.../nifi/processors/tests/system/HoldInput.java | 103 +++++++++++++++++++++
.../nifi/processors/tests/system/YieldSource.java | 81 ++++++++++++++++
.../services/org.apache.nifi.processor.Processor | 2 +
6 files changed, 295 insertions(+), 2 deletions(-)
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
index 2856fad167..3a9632b2d3 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
@@ -1197,7 +1197,7 @@ public class StandardProcessSession implements
ProcessSession, ProvenanceEventEn
protected synchronized void rollback(final boolean penalize, final boolean
rollbackCheckpoint) {
if (LOG.isDebugEnabled()) {
- LOG.debug("{} session rollback called, FlowFile records are {} {}",
+ LOG.debug("{} session rollback called, FlowFile records are {}",
this, loggableFlowfileInfo(), new Throwable("Stack Trace
on rollback"));
}
diff --git
a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessFlowCurrent.java
b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessFlowCurrent.java
index 06d6dee4d3..3b0d998f19 100644
---
a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessFlowCurrent.java
+++
b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessFlowCurrent.java
@@ -148,14 +148,41 @@ public class StandardStatelessFlowCurrent implements
StatelessFlowCurrent {
// If we weren't able to pull in any FlowFiles from outside the
Stateless Flow, go ahead and trigger the root connectables.
if (!flowFileSupplied) {
+ boolean allYielded = true;
for (final Connectable connectable : rootConnectables) {
if (!transactionThresholdsMet ||
connectable.hasIncomingConnection()) {
+ if (isYielded(connectable)) {
+ continue;
+ }
+
+ allYielded = false;
triggerRootConnectable(connectable);
}
}
+
+ // If all source processors are yielded, sleep for 10 milliseconds
before returning so that as this
+ // is called in a loop, we don't consume 100% CPU accomplishing
nothing.
+ if (allYielded) {
+ try {
+ Thread.sleep(10L);
+ } catch (final InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ }
+ }
}
}
+ private boolean isYielded(final Connectable connectable) {
+ final long yieldExpiration = connectable.getYieldExpiration();
+
+ // If yield expiration <= 0, don't bother making system call to get
current time
+ if (yieldExpiration > 0) {
+ return yieldExpiration > System.currentTimeMillis();
+ }
+
+ return false;
+ }
+
private boolean triggerFlowFileSupplier() {
if (flowFileSupplier == null) {
return false;
@@ -247,7 +274,7 @@ public class StandardStatelessFlowCurrent implements
StatelessFlowCurrent {
return NextConnectable.NEXT_READY;
}
- // Check if we've reached out threshold for how much data we
are willing to bring into a single transaction. If so, we will not drop back to
+ // Check if we've reached our threshold for how much data we
are willing to bring into a single transaction. If so, we will not drop back to
// triggering source components
final boolean thresholdMet =
transactionThresholdMeter.isThresholdMet();
if (thresholdMet) {
diff --git
a/nifi-system-tests/nifi-stateless-system-test-suite/src/test/java/org/apache/nifi/stateless/basics/HonorsSourceProcessorYieldIT.java
b/nifi-system-tests/nifi-stateless-system-test-suite/src/test/java/org/apache/nifi/stateless/basics/HonorsSourceProcessorYieldIT.java
new file mode 100644
index 0000000000..c0eb47120c
--- /dev/null
+++
b/nifi-system-tests/nifi-stateless-system-test-suite/src/test/java/org/apache/nifi/stateless/basics/HonorsSourceProcessorYieldIT.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.stateless.basics;
+
+import org.apache.nifi.flow.VersionedPort;
+import org.apache.nifi.flow.VersionedProcessor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.stateless.StatelessSystemIT;
+import org.apache.nifi.stateless.VersionedFlowBuilder;
+import org.apache.nifi.stateless.config.StatelessConfigurationException;
+import org.apache.nifi.stateless.flow.DataflowTrigger;
+import org.apache.nifi.stateless.flow.StatelessDataflow;
+import org.apache.nifi.stateless.flow.TransactionThresholds;
+import org.apache.nifi.stateless.flow.TriggerResult;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class HonorsSourceProcessorYieldIT extends StatelessSystemIT {
+
+ @Test
+ public void testSourceProcessorYieldIsHonored() throws
StatelessConfigurationException, IOException, InterruptedException {
+ final VersionedFlowBuilder builder = new VersionedFlowBuilder();
+ final VersionedProcessor source =
builder.createSimpleProcessor("YieldSource");
+ source.setProperties(Collections.singletonMap("Yield After", "5"));
+
+ final VersionedProcessor holdInput =
builder.createSimpleProcessor("HoldInput");
+ holdInput.setProperties(Collections.singletonMap("Hold Time", "2
secs"));
+ builder.createConnection(source, holdInput, "success");
+
+ final VersionedPort outputPort = builder.createOutputPort("Out");
+ builder.createConnection(holdInput, outputPort, "success");
+
+ final StatelessDataflow dataflow =
loadDataflow(builder.getFlowSnapshot(), Collections.emptyList(),
Collections.emptySet(), TransactionThresholds.UNLIMITED);
+ final DataflowTrigger trigger = dataflow.trigger();
+
+ // Ignore input for 3 seconds. During this time, the source processor
should produce 5 FlowFiles and then start yielding.
+ // As a result, we should get 5 FlowFiles quickly and then 1 per
second. This should result in 6-10 FlowFiles total,
+ // allowing for some variability in timing.
+ Thread.sleep(Duration.ofSeconds(3));
+
+ final Optional<TriggerResult> optionalResult = trigger.getResult(10,
TimeUnit.SECONDS);
+ assertTrue(optionalResult.isPresent());
+
+ final Map<String, List<FlowFile>> outputMap =
optionalResult.get().getOutputFlowFiles();
+ assertEquals(1, outputMap.size());
+
+ final List<FlowFile> flowFiles = outputMap.get("Out");
+ assertNotNull(flowFiles);
+ final int numFlowFiles = flowFiles.size();
+ assertTrue(numFlowFiles >= 6);
+ assertTrue(numFlowFiles <= 10);
+ }
+
+}
diff --git
a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/HoldInput.java
b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/HoldInput.java
new file mode 100644
index 0000000000..e7dedfb783
--- /dev/null
+++
b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/HoldInput.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.tests.system;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessSessionFactory;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+@InputRequirement(Requirement.INPUT_REQUIRED)
+public class HoldInput extends AbstractSessionFactoryProcessor {
+
+ static final PropertyDescriptor IGNORE_TIME = new
PropertyDescriptor.Builder()
+ .name("Hold Time")
+ .description("The amount of time to hold input FlowFiles without
releasing them")
+ .required(true)
+ .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+ .defaultValue("3 sec")
+ .build();
+
+ static final Relationship REL_SUCCESS = new Relationship.Builder()
+ .name("success")
+ .description("All FlowFiles go to this Relationship")
+ .build();
+
+ private volatile long lastTransferTime;
+
+ private final Set<FlowFileSession> flowFileSessions =
Collections.synchronizedSet(new HashSet<>());
+
+ @Override
+ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ return List.of(IGNORE_TIME);
+ }
+
+ @Override
+ public Set<Relationship> getRelationships() {
+ return Set.of(REL_SUCCESS);
+ }
+
+ @OnScheduled
+ public void reset() {
+ lastTransferTime = System.nanoTime();
+ }
+
+ @Override
+ public void onTrigger(final ProcessContext context, final
ProcessSessionFactory sessionFactory) throws ProcessException {
+ final ProcessSession session = sessionFactory.createSession();
+ final List<FlowFile> flowFiles = session.get(10_000);
+ if (flowFiles.isEmpty()) {
+ return;
+ }
+
+ flowFileSessions.add(new FlowFileSession(session, flowFiles));
+
+ final long nextTransferTime = lastTransferTime +
context.getProperty(IGNORE_TIME).asTimePeriod(TimeUnit.NANOSECONDS);
+ if (System.nanoTime() < nextTransferTime) {
+ getLogger().debug("Ignoring input because {} is not yet reached",
context.getProperty(IGNORE_TIME).getValue());
+ return;
+ }
+
+ for (final FlowFileSession flowFileSession : flowFileSessions) {
+ final ProcessSession processSession = flowFileSession.session();
+ processSession.transfer(flowFileSession.flowFiles(), REL_SUCCESS);
+ processSession.commitAsync();
+ }
+
+ getLogger().info("After ignoring input for {}, successfully
transferred {} FlowFiles",
+ context.getProperty(IGNORE_TIME).getValue(), flowFiles.size());
+ lastTransferTime = System.nanoTime();
+ }
+
+ private record FlowFileSession(ProcessSession session, List<FlowFile>
flowFiles) {
+ }
+}
diff --git
a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/YieldSource.java
b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/YieldSource.java
new file mode 100644
index 0000000000..ff4185191a
--- /dev/null
+++
b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/YieldSource.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.tests.system;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+
+@InputRequirement(Requirement.INPUT_FORBIDDEN)
+public class YieldSource extends AbstractProcessor {
+ static final PropertyDescriptor YIELD_AFTER = new
PropertyDescriptor.Builder()
+ .name("Yield After")
+ .description("The number of times to run before yielding")
+ .required(true)
+ .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
+ .defaultValue("1")
+ .build();
+
+ static final Relationship REL_SUCCESS = new Relationship.Builder()
+ .name("success")
+ .description("All FlowFiles go to this Relationship")
+ .build();
+
+ private final AtomicLong flowFileCount = new AtomicLong(0L);
+
+ @Override
+ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ return List.of(YIELD_AFTER);
+ }
+
+ @Override
+ public Set<Relationship> getRelationships() {
+ return Set.of(REL_SUCCESS);
+ }
+
+ @OnScheduled
+ public void reset() {
+ flowFileCount.set(0L);
+ }
+
+ @Override
+ public void onTrigger(final ProcessContext context, final ProcessSession
session) throws ProcessException {
+ final long alreadyCreated = flowFileCount.getAndIncrement();
+ if (alreadyCreated >= context.getProperty(YIELD_AFTER).asLong()) {
+ context.yield();
+ getLogger().info("Yielding after {} FlowFiles", alreadyCreated);
+ } else {
+ getLogger().info("Created FlowFile {} without yielding",
alreadyCreated);
+ }
+
+ FlowFile flowFile = session.create();
+ session.transfer(flowFile, REL_SUCCESS);
+ }
+}
diff --git
a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index 1c68a41f36..91c2062c38 100644
---
a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++
b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -28,6 +28,7 @@
org.apache.nifi.processors.tests.system.EvaluatePropertiesWithDifferentELScopes
org.apache.nifi.processors.tests.system.FakeProcessor
org.apache.nifi.processors.tests.system.FakeDynamicPropertiesProcessor
org.apache.nifi.processors.tests.system.GenerateFlowFile
+org.apache.nifi.processors.tests.system.HoldInput
org.apache.nifi.processors.tests.system.IngestFile
org.apache.nifi.processors.tests.system.LoopFlowFile
org.apache.nifi.processors.tests.system.MigrateProperties
@@ -56,3 +57,4 @@ org.apache.nifi.processors.tests.system.VerifyEvenThenOdd
org.apache.nifi.processors.tests.system.WriteFlowFileCountToFile
org.apache.nifi.processors.tests.system.WriteLifecycleEvents
org.apache.nifi.processors.tests.system.WriteToFile
+org.apache.nifi.processors.tests.system.YieldSource