Repository: incubator-nifi
Updated Branches:
  refs/heads/NIFI-704 [created] a0e5c0e57


NIFI-704 StandardProcessorTestRunner should allow you to wait before calling 
OnUnScheduled methods


Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/a0e5c0e5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/a0e5c0e5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/a0e5c0e5

Branch: refs/heads/NIFI-704
Commit: a0e5c0e57f9378c4019d157dac56c3bdab287875
Parents: 33848b3
Author: danbress <dbr...@onyxconsults.com>
Authored: Sat Jun 20 19:48:02 2015 -0400
Committer: danbress <dbr...@onyxconsults.com>
Committed: Sat Jun 20 19:48:58 2015 -0400

----------------------------------------------------------------------
 .../nifi/util/StandardProcessorTestRunner.java  | 10 +++
 .../java/org/apache/nifi/util/TestRunner.java   | 45 +++++++++++++
 .../CurrentTestStandardProcessorTestRunner.java | 71 ++++++++++++++++++++
 3 files changed, 126 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a0e5c0e5/nifi/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java
 
b/nifi/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java
index 655f2df..1505899 100644
--- 
a/nifi/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java
+++ 
b/nifi/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java
@@ -39,6 +39,7 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -182,6 +183,11 @@ public class StandardProcessorTestRunner implements 
TestRunner {
 
     @Override
     public void run(final int iterations, final boolean stopOnFinish, final 
boolean initialize) {
+        run(iterations, stopOnFinish, initialize, 5000);
+    }
+
+    @Override
+    public void run(final int iterations, final boolean stopOnFinish, final 
boolean initialize, final long runWait) {
         if (iterations < 1) {
             throw new IllegalArgumentException();
         }
@@ -207,6 +213,10 @@ public class StandardProcessorTestRunner implements 
TestRunner {
             }
 
             executorService.shutdown();
+            try {
+                executorService.awaitTermination(runWait, TimeUnit.SECONDS);
+            } catch (InterruptedException e1) {
+            }
 
             int finishedCount = 0;
             boolean unscheduledRun = false;

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a0e5c0e5/nifi/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java 
b/nifi/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java
index a599e5b..f0fbea8 100644
--- a/nifi/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java
+++ b/nifi/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java
@@ -126,6 +126,51 @@ public interface TestRunner {
     void run(int iterations, boolean stopOnFinish, final boolean initialize);
 
     /**
+     * This method runs the {@link Processor} <code>iterations</code> times,
+     * using the sequence of steps below:
+     * <ul>
+     * <li>
+     * If {@code initialize} is true, run all methods on the Processor that are
+     * annotated with the
+     * {@link nifi.processor.annotation.OnScheduled @OnScheduled} annotation. 
If
+     * any of these methods throws an Exception, the Unit Test will fail.
+     * </li>
+     * <li>
+     * Schedule the
+     * {@link Processor#onTrigger(ProcessContext, ProcessSessionFactory) 
onTrigger}
+     * method to be invoked <code>iterations</code> times. The number of 
threads
+     * used to run these iterations is determined by the ThreadCount of this
+     * <code>TestRunner</code>. By default, the value is set to 1, but it can 
be
+     * modified by calling the {@link #setThreadCount(int)} method.
+     * </li>
+     * <li>
+     * As soon as the first thread finishes its execution of
+     * {@link Processor#onTrigger(ProcessContext, ProcessSessionFactory) 
onTrigger},
+     * all methods on the Processor that are annotated with the
+     * {@link nifi.processor.annotation.OnUnscheduled @OnUnscheduled} 
annotation
+     * are invoked. If any of these methods throws an Exception, the Unit Test
+     * will fail.
+     * </li>
+     * <li>
+     * Waits for all threads to finish execution.
+     * </li>
+     * <li>
+     * If and only if the value of <code>shutdown</code> is true: Call all
+     * methods on the Processor that is annotated with the
+     * {@link nifi.processor.annotation.OnStopped @OnStopped} annotation.
+     * </li>
+     * </ul>
+     *
+     * @param iterations number of iterations
+     * @param stopOnFinish whether or not to run the Processor methods that are
+     * annotated with {@link nifi.processor.annotation.OnStopped @OnStopped}
+     * @param initialize true if must initialize
+     * @param runWait indiciates the amount of time in milliseconds that the 
framework should wait for
+     * processors to stop running before calling the {@link 
nifi.processor.annotation.OnUnscheduled @OnUnscheduled} annotation
+     */
+    void run(int iterations, boolean stopOnFinish, final boolean initialize, 
final long runWait);
+
+    /**
      * Invokes all methods on the Processor that are annotated with the
      * {@link nifi.processor.annotation.OnShutdown @OnShutdown} annotation. If
      * any of these methods throws an Exception, the Unit Test will fail

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a0e5c0e5/nifi/nifi-mock/src/test/java/org/apache/nifi/util/CurrentTestStandardProcessorTestRunner.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-mock/src/test/java/org/apache/nifi/util/CurrentTestStandardProcessorTestRunner.java
 
b/nifi/nifi-mock/src/test/java/org/apache/nifi/util/CurrentTestStandardProcessorTestRunner.java
new file mode 100644
index 0000000..450abd6
--- /dev/null
+++ 
b/nifi/nifi-mock/src/test/java/org/apache/nifi/util/CurrentTestStandardProcessorTestRunner.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.util;
+
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class CurrentTestStandardProcessorTestRunner {
+
+    /**
+     * This test will verify that all iterations of the run are finished 
before unscheduled is called
+     */
+    @Test
+    public void testOnScheduledCalledAfterRunFinished() {
+        SlowRunProcessor processor = new SlowRunProcessor();
+        StandardProcessorTestRunner runner = new 
StandardProcessorTestRunner(processor);
+        final int iterations = 5;
+        runner.run(iterations);
+        // if the counter is not equal to iterations, the the processor must 
have been unscheduled
+        // before all the run calls were made, that would be bad.
+        Assert.assertEquals(iterations, processor.getCounter());
+    }
+
+    /**
+     * This processor simulates a "slow" processor that checks whether it is 
scheduled before doing something
+     *
+     *
+     */
+    private static class SlowRunProcessor extends AbstractProcessor {
+
+        private int counter = 0;
+
+        @Override
+        public void onTrigger(ProcessContext context, ProcessSession session) 
throws ProcessException {
+
+            try {
+                // be slow
+                Thread.sleep(50);
+                // make sure we are still scheduled
+                if (isScheduled()) {
+                    // increment counter
+                    ++counter;
+                }
+            } catch (InterruptedException e) {
+            }
+
+        }
+
+        public int getCounter() {
+            return counter;
+        }
+    }
+}

Reply via email to