This is an automated email from the ASF dual-hosted git repository. turcsanyi pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push: new b5dd354 NIFI-9293: Ensure that we properly set the scheduled flag in the LifecycleState when stopping processors. Added system test to verify that @OnScheduled, onTrigger, @OnUnscheduled, @OnStopped are all called and in the expected order b5dd354 is described below commit b5dd35431e2251802bd0c13c5de016477ede49c5 Author: Mark Payne <marka...@hotmail.com> AuthorDate: Mon Jan 24 11:42:03 2022 -0500 NIFI-9293: Ensure that we properly set the scheduled flag in the LifecycleState when stopping processors. Added system test to verify that @OnScheduled, onTrigger, @OnUnscheduled, @OnStopped are all called and in the expected order This closes #5706. Signed-off-by: Peter Turcsanyi <turcsa...@apache.org> --- .../scheduling/StatelessProcessScheduler.java | 4 +- .../apache/nifi/stateless/StatelessSystemIT.java | 2 +- .../stateless/basics/ProcessorLifecycleIT.java | 78 +++++++++++++++ .../tests/system/WriteLifecycleEvents.java | 106 +++++++++++++++++++++ .../services/org.apache.nifi.processor.Processor | 3 +- 5 files changed, 190 insertions(+), 3 deletions(-) diff --git a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/controller/scheduling/StatelessProcessScheduler.java b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/controller/scheduling/StatelessProcessScheduler.java index 07b2081..c129249 100644 --- a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/controller/scheduling/StatelessProcessScheduler.java +++ b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/controller/scheduling/StatelessProcessScheduler.java @@ -27,6 +27,7 @@ import org.apache.nifi.controller.ConfigurationContext; import org.apache.nifi.controller.ProcessScheduler; import org.apache.nifi.controller.ProcessorNode; import org.apache.nifi.controller.ReportingTaskNode; +import org.apache.nifi.controller.ScheduledState; import org.apache.nifi.controller.SchedulingAgentCallback; import org.apache.nifi.controller.service.ControllerServiceNode; import org.apache.nifi.controller.service.ControllerServiceProvider; @@ -149,7 +150,8 @@ public class StatelessProcessScheduler implements ProcessScheduler { logger.info("Stopping {}", procNode); final ProcessContext processContext = processContextFactory.createProcessContext(procNode); final LifecycleState lifecycleState = new LifecycleState(); - lifecycleState.setScheduled(false); + final boolean scheduled = procNode.getScheduledState() == ScheduledState.RUNNING || procNode.getActiveThreadCount() > 0; + lifecycleState.setScheduled(scheduled); return procNode.stop(this, this.componentLifeCycleThreadPool, processContext, schedulingAgent, lifecycleState); } diff --git a/nifi-system-tests/nifi-stateless-system-test-suite/src/test/java/org/apache/nifi/stateless/StatelessSystemIT.java b/nifi-system-tests/nifi-stateless-system-test-suite/src/test/java/org/apache/nifi/stateless/StatelessSystemIT.java index 828f56e..697dd05 100644 --- a/nifi-system-tests/nifi-stateless-system-test-suite/src/test/java/org/apache/nifi/stateless/StatelessSystemIT.java +++ b/nifi-system-tests/nifi-stateless-system-test-suite/src/test/java/org/apache/nifi/stateless/StatelessSystemIT.java @@ -62,7 +62,7 @@ public class StatelessSystemIT { public TestName name = new TestName(); @Rule - public Timeout defaultTimeout = new Timeout(30, TimeUnit.MINUTES); + public Timeout defaultTimeout = new Timeout(5, TimeUnit.MINUTES); @Before diff --git a/nifi-system-tests/nifi-stateless-system-test-suite/src/test/java/org/apache/nifi/stateless/basics/ProcessorLifecycleIT.java b/nifi-system-tests/nifi-stateless-system-test-suite/src/test/java/org/apache/nifi/stateless/basics/ProcessorLifecycleIT.java new file mode 100644 index 0000000..b2596f7 --- /dev/null +++ b/nifi-system-tests/nifi-stateless-system-test-suite/src/test/java/org/apache/nifi/stateless/basics/ProcessorLifecycleIT.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.stateless.basics; + +import org.apache.nifi.flow.VersionedProcessor; +import org.apache.nifi.stateless.StatelessSystemIT; +import org.apache.nifi.stateless.VersionedFlowBuilder; +import org.apache.nifi.stateless.config.StatelessConfigurationException; +import org.apache.nifi.stateless.flow.DataflowTrigger; +import org.apache.nifi.stateless.flow.StatelessDataflow; +import org.apache.nifi.stateless.flow.TriggerResult; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +public class ProcessorLifecycleIT extends StatelessSystemIT { + private static final Logger logger = LoggerFactory.getLogger(ProcessorLifecycleIT.class); + + @Test + public void testRunProcessorShutdown() throws StatelessConfigurationException, IOException, InterruptedException { + final File eventsFile = new File("target/events.txt"); + Files.deleteIfExists(eventsFile.toPath()); + + final VersionedFlowBuilder flowBuilder = new VersionedFlowBuilder(); + + final VersionedProcessor generate = flowBuilder.createSimpleProcessor("GenerateFlowFile"); + final VersionedProcessor writeLifecycleEvents = flowBuilder.createSimpleProcessor("WriteLifecycleEvents"); + + flowBuilder.createConnection(generate, writeLifecycleEvents, "success"); + + writeLifecycleEvents.setAutoTerminatedRelationships(Collections.singleton("success")); + writeLifecycleEvents.setProperties(Collections.singletonMap("Event File", eventsFile.getAbsolutePath())); + + final StatelessDataflow dataflow = loadDataflow(flowBuilder.getFlowSnapshot(), Collections.emptyList()); + + final DataflowTrigger trigger = dataflow.trigger(); + final TriggerResult result = trigger.getResult(); + result.acknowledge(); + + dataflow.shutdown(); + + List<String> events = Files.readAllLines(eventsFile.toPath()); + + // Because the processors may be stopped in the background, we want to wait until we receive the events that we expect. + while (events.size() < 4) { + logger.info("Expecting to find 4 events written to {} but currently found only {}; will wait 100 milliseconds and check again", eventsFile.getAbsolutePath(), events.size()); + + Thread.sleep(100L); + events = Files.readAllLines(eventsFile.toPath()); + } + + assertEquals(Arrays.asList("OnScheduled", "OnTrigger", "OnUnscheduled", "OnStopped"), events); + } +} diff --git a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/WriteLifecycleEvents.java b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/WriteLifecycleEvents.java new file mode 100644 index 0000000..064f829 --- /dev/null +++ b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/WriteLifecycleEvents.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.tests.system; + +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.annotation.lifecycle.OnUnscheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.nio.charset.StandardCharsets; +import java.util.Collections; +import java.util.List; +import java.util.Set; + +public class WriteLifecycleEvents extends AbstractProcessor { + static final PropertyDescriptor EVENT_FILE = new PropertyDescriptor.Builder() + .name("Event File") + .displayName("Event File") + .description("Specifies the file to write to that contains a line of text for each lifecycle event that occurs") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .defaultValue("target/CountLifecycleEvents.events") + .build(); + + static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .description("All FlowFiles go here") + .build(); + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return Collections.singletonList(EVENT_FILE); + } + + @Override + public Set<Relationship> getRelationships() { + return Collections.singleton(REL_SUCCESS); + } + + @OnScheduled + public void onScheduled(final ProcessContext context) throws IOException { + writeEvent(context, "OnScheduled"); + } + + @OnUnscheduled + public void onUnscheduled(final ProcessContext context) throws IOException { + writeEvent(context, "OnUnscheduled"); + } + + @OnStopped + public void onStopped(final ProcessContext context) throws IOException { + writeEvent(context, "OnStopped"); + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { + try { + writeEvent(context, "OnTrigger"); + } catch (IOException e) { + throw new ProcessException(e); + } + + FlowFile flowFile = session.get(); + if (flowFile == null) { + return; + } + + session.transfer(flowFile, REL_SUCCESS); + } + + private void writeEvent(final ProcessContext context, final String event) throws IOException { + final File file = new File(context.getProperty(EVENT_FILE).getValue()); + + final byte[] eventBytes = (event + "\n").getBytes(StandardCharsets.UTF_8); + + try (final OutputStream fos = new FileOutputStream(file, true)) { + fos.write(eventBytes); + } + } +} diff --git a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor index 77143b1..b661c11 100644 --- a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor +++ b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor @@ -43,4 +43,5 @@ org.apache.nifi.processors.tests.system.UpdateContent org.apache.nifi.processors.tests.system.ValidateFileExists org.apache.nifi.processors.tests.system.VerifyContents org.apache.nifi.processors.tests.system.WriteFlowFileCountToFile -org.apache.nifi.processors.tests.system.WriteToFile \ No newline at end of file +org.apache.nifi.processors.tests.system.WriteLifecycleEvents +org.apache.nifi.processors.tests.system.WriteToFile