Repository: incubator-nifi
Updated Branches:
  refs/heads/develop 483b3dddf -> 6f3039c8c


Moved the source/sink stopping to @OnStopped methods

* Made the spoolDirectory test stronger

Signed-off-by: Matt Gilman <matt.c.gil...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/c4dd1e65
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/c4dd1e65
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/c4dd1e65

Branch: refs/heads/develop
Commit: c4dd1e65b1be905dc712f1907ff1156e60d02a79
Parents: 3af73c9
Author: Joey Echeverria <joe...@gmail.com>
Authored: Wed Jun 10 14:18:42 2015 -0700
Committer: Matt Gilman <matt.c.gil...@gmail.com>
Committed: Tue Jul 14 14:50:16 2015 -0400

----------------------------------------------------------------------
 .../processors/flume/FlumeSinkProcessor.java    |  9 ++++---
 .../processors/flume/FlumeSourceProcessor.java  | 25 ++++++--------------
 .../flume/FlumeSourceProcessorTest.java         | 24 +++++++++++++++----
 3 files changed, 31 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/c4dd1e65/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/FlumeSinkProcessor.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/FlumeSinkProcessor.java
 
b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/FlumeSinkProcessor.java
index e385921..9ec1b07 100644
--- 
a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/FlumeSinkProcessor.java
+++ 
b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/FlumeSinkProcessor.java
@@ -28,7 +28,7 @@ import org.apache.flume.conf.Configurables;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.annotation.lifecycle.OnScheduled;
-import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
 
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.Validator;
@@ -124,15 +124,14 @@ public class FlumeSinkProcessor extends 
AbstractFlumeProcessor {
         }
     }
 
-    @OnUnscheduled
-    public void unScheduled() {
+    @OnStopped
+    public void stopped() {
         sink.stop();
         channel.stop();
     }
 
     @Override
-    public void onTrigger(final ProcessContext context,
-            final ProcessSession session) throws ProcessException {
+    public void onTrigger(final ProcessContext context, final ProcessSession 
session) throws ProcessException {
 
         channel.setSession(session);
         try {

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/c4dd1e65/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/FlumeSourceProcessor.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/FlumeSourceProcessor.java
 
b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/FlumeSourceProcessor.java
index 3ded208..1ebf05c 100644
--- 
a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/FlumeSourceProcessor.java
+++ 
b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/FlumeSourceProcessor.java
@@ -33,7 +33,6 @@ import 
org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.annotation.lifecycle.OnScheduled;
 import org.apache.nifi.annotation.lifecycle.OnStopped;
-import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
 
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.Validator;
@@ -93,7 +92,6 @@ public class FlumeSourceProcessor extends 
AbstractFlumeProcessor {
     private final AtomicReference<ProcessSessionFactory> sessionFactoryRef = 
new AtomicReference<>(null);
     private final AtomicReference<EventDrivenSourceRunner> runnerRef = new 
AtomicReference<>(null);
     private final AtomicReference<NifiSessionFactoryChannel> 
eventDrivenSourceChannelRef = new AtomicReference<>(null);
-    private final AtomicReference<Boolean> stopping = new 
AtomicReference<>(false);
 
     @Override
     protected void init(final ProcessorInitializationContext context) {
@@ -114,7 +112,6 @@ public class FlumeSourceProcessor extends 
AbstractFlumeProcessor {
     @OnScheduled
     public void onScheduled(final SchedulingContext context) {
         try {
-            stopping.set(false);
             source = SOURCE_FACTORY.create(
                 context.getProperty(SOURCE_NAME)
                 .getValue(),
@@ -142,9 +139,8 @@ public class FlumeSourceProcessor extends 
AbstractFlumeProcessor {
         }
     }
 
-    @OnUnscheduled
-    public void unScheduled() {
-        stopping.set(true);
+    @OnStopped
+    public void stopped() {
         if (source instanceof PollableSource) {
             source.stop();
         } else {
@@ -160,10 +156,6 @@ public class FlumeSourceProcessor extends 
AbstractFlumeProcessor {
                 
eventDrivenSourceChannelRef.compareAndSet(eventDrivenSourceChannel, null);
             }
         }
-    }
-
-    @OnStopped
-    public void stopped() {
         sessionFactoryRef.set(null);
     }
 
@@ -176,14 +168,11 @@ public class FlumeSourceProcessor extends 
AbstractFlumeProcessor {
             if (old == null) {
                 runnerRef.set(new EventDrivenSourceRunner());
                 eventDrivenSourceChannelRef.set(new 
NifiSessionFactoryChannel(sessionFactoryRef.get(), SUCCESS));
-                eventDrivenSourceChannelRef.get()
-                    .start();
-                source.setChannelProcessor(new ChannelProcessor(new 
NifiChannelSelector(
-                    eventDrivenSourceChannelRef.get())));
-                runnerRef.get()
-                    .setSource(source);
-                runnerRef.get()
-                    .start();
+                eventDrivenSourceChannelRef.get().start();
+                source.setChannelProcessor(new ChannelProcessor(
+                    new 
NifiChannelSelector(eventDrivenSourceChannelRef.get())));
+                runnerRef.get().setSource(source);
+                runnerRef.get().start();
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/c4dd1e65/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/test/java/org/apache/nifi/processors/flume/FlumeSourceProcessorTest.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/test/java/org/apache/nifi/processors/flume/FlumeSourceProcessorTest.java
 
b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/test/java/org/apache/nifi/processors/flume/FlumeSourceProcessorTest.java
index 32feb1e..bf32095 100644
--- 
a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/test/java/org/apache/nifi/processors/flume/FlumeSourceProcessorTest.java
+++ 
b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/test/java/org/apache/nifi/processors/flume/FlumeSourceProcessorTest.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.util.Collection;
 import java.util.HashSet;
 import java.util.List;
+import java.util.concurrent.TimeUnit;
 import org.apache.flume.sink.NullSink;
 import org.apache.flume.source.AvroSource;
 
@@ -126,9 +127,24 @@ public class FlumeSourceProcessorTest {
         runner.setProperty(FlumeSourceProcessor.SOURCE_TYPE, "spooldir");
         runner.setProperty(FlumeSinkProcessor.FLUME_CONFIG,
             "tier1.sources.src-1.spoolDir = " + 
spoolDirectory.getAbsolutePath());
-        runner.run();
-        // No data will be transfered because of how quickly the test runner
-        // starts shutting down
-        runner.assertTransferCount(FlumeSourceProcessor.SUCCESS, 0);
+        runner.run(1, false, true);
+        // Because the spool directory source is an event driven source, it 
may take some time for flow files to get
+        // produced. I'm willing to wait up to 5 seconds, but will bail out 
early if possible. If it takes longer than
+        // that then there is likely a bug.
+        int numWaits = 10;
+        while 
(runner.getFlowFilesForRelationship(FlumeSourceProcessor.SUCCESS).size() < 4 && 
--numWaits > 0) {
+            try {
+                TimeUnit.MILLISECONDS.sleep(500);
+            } catch (InterruptedException ex) {
+                logger.warn("Sleep interrupted");
+            }
+        }
+        runner.shutdown();
+        runner.assertTransferCount(FlumeSourceProcessor.SUCCESS, 4);
+        int i = 1;
+        for (MockFlowFile flowFile : 
runner.getFlowFilesForRelationship(FlumeSourceProcessor.SUCCESS)) {
+            flowFile.assertContentEquals("record " + i);
+            i++;
+        }
     }
 }

Reply via email to