Fix poms, versions, add batching to sink processor

* Fix pom issues caused by the rebase.
* Update the Flume bundle's version to 0.1.0
* Add support for batching to the sink processor

Signed-off-by: Matt Gilman <matt.c.gil...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/3b9e4824
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/3b9e4824
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/3b9e4824

Branch: refs/heads/develop
Commit: 3b9e48246641e617cd3fef987ce6facd283f6f3e
Parents: cf29029
Author: Joey Echeverria <joe...@gmail.com>
Authored: Tue Apr 7 17:18:45 2015 -0700
Committer: Matt Gilman <matt.c.gil...@gmail.com>
Committed: Tue Jul 14 14:50:16 2015 -0400

----------------------------------------------------------------------
 nifi/nifi-assembly/pom.xml                      |  2 +
 .../nifi-flume-bundle/nifi-flume-nar/pom.xml    |  4 +-
 .../nifi-flume-processors/pom.xml               |  2 +-
 .../flume/AbstractFlumeProcessor.java           |  6 ---
 .../processors/flume/FlumeSinkProcessor.java    | 46 +++++++++++++++++---
 .../flume/FlumeSinkProcessorTest.java           | 13 ++++++
 nifi/nifi-nar-bundles/nifi-flume-bundle/pom.xml |  6 +--
 nifi/pom.xml                                    |  2 +-
 8 files changed, 61 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3b9e4824/nifi/nifi-assembly/pom.xml
----------------------------------------------------------------------
diff --git a/nifi/nifi-assembly/pom.xml b/nifi/nifi-assembly/pom.xml
index 4f4879f..0293c70 100644
--- a/nifi/nifi-assembly/pom.xml
+++ b/nifi/nifi-assembly/pom.xml
@@ -165,6 +165,8 @@ language governing permissions and limitations under the 
License. -->
         <dependency>
             <groupId>org.apache.nifi</groupId>
             <artifactId>nifi-kite-nar</artifactId>
+            <type>nar</type>
+        </dependency>
         <dependency>
             <groupId>org.apache.nifi</groupId>
             <artifactId>nifi-flume-nar</artifactId>

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3b9e4824/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-nar/pom.xml
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-nar/pom.xml 
b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-nar/pom.xml
index dff440e..36a5170 100644
--- a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-nar/pom.xml
+++ b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-nar/pom.xml
@@ -17,10 +17,10 @@
     <parent>
         <groupId>org.apache.nifi</groupId>
         <artifactId>nifi-flume-bundle</artifactId>
-        <version>0.0.1-incubating-SNAPSHOT</version>
+        <version>0.1.0-incubating-SNAPSHOT</version>
     </parent>
     <artifactId>nifi-flume-nar</artifactId>
-    <version>0.0.1-incubating-SNAPSHOT</version>
+    <version>0.1.0-incubating-SNAPSHOT</version>
     <packaging>nar</packaging>
     <dependencies>
         <dependency>

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3b9e4824/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/pom.xml
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/pom.xml 
b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/pom.xml
index 54636ca..bd26a99 100644
--- a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/pom.xml
+++ b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/pom.xml
@@ -17,7 +17,7 @@
     <parent>
         <groupId>org.apache.nifi</groupId>
         <artifactId>nifi-flume-bundle</artifactId>
-        <version>0.0.1-incubating-SNAPSHOT</version>
+        <version>0.1.0-incubating-SNAPSHOT</version>
     </parent>
     <artifactId>nifi-flume-processors</artifactId>
     <packaging>jar</packaging>

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3b9e4824/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/AbstractFlumeProcessor.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/AbstractFlumeProcessor.java
 
b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/AbstractFlumeProcessor.java
index 5c608d5..a831000 100644
--- 
a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/AbstractFlumeProcessor.java
+++ 
b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/AbstractFlumeProcessor.java
@@ -20,20 +20,15 @@ import com.google.common.collect.Maps;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.io.StringReader;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import org.apache.flume.Context;
 import org.apache.flume.Event;
 import org.apache.flume.SinkFactory;
-import org.apache.flume.Source;
 import org.apache.flume.SourceFactory;
 import org.apache.flume.sink.DefaultSinkFactory;
 import org.apache.flume.source.DefaultSourceFactory;
 
