Repository: incubator-nifi Updated Branches: refs/heads/NIFI-704 a0e5c0e57 -> 54a49bb40 (forced update)
NIFI-704 StandardProcessorTestRunner should allow you to wait before calling OnUnScheduled methods Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/54a49bb4 Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/54a49bb4 Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/54a49bb4 Branch: refs/heads/NIFI-704 Commit: 54a49bb40834d78305f07c859abba50371a41bb6 Parents: 33848b3 Author: danbress <dbr...@onyxconsults.com> Authored: Sat Jun 20 19:53:13 2015 -0400 Committer: danbress <dbr...@onyxconsults.com> Committed: Sat Jun 20 19:53:13 2015 -0400 ---------------------------------------------------------------------- .../nifi/util/StandardProcessorTestRunner.java | 10 +++ .../java/org/apache/nifi/util/TestRunner.java | 45 +++++++++++++ .../CurrentTestStandardProcessorTestRunner.java | 71 ++++++++++++++++++++ 3 files changed, 126 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/54a49bb4/nifi/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java b/nifi/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java index 655f2df..8938547 100644 --- a/nifi/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java +++ b/nifi/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java @@ -39,6 +39,7 @@ import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -182,6 +183,11 @@ public class StandardProcessorTestRunner implements TestRunner { @Override public void run(final int iterations, final boolean stopOnFinish, final boolean initialize) { + run(iterations, stopOnFinish, initialize, 5000); + } + + @Override + public void run(final int iterations, final boolean stopOnFinish, final boolean initialize, final long runWait) { if (iterations < 1) { throw new IllegalArgumentException(); } @@ -207,6 +213,10 @@ public class StandardProcessorTestRunner implements TestRunner { } executorService.shutdown(); + try { + executorService.awaitTermination(runWait, TimeUnit.MILLISECONDS); + } catch (InterruptedException e1) { + } int finishedCount = 0; boolean unscheduledRun = false; http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/54a49bb4/nifi/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java b/nifi/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java index a599e5b..fb9fc78 100644 --- a/nifi/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java +++ b/nifi/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java @@ -126,6 +126,51 @@ public interface TestRunner { void run(int iterations, boolean stopOnFinish, final boolean initialize); /** + * This method runs the {@link Processor} <code>iterations</code> times, + * using the sequence of steps below: + * <ul> + * <li> + * If {@code initialize} is true, run all methods on the Processor that are + * annotated with the + * {@link nifi.processor.annotation.OnScheduled @OnScheduled} annotation. If + * any of these methods throws an Exception, the Unit Test will fail. + * </li> + * <li> + * Schedule the + * {@link Processor#onTrigger(ProcessContext, ProcessSessionFactory) onTrigger} + * method to be invoked <code>iterations</code> times. The number of threads + * used to run these iterations is determined by the ThreadCount of this + * <code>TestRunner</code>. By default, the value is set to 1, but it can be + * modified by calling the {@link #setThreadCount(int)} method. + * </li> + * <li> + * As soon as the first thread finishes its execution of + * {@link Processor#onTrigger(ProcessContext, ProcessSessionFactory) onTrigger}, + * all methods on the Processor that are annotated with the + * {@link nifi.processor.annotation.OnUnscheduled @OnUnscheduled} annotation + * are invoked. If any of these methods throws an Exception, the Unit Test + * will fail. + * </li> + * <li> + * Waits for all threads to finish execution. + * </li> + * <li> + * If and only if the value of <code>shutdown</code> is true: Call all + * methods on the Processor that is annotated with the + * {@link nifi.processor.annotation.OnStopped @OnStopped} annotation. + * </li> + * </ul> + * + * @param iterations number of iterations + * @param stopOnFinish whether or not to run the Processor methods that are + * annotated with {@link nifi.processor.annotation.OnStopped @OnStopped} + * @param initialize true if must initialize + * @param runWait indicates the amount of time in milliseconds that the framework should wait for + * processors to stop running before calling the {@link nifi.processor.annotation.OnUnscheduled @OnUnscheduled} annotation + */ + void run(int iterations, boolean stopOnFinish, final boolean initialize, final long runWait); + + /** * Invokes all methods on the Processor that are annotated with the * {@link nifi.processor.annotation.OnShutdown @OnShutdown} annotation. If * any of these methods throws an Exception, the Unit Test will fail http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/54a49bb4/nifi/nifi-mock/src/test/java/org/apache/nifi/util/CurrentTestStandardProcessorTestRunner.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-mock/src/test/java/org/apache/nifi/util/CurrentTestStandardProcessorTestRunner.java b/nifi/nifi-mock/src/test/java/org/apache/nifi/util/CurrentTestStandardProcessorTestRunner.java new file mode 100644 index 0000000..450abd6 --- /dev/null +++ b/nifi/nifi-mock/src/test/java/org/apache/nifi/util/CurrentTestStandardProcessorTestRunner.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.util; + +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.exception.ProcessException; +import org.junit.Assert; +import org.junit.Test; + +public class CurrentTestStandardProcessorTestRunner { + + /** + * This test will verify that all iterations of the run are finished before unscheduled is called + */ + @Test + public void testOnScheduledCalledAfterRunFinished() { + SlowRunProcessor processor = new SlowRunProcessor(); + StandardProcessorTestRunner runner = new StandardProcessorTestRunner(processor); + final int iterations = 5; + runner.run(iterations); + // if the counter is not equal to iterations, the the processor must have been unscheduled + // before all the run calls were made, that would be bad. + Assert.assertEquals(iterations, processor.getCounter()); + } + + /** + * This processor simulates a "slow" processor that checks whether it is scheduled before doing something + * + * + */ + private static class SlowRunProcessor extends AbstractProcessor { + + private int counter = 0; + + @Override + public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { + + try { + // be slow + Thread.sleep(50); + // make sure we are still scheduled + if (isScheduled()) { + // increment counter + ++counter; + } + } catch (InterruptedException e) { + } + + } + + public int getCounter() { + return counter; + } + } +}