Fix poms, versions, add batching to sink processor

* Fix pom issues caused by the rebase.
* Update the Flume bundle's version to 0.1.0.
* Add support for batching to the sink processor.

Signed-off-by: Matt Gilman <matt.c.gil...@gmail.com> Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/3b9e4824 Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/3b9e4824 Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/3b9e4824 Branch: refs/heads/develop Commit: 3b9e48246641e617cd3fef987ce6facd283f6f3e Parents: cf29029 Author: Joey Echeverria <joe...@gmail.com> Authored: Tue Apr 7 17:18:45 2015 -0700 Committer: Matt Gilman <matt.c.gil...@gmail.com> Committed: Tue Jul 14 14:50:16 2015 -0400 ---------------------------------------------------------------------- nifi/nifi-assembly/pom.xml | 2 + .../nifi-flume-bundle/nifi-flume-nar/pom.xml | 4 +- .../nifi-flume-processors/pom.xml | 2 +- .../flume/AbstractFlumeProcessor.java | 6 --- .../processors/flume/FlumeSinkProcessor.java | 46 +++++++++++++++++--- .../flume/FlumeSinkProcessorTest.java | 13 ++++++ nifi/nifi-nar-bundles/nifi-flume-bundle/pom.xml | 6 +-- nifi/pom.xml | 2 +- 8 files changed, 61 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3b9e4824/nifi/nifi-assembly/pom.xml ---------------------------------------------------------------------- diff --git a/nifi/nifi-assembly/pom.xml b/nifi/nifi-assembly/pom.xml index 4f4879f..0293c70 100644 --- a/nifi/nifi-assembly/pom.xml +++ b/nifi/nifi-assembly/pom.xml @@ -165,6 +165,8 @@ language governing permissions and limitations under the License. 
--> <dependency> <groupId>org.apache.nifi</groupId> <artifactId>nifi-kite-nar</artifactId> + <type>nar</type> + </dependency> <dependency> <groupId>org.apache.nifi</groupId> <artifactId>nifi-flume-nar</artifactId> http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3b9e4824/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-nar/pom.xml ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-nar/pom.xml b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-nar/pom.xml index dff440e..36a5170 100644 --- a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-nar/pom.xml +++ b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-nar/pom.xml @@ -17,10 +17,10 @@ <parent> <groupId>org.apache.nifi</groupId> <artifactId>nifi-flume-bundle</artifactId> - <version>0.0.1-incubating-SNAPSHOT</version> + <version>0.1.0-incubating-SNAPSHOT</version> </parent> <artifactId>nifi-flume-nar</artifactId> - <version>0.0.1-incubating-SNAPSHOT</version> + <version>0.1.0-incubating-SNAPSHOT</version> <packaging>nar</packaging> <dependencies> <dependency> http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3b9e4824/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/pom.xml ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/pom.xml b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/pom.xml index 54636ca..bd26a99 100644 --- a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/pom.xml +++ b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/pom.xml @@ -17,7 +17,7 @@ <parent> <groupId>org.apache.nifi</groupId> <artifactId>nifi-flume-bundle</artifactId> - <version>0.0.1-incubating-SNAPSHOT</version> + <version>0.1.0-incubating-SNAPSHOT</version> </parent> <artifactId>nifi-flume-processors</artifactId> <packaging>jar</packaging> 
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3b9e4824/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/AbstractFlumeProcessor.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/AbstractFlumeProcessor.java b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/AbstractFlumeProcessor.java index 5c608d5..a831000 100644 --- a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/AbstractFlumeProcessor.java +++ b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/AbstractFlumeProcessor.java @@ -20,20 +20,15 @@ import com.google.common.collect.Maps; import java.io.IOException; import java.io.OutputStream; import java.io.StringReader; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; import java.util.Map; import java.util.Properties; import org.apache.flume.Context; import org.apache.flume.Event; import org.apache.flume.SinkFactory; -import org.apache.flume.Source; import org.apache.flume.SourceFactory; import org.apache.flume.sink.DefaultSinkFactory; import org.apache.flume.source.DefaultSourceFactory; -import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationResult; import org.apache.nifi.components.Validator; @@ -42,7 +37,6 @@ import org.apache.nifi.processor.AbstractProcessor; import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.io.OutputStreamCallback; -import static org.apache.nifi.processors.flume.FlumeSourceProcessor.FLUME_CONFIG; import 
org.apache.nifi.processors.flume.util.FlowFileEvent; /** http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3b9e4824/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/FlumeSinkProcessor.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/FlumeSinkProcessor.java b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/FlumeSinkProcessor.java index fc97ae8..0ffd4f1 100644 --- a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/FlumeSinkProcessor.java +++ b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/FlumeSinkProcessor.java @@ -19,6 +19,7 @@ package org.apache.nifi.processors.flume; import com.google.common.base.Throwables; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Lists; import java.util.List; import java.util.Set; import org.apache.flume.Context; @@ -27,6 +28,7 @@ import org.apache.flume.Sink; import org.apache.flume.Transaction; import org.apache.flume.channel.MemoryChannel; import org.apache.flume.conf.Configurables; +import org.apache.jasper.compiler.JspUtil; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.lifecycle.OnScheduled; @@ -81,16 +83,24 @@ public class FlumeSinkProcessor extends AbstractFlumeProcessor { .defaultValue("") .addValidator(Validator.VALID) .build(); + public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder() + .name("Batch Size") + .description("The number of FlowFiles to process in a single batch") + .required(true) + 
.defaultValue("100") + .addValidator(StandardValidators.INTEGER_VALIDATOR) + .build(); public static final Relationship SUCCESS = new Relationship.Builder().name("success").build(); public static final Relationship FAILURE = new Relationship.Builder().name("failure").build(); private List<PropertyDescriptor> descriptors; private Set<Relationship> relationships; + private int batchSize; @Override protected void init(final ProcessorInitializationContext context) { - this.descriptors = ImmutableList.of(SINK_TYPE, AGENT_NAME, SOURCE_NAME, FLUME_CONFIG); + this.descriptors = ImmutableList.of(SINK_TYPE, AGENT_NAME, SOURCE_NAME, FLUME_CONFIG, BATCH_SIZE); this.relationships = ImmutableSet.of(SUCCESS, FAILURE); } @@ -106,9 +116,14 @@ public class FlumeSinkProcessor extends AbstractFlumeProcessor { @OnScheduled public void onScheduled(final SchedulingContext context) { + batchSize = context.getProperty(BATCH_SIZE).asInteger(); + try { channel = new MemoryChannel(); - Configurables.configure(channel, new Context()); + Context memoryChannelContext = new Context(); + memoryChannelContext.put("capacity", String.valueOf(batchSize*10)); + memoryChannelContext.put("transactionCapacity", String.valueOf(batchSize*10)); + Configurables.configure(channel, memoryChannelContext); channel.start(); sink = SINK_FACTORY.create(context.getProperty(SOURCE_NAME).getValue(), @@ -137,12 +152,22 @@ public class FlumeSinkProcessor extends AbstractFlumeProcessor { @Override public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { - FlowFile flowFile = session.get(); + List<FlowFile> flowFiles = Lists.newArrayListWithExpectedSize(batchSize); + for (int i = 0; i < batchSize; i++) { + FlowFile flowFile = session.get(); + if (flowFile == null) { + break; + } + + flowFiles.add(flowFile); + } Transaction transaction = channel.getTransaction(); try { transaction.begin(); - channel.put(new FlowFileEvent(flowFile, session)); + for (FlowFile flowFile : 
flowFiles) { + channel.put(new FlowFileEvent(flowFile, session)); + } transaction.commit(); } catch (Throwable th) { transaction.rollback(); @@ -152,10 +177,17 @@ public class FlumeSinkProcessor extends AbstractFlumeProcessor { } try { - sink.process(); - session.transfer(flowFile, SUCCESS); + Sink.Status status; + do { + status = sink.process(); + } while(status == Sink.Status.READY); + for (FlowFile flowFile : flowFiles) { + session.transfer(flowFile, SUCCESS); + } } catch (EventDeliveryException ex) { - session.transfer(flowFile, FAILURE); + for (FlowFile flowFile : flowFiles) { + session.transfer(flowFile, FAILURE); + } } } } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3b9e4824/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/test/java/org/apache/nifi/processors/flume/FlumeSinkProcessorTest.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/test/java/org/apache/nifi/processors/flume/FlumeSinkProcessorTest.java b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/test/java/org/apache/nifi/processors/flume/FlumeSinkProcessorTest.java index 8d40cb6..d22514f 100644 --- a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/test/java/org/apache/nifi/processors/flume/FlumeSinkProcessorTest.java +++ b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/test/java/org/apache/nifi/processors/flume/FlumeSinkProcessorTest.java @@ -116,6 +116,19 @@ public class FlumeSinkProcessorTest { runner.run(); fis.close(); } + + @Test + public void testBatchSize() throws IOException { + TestRunner runner = TestRunners.newTestRunner(FlumeSinkProcessor.class); + runner.setProperty(FlumeSinkProcessor.SINK_TYPE, NullSink.class.getName()); + runner.setProperty(FlumeSinkProcessor.BATCH_SIZE, "1000"); + runner.setProperty(FlumeSinkProcessor.FLUME_CONFIG, + "tier1.sinks.sink-1.batchSize = 1000\n"); + for (int i = 0; i 
< 100000; i++) { + runner.enqueue(String.valueOf(i).getBytes()); + } + runner.run(); + } @Test public void testHdfsSink() throws IOException { http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3b9e4824/nifi/nifi-nar-bundles/nifi-flume-bundle/pom.xml ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-flume-bundle/pom.xml b/nifi/nifi-nar-bundles/nifi-flume-bundle/pom.xml index dc9ec69..50b0fde 100644 --- a/nifi/nifi-nar-bundles/nifi-flume-bundle/pom.xml +++ b/nifi/nifi-nar-bundles/nifi-flume-bundle/pom.xml @@ -17,10 +17,10 @@ <parent> <groupId>org.apache.nifi</groupId> <artifactId>nifi-nar-bundles</artifactId> - <version>0.0.1-incubating-SNAPSHOT</version> + <version>0.1.0-incubating-SNAPSHOT</version> </parent> <artifactId>nifi-flume-bundle</artifactId> - <version>0.0.1-incubating-SNAPSHOT</version> + <version>0.1.0-incubating-SNAPSHOT</version> <packaging>pom</packaging> <description>A bundle of processors that run Flume sources/sinks</description> <modules> @@ -32,7 +32,7 @@ <dependency> <groupId>org.apache.nifi</groupId> <artifactId>nifi-flume-processors</artifactId> - <version>0.0.1-incubating-SNAPSHOT</version> + <version>0.1.0-incubating-SNAPSHOT</version> </dependency> </dependencies> </dependencyManagement> http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3b9e4824/nifi/pom.xml ---------------------------------------------------------------------- diff --git a/nifi/pom.xml b/nifi/pom.xml index 422e1aa..682a426 100644 --- a/nifi/pom.xml +++ b/nifi/pom.xml @@ -808,7 +808,7 @@ <dependency> <groupId>org.apache.nifi</groupId> <artifactId>nifi-flume-nar</artifactId> - <version>0.0.1-incubating-SNAPSHOT</version> + <version>0.1.0-incubating-SNAPSHOT</version> <type>nar</type> </dependency> <dependency>