-import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.ValidationContext;
 import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.components.Validator;
@@ -42,7 +37,6 @@ import org.apache.nifi.processor.AbstractProcessor;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.io.OutputStreamCallback;
-import static 
org.apache.nifi.processors.flume.FlumeSourceProcessor.FLUME_CONFIG;
 import org.apache.nifi.processors.flume.util.FlowFileEvent;
 
 /**

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3b9e4824/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/FlumeSinkProcessor.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/FlumeSinkProcessor.java
 
b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/FlumeSinkProcessor.java
index fc97ae8..0ffd4f1 100644
--- 
a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/FlumeSinkProcessor.java
+++ 
b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/FlumeSinkProcessor.java
@@ -19,6 +19,7 @@ package org.apache.nifi.processors.flume;
 import com.google.common.base.Throwables;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
 import java.util.List;
 import java.util.Set;
 import org.apache.flume.Context;
@@ -27,6 +28,7 @@ import org.apache.flume.Sink;
 import org.apache.flume.Transaction;
 import org.apache.flume.channel.MemoryChannel;
 import org.apache.flume.conf.Configurables;
+import org.apache.jasper.compiler.JspUtil;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.annotation.lifecycle.OnScheduled;
@@ -81,16 +83,24 @@ public class FlumeSinkProcessor extends 
AbstractFlumeProcessor {
             .defaultValue("")
             .addValidator(Validator.VALID)
             .build();
+    public static final PropertyDescriptor BATCH_SIZE = new 
PropertyDescriptor.Builder()
+            .name("Batch Size")
+            .description("The number of FlowFiles to process in a single 
batch")
+            .required(true)
+            .defaultValue("100")
+            .addValidator(StandardValidators.INTEGER_VALIDATOR)
+            .build();
 
     public static final Relationship SUCCESS = new 
Relationship.Builder().name("success").build();
     public static final Relationship FAILURE = new 
Relationship.Builder().name("failure").build();
 
     private List<PropertyDescriptor> descriptors;
     private Set<Relationship> relationships;
+    private int batchSize;
 
     @Override
     protected void init(final ProcessorInitializationContext context) {
-        this.descriptors = ImmutableList.of(SINK_TYPE, AGENT_NAME, 
SOURCE_NAME, FLUME_CONFIG);
+        this.descriptors = ImmutableList.of(SINK_TYPE, AGENT_NAME, 
SOURCE_NAME, FLUME_CONFIG, BATCH_SIZE);
         this.relationships = ImmutableSet.of(SUCCESS, FAILURE);
     }
 
@@ -106,9 +116,14 @@ public class FlumeSinkProcessor extends 
AbstractFlumeProcessor {
 
     @OnScheduled
     public void onScheduled(final SchedulingContext context) {
+        batchSize = context.getProperty(BATCH_SIZE).asInteger();
+
         try {
             channel = new MemoryChannel();
-            Configurables.configure(channel, new Context());
+            Context memoryChannelContext = new Context();
+            memoryChannelContext.put("capacity", String.valueOf(batchSize*10));
+            memoryChannelContext.put("transactionCapacity", 
String.valueOf(batchSize*10));
+            Configurables.configure(channel, memoryChannelContext);
             channel.start();
 
             sink = 
SINK_FACTORY.create(context.getProperty(SOURCE_NAME).getValue(),
@@ -137,12 +152,22 @@ public class FlumeSinkProcessor extends 
AbstractFlumeProcessor {
     @Override
     public void onTrigger(final ProcessContext context,
             final ProcessSession session) throws ProcessException {
-        FlowFile flowFile = session.get();
+        List<FlowFile> flowFiles = 
Lists.newArrayListWithExpectedSize(batchSize);
+        for (int i = 0; i < batchSize; i++) {
+            FlowFile flowFile = session.get();
+            if (flowFile == null) {
+              break;
+            }
+
+            flowFiles.add(flowFile);
+        }
 
         Transaction transaction = channel.getTransaction();
         try {
             transaction.begin();
-            channel.put(new FlowFileEvent(flowFile, session));
+            for (FlowFile flowFile : flowFiles) {
+                channel.put(new FlowFileEvent(flowFile, session));
+            }
             transaction.commit();
         } catch (Throwable th) {
             transaction.rollback();
@@ -152,10 +177,17 @@ public class FlumeSinkProcessor extends 
AbstractFlumeProcessor {
         }
 
         try {
-            sink.process();
-            session.transfer(flowFile, SUCCESS);
+            Sink.Status status;
+            do {
+              status = sink.process();
+            } while(status == Sink.Status.READY);
+            for (FlowFile flowFile : flowFiles) {
+                session.transfer(flowFile, SUCCESS);
+            }
         } catch (EventDeliveryException ex) {
-            session.transfer(flowFile, FAILURE);
+            for (FlowFile flowFile : flowFiles) {
+                session.transfer(flowFile, FAILURE);
+            }
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3b9e4824/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/test/java/org/apache/nifi/processors/flume/FlumeSinkProcessorTest.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/test/java/org/apache/nifi/processors/flume/FlumeSinkProcessorTest.java
 
b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/test/java/org/apache/nifi/processors/flume/FlumeSinkProcessorTest.java
index 8d40cb6..d22514f 100644
--- 
a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/test/java/org/apache/nifi/processors/flume/FlumeSinkProcessorTest.java
+++ 
b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/test/java/org/apache/nifi/processors/flume/FlumeSinkProcessorTest.java
@@ -116,6 +116,19 @@ public class FlumeSinkProcessorTest {
         runner.run();
         fis.close();
     }
+
+    @Test
+    public void testBatchSize() throws IOException {
+        TestRunner runner = 
TestRunners.newTestRunner(FlumeSinkProcessor.class);
+        runner.setProperty(FlumeSinkProcessor.SINK_TYPE, 
NullSink.class.getName());
+        runner.setProperty(FlumeSinkProcessor.BATCH_SIZE, "1000");
+        runner.setProperty(FlumeSinkProcessor.FLUME_CONFIG,
+            "tier1.sinks.sink-1.batchSize = 1000\n");
+        for (int i = 0; i < 100000; i++) {
+          runner.enqueue(String.valueOf(i).getBytes());
+        }
+        runner.run();
+    }
     
     @Test
     public void testHdfsSink() throws IOException {

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3b9e4824/nifi/nifi-nar-bundles/nifi-flume-bundle/pom.xml
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-flume-bundle/pom.xml 
b/nifi/nifi-nar-bundles/nifi-flume-bundle/pom.xml
index dc9ec69..50b0fde 100644
--- a/nifi/nifi-nar-bundles/nifi-flume-bundle/pom.xml
+++ b/nifi/nifi-nar-bundles/nifi-flume-bundle/pom.xml
@@ -17,10 +17,10 @@
     <parent>
         <groupId>org.apache.nifi</groupId>
         <artifactId>nifi-nar-bundles</artifactId>
-        <version>0.0.1-incubating-SNAPSHOT</version>
+        <version>0.1.0-incubating-SNAPSHOT</version>
     </parent>
     <artifactId>nifi-flume-bundle</artifactId>
-    <version>0.0.1-incubating-SNAPSHOT</version>
+    <version>0.1.0-incubating-SNAPSHOT</version>
     <packaging>pom</packaging>
     <description>A bundle of processors that run Flume 
sources/sinks</description>
     <modules>
@@ -32,7 +32,7 @@
             <dependency>
                 <groupId>org.apache.nifi</groupId>
                 <artifactId>nifi-flume-processors</artifactId>
-                <version>0.0.1-incubating-SNAPSHOT</version>
+                <version>0.1.0-incubating-SNAPSHOT</version>
             </dependency>
         </dependencies>
     </dependencyManagement>

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3b9e4824/nifi/pom.xml
----------------------------------------------------------------------
diff --git a/nifi/pom.xml b/nifi/pom.xml
index 422e1aa..682a426 100644
--- a/nifi/pom.xml
+++ b/nifi/pom.xml
@@ -808,7 +808,7 @@
             <dependency>
                 <groupId>org.apache.nifi</groupId>
                 <artifactId>nifi-flume-nar</artifactId>
-                <version>0.0.1-incubating-SNAPSHOT</version>
+                <version>0.1.0-incubating-SNAPSHOT</version>
                 <type>nar</type>
             </dependency>
             <dependency>

Reply via email to