Added the sink's relationships to the relationship set. Added error checkign and logging for sink/source creation. Fixed an issue with transaction managemetn in the sink. Reformatted per coding standard.
Signed-off-by: Matt Gilman <matt.c.gil...@gmail.com> Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/cf29029a Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/cf29029a Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/cf29029a Branch: refs/heads/develop Commit: cf29029a4db4cc606b417889296abb97e460d586 Parents: b251ab4 Author: Joey Echeverria <j...@cloudera.com> Authored: Fri Jan 30 14:21:21 2015 -0500 Committer: Matt Gilman <matt.c.gil...@gmail.com> Committed: Tue Jul 14 14:50:16 2015 -0400 ---------------------------------------------------------------------- nifi/nifi-assembly/pom.xml | 3 + .../nifi-flume-bundle/nifi-flume-nar/pom.xml | 5 + .../processors/flume/FlumeSinkProcessor.java | 218 ++++++++-------- .../processors/flume/FlumeSourceProcessor.java | 253 ++++++++++--------- .../flume/FlumeSinkProcessorTest.java | 2 - nifi/pom.xml | 5 + 6 files changed, 257 insertions(+), 229 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/cf29029a/nifi/nifi-assembly/pom.xml ---------------------------------------------------------------------- diff --git a/nifi/nifi-assembly/pom.xml b/nifi/nifi-assembly/pom.xml index c679d22..4f4879f 100644 --- a/nifi/nifi-assembly/pom.xml +++ b/nifi/nifi-assembly/pom.xml @@ -165,6 +165,9 @@ language governing permissions and limitations under the License. --> <dependency> <groupId>org.apache.nifi</groupId> <artifactId>nifi-kite-nar</artifactId> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-flume-nar</artifactId> <type>nar</type> </dependency> <dependency> http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/cf29029a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-nar/pom.xml ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-nar/pom.xml b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-nar/pom.xml index c5333b6..dff440e 100644 --- a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-nar/pom.xml +++ b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-nar/pom.xml @@ -27,5 +27,10 @@ <groupId>org.apache.nifi</groupId> <artifactId>nifi-flume-processors</artifactId> </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-hadoop-libraries-nar</artifactId> + <type>nar</type> + </dependency> </dependencies> </project> http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/cf29029a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/FlumeSinkProcessor.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/FlumeSinkProcessor.java b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/FlumeSinkProcessor.java index 4603d18..fc97ae8 100644 --- a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/FlumeSinkProcessor.java +++ b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/FlumeSinkProcessor.java @@ -27,6 +27,10 @@ import org.apache.flume.Sink; import org.apache.flume.Transaction; import org.apache.flume.channel.MemoryChannel; import org.apache.flume.conf.Configurables; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnUnscheduled; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.Validator; @@ -36,122 +40,122 @@ import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.ProcessorInitializationContext; import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.SchedulingContext; -import org.apache.nifi.processor.annotation.CapabilityDescription; -import org.apache.nifi.processor.annotation.OnScheduled; -import org.apache.nifi.processor.annotation.OnUnscheduled; -import org.apache.nifi.processor.annotation.Tags; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.processors.flume.util.FlowFileEvent; /** - * This processor runs a Flume sink + * This processor runs a Flume sink */ -@Tags({"flume", "hadoop", "get", "sink" }) +@Tags({"flume", "hadoop", "get", "sink"}) @CapabilityDescription("Generate FlowFile data from a Flume sink") public class FlumeSinkProcessor extends AbstractFlumeProcessor { - private Sink sink; - private MemoryChannel channel; - - public static final PropertyDescriptor SINK_TYPE = new PropertyDescriptor.Builder() - .name("Sink Type") - .description("The fully-qualified name of the Sink class") - .required(true) - .addValidator(createSinkValidator()) - .build(); - public static final PropertyDescriptor AGENT_NAME = new PropertyDescriptor.Builder() - .name("Agent Name") - .description("The name of the agent used in the Flume sink configuration") - .required(true) - .defaultValue("tier1") - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .build(); - public static final PropertyDescriptor SOURCE_NAME = new PropertyDescriptor.Builder() - .name("Sink Name") - .description("The name of the sink used in the Flume sink configuration") - .required(true) - .defaultValue("sink-1") - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .build(); - public static final PropertyDescriptor FLUME_CONFIG = new PropertyDescriptor.Builder() - .name("Flume Configuration") - .description("The Flume configuration for the sink copied from the flume.properties file") - .required(true) - .defaultValue("") - .addValidator(Validator.VALID) - .build(); - - public static final Relationship SUCCESS = new Relationship.Builder().name("success").build(); - public static final Relationship FAILURE = new Relationship.Builder().name("failure").build(); - - private List<PropertyDescriptor> descriptors; - private Set<Relationship> relationships; - - - @Override - protected void init(final ProcessorInitializationContext context) { - this.descriptors = ImmutableList.of(SINK_TYPE, AGENT_NAME, SOURCE_NAME, FLUME_CONFIG); - this.relationships = ImmutableSet.of(); - } - - @Override - protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { - return descriptors; - } - - @Override - public Set<Relationship> getRelationships() { - return relationships; - } - - @OnScheduled - public void onScheduled(final SchedulingContext context) { - channel = new MemoryChannel(); - Configurables.configure(channel, new Context()); - channel.start(); - - sink = SINK_FACTORY.create(context.getProperty(SOURCE_NAME).getValue(), - context.getProperty(SINK_TYPE).getValue()); - sink.setChannel(channel); - - String flumeConfig = context.getProperty(FLUME_CONFIG).getValue(); - String agentName = context.getProperty(AGENT_NAME).getValue(); - String sinkName = context.getProperty(SOURCE_NAME).getValue(); - Configurables.configure(sink, - getFlumeSinkContext(flumeConfig, agentName, sinkName) ); - - sink.start(); - } - - @OnUnscheduled - public void unScheduled() { - sink.stop(); - channel.stop(); - } - - @Override - public void onTrigger(final ProcessContext context, - final ProcessSession session) throws ProcessException { - FlowFile flowFile = session.get(); - - Transaction transaction = channel.getTransaction(); - try { - transaction.begin(); - channel.put(new FlowFileEvent(flowFile, session)); - transaction.commit(); - } catch (Throwable th) { - transaction.rollback(); - throw Throwables.propagate(th); - } finally { - transaction.close(); + private Sink sink; + private MemoryChannel channel; + + public static final PropertyDescriptor SINK_TYPE = new PropertyDescriptor.Builder() + .name("Sink Type") + .description("The fully-qualified name of the Sink class") + .required(true) + .addValidator(createSinkValidator()) + .build(); + public static final PropertyDescriptor AGENT_NAME = new PropertyDescriptor.Builder() + .name("Agent Name") + .description("The name of the agent used in the Flume sink configuration") + .required(true) + .defaultValue("tier1") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + public static final PropertyDescriptor SOURCE_NAME = new PropertyDescriptor.Builder() + .name("Sink Name") + .description("The name of the sink used in the Flume sink configuration") + .required(true) + .defaultValue("sink-1") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + public static final PropertyDescriptor FLUME_CONFIG = new PropertyDescriptor.Builder() + .name("Flume Configuration") + .description("The Flume configuration for the sink copied from the flume.properties file") + .required(true) + .defaultValue("") + .addValidator(Validator.VALID) + .build(); + + public static final Relationship SUCCESS = new Relationship.Builder().name("success").build(); + public static final Relationship FAILURE = new Relationship.Builder().name("failure").build(); + + private List<PropertyDescriptor> descriptors; + private Set<Relationship> relationships; + + @Override + protected void init(final ProcessorInitializationContext context) { + this.descriptors = ImmutableList.of(SINK_TYPE, AGENT_NAME, SOURCE_NAME, FLUME_CONFIG); + this.relationships = ImmutableSet.of(SUCCESS, FAILURE); } - try { - sink.process(); - session.transfer(flowFile, SUCCESS); - } catch (EventDeliveryException ex) { - session.transfer(flowFile, FAILURE); + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return descriptors; } - } -} \ No newline at end of file + + @Override + public Set<Relationship> getRelationships() { + return relationships; + } + + @OnScheduled + public void onScheduled(final SchedulingContext context) { + try { + channel = new MemoryChannel(); + Configurables.configure(channel, new Context()); + channel.start(); + + sink = SINK_FACTORY.create(context.getProperty(SOURCE_NAME).getValue(), + context.getProperty(SINK_TYPE).getValue()); + sink.setChannel(channel); + + String flumeConfig = context.getProperty(FLUME_CONFIG).getValue(); + String agentName = context.getProperty(AGENT_NAME).getValue(); + String sinkName = context.getProperty(SOURCE_NAME).getValue(); + Configurables.configure(sink, + getFlumeSinkContext(flumeConfig, agentName, sinkName)); + + sink.start(); + } catch (Throwable th) { + getLogger().error("Error creating sink", th); + throw Throwables.propagate(th); + } + } + + @OnUnscheduled + public void unScheduled() { + sink.stop(); + channel.stop(); + } + + @Override + public void onTrigger(final ProcessContext context, + final ProcessSession session) throws ProcessException { + FlowFile flowFile = session.get(); + + Transaction transaction = channel.getTransaction(); + try { + transaction.begin(); + channel.put(new FlowFileEvent(flowFile, session)); + transaction.commit(); + } catch (Throwable th) { + transaction.rollback(); + throw Throwables.propagate(th); + } finally { + transaction.close(); + } + + try { + sink.process(); + session.transfer(flowFile, SUCCESS); + } catch (EventDeliveryException ex) { + session.transfer(flowFile, FAILURE); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/cf29029a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/FlumeSourceProcessor.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/FlumeSourceProcessor.java b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/FlumeSourceProcessor.java index 8b8388c..19551e6 100644 --- a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/FlumeSourceProcessor.java +++ b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/FlumeSourceProcessor.java @@ -16,6 +16,7 @@ */ package org.apache.nifi.processors.flume; +import com.google.common.base.Throwables; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; import java.util.List; @@ -32,6 +33,10 @@ import org.apache.flume.channel.ChannelProcessor; import org.apache.flume.channel.MemoryChannel; import org.apache.flume.conf.Configurables; import org.apache.flume.source.EventDrivenSourceRunner; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnUnscheduled; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.Validator; @@ -40,139 +45,147 @@ import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.ProcessorInitializationContext; import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.SchedulingContext; -import org.apache.nifi.processor.annotation.CapabilityDescription; -import org.apache.nifi.processor.annotation.OnScheduled; -import org.apache.nifi.processor.annotation.OnUnscheduled; -import org.apache.nifi.processor.annotation.Tags; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.util.StandardValidators; /** - * This processor runs a Flume source + * This processor runs a Flume source */ -@Tags({"flume", "hadoop", "get", "source" }) +@Tags({"flume", "hadoop", "get", "source"}) @CapabilityDescription("Generate FlowFile data from a Flume source") public class FlumeSourceProcessor extends AbstractFlumeProcessor { - - private Source source; - private SourceRunner runner; - private MemoryChannel channel; - - public static final PropertyDescriptor SOURCE_TYPE = new PropertyDescriptor.Builder() - .name("Source Type") - .description("The fully-qualified name of the Source class") - .required(true) - .addValidator(createSourceValidator()) - .build(); - public static final PropertyDescriptor AGENT_NAME = new PropertyDescriptor.Builder() - .name("Agent Name") - .description("The name of the agent used in the Flume source configuration") - .required(true) - .defaultValue("tier1") - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .build(); - public static final PropertyDescriptor SOURCE_NAME = new PropertyDescriptor.Builder() - .name("Source Name") - .description("The name of the source used in the Flume source configuration") - .required(true) - .defaultValue("src-1") - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .build(); - public static final PropertyDescriptor FLUME_CONFIG = new PropertyDescriptor.Builder() - .name("Flume Configuration") - .description("The Flume configuration for the source copied from the flume.properties file") - .required(true) - .defaultValue("") - .addValidator(Validator.VALID) - .build(); - - public static final Relationship SUCCESS = new Relationship.Builder().name("success").build(); - - private List<PropertyDescriptor> descriptors; - private Set<Relationship> relationships; - - - @Override - protected void init(final ProcessorInitializationContext context) { - this.descriptors = ImmutableList.of(SOURCE_TYPE, AGENT_NAME, SOURCE_NAME, FLUME_CONFIG); - this.relationships = ImmutableSet.of(SUCCESS); - } - - @Override - protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { - return descriptors; - } - - @Override - public Set<Relationship> getRelationships() { - return relationships; - } - - @OnScheduled - public void onScheduled(final SchedulingContext context) { - source = SOURCE_FACTORY.create( - context.getProperty(SOURCE_NAME).getValue(), - context.getProperty(SOURCE_TYPE).getValue()); - - String flumeConfig = context.getProperty(FLUME_CONFIG).getValue(); - String agentName = context.getProperty(AGENT_NAME).getValue(); - String sourceName = context.getProperty(SOURCE_NAME).getValue(); - Configurables.configure(source, - getFlumeSourceContext(flumeConfig, agentName, sourceName) ); - - if (source instanceof EventDrivenSource) { - runner = new EventDrivenSourceRunner(); - channel = new MemoryChannel(); - Configurables.configure(channel, new Context()); - channel.start(); - source.setChannelProcessor(new ChannelProcessor(new NifiChannelSelector(channel))); - runner.setSource(source); - runner.start(); - } - } - - @OnUnscheduled - public void unScheduled() { - if (runner != null) { - runner.stop(); + + private Source source; + private SourceRunner runner; + private MemoryChannel channel; + + public static final PropertyDescriptor SOURCE_TYPE = new PropertyDescriptor.Builder() + .name("Source Type") + .description("The fully-qualified name of the Source class") + .required(true) + .addValidator(createSourceValidator()) + .build(); + public static final PropertyDescriptor AGENT_NAME = new PropertyDescriptor.Builder() + .name("Agent Name") + .description("The name of the agent used in the Flume source configuration") + .required(true) + .defaultValue("tier1") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + public static final PropertyDescriptor SOURCE_NAME = new PropertyDescriptor.Builder() + .name("Source Name") + .description("The name of the source used in the Flume source configuration") + .required(true) + .defaultValue("src-1") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + public static final PropertyDescriptor FLUME_CONFIG = new PropertyDescriptor.Builder() + .name("Flume Configuration") + .description("The Flume configuration for the source copied from the flume.properties file") + .required(true) + .defaultValue("") + .addValidator(Validator.VALID) + .build(); + + public static final Relationship SUCCESS = new Relationship.Builder().name("success").build(); + + private List<PropertyDescriptor> descriptors; + private Set<Relationship> relationships; + + @Override + protected void init(final ProcessorInitializationContext context) { + this.descriptors = ImmutableList.of(SOURCE_TYPE, AGENT_NAME, SOURCE_NAME, FLUME_CONFIG); + this.relationships = ImmutableSet.of(SUCCESS); } - if (channel != null) { - channel.stop(); + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return descriptors; } - } - - @Override - public void onTrigger(final ProcessContext context, - final ProcessSession session) throws ProcessException { - if (source instanceof EventDrivenSource) { - onEventDrivenTrigger(context, session); - } else if (source instanceof PollableSource) { - onPollableTrigger((PollableSource)source, context, session); + + @Override + public Set<Relationship> getRelationships() { + return relationships; } - } - - public void onPollableTrigger(final PollableSource pollableSource, - final ProcessContext context, final ProcessSession session) - throws ProcessException { - try { - pollableSource.setChannelProcessor(new ChannelProcessor( - new NifiChannelSelector(new NifiChannel(session, SUCCESS)))); - pollableSource.start(); - pollableSource.process(); - pollableSource.stop(); - } catch (EventDeliveryException ex) { - throw new ProcessException("Error processing pollable source", ex); + + @OnScheduled + public void onScheduled(final SchedulingContext context) { + try { + source = SOURCE_FACTORY.create( + context.getProperty(SOURCE_NAME).getValue(), + context.getProperty(SOURCE_TYPE).getValue()); + + String flumeConfig = context.getProperty(FLUME_CONFIG).getValue(); + String agentName = context.getProperty(AGENT_NAME).getValue(); + String sourceName = context.getProperty(SOURCE_NAME).getValue(); + Configurables.configure(source, + getFlumeSourceContext(flumeConfig, agentName, sourceName)); + + if (source instanceof EventDrivenSource) { + runner = new EventDrivenSourceRunner(); + channel = new MemoryChannel(); + Configurables.configure(channel, new Context()); + channel.start(); + source.setChannelProcessor(new ChannelProcessor(new NifiChannelSelector(channel))); + runner.setSource(source); + runner.start(); + } + } catch (Throwable th) { + getLogger().error("Error creating source", th); + throw Throwables.propagate(th); + } } - } - public void onEventDrivenTrigger(final ProcessContext context, final ProcessSession session) { - Transaction transaction = channel.getTransaction(); - transaction.begin(); + @OnUnscheduled + public void unScheduled() { + if (runner != null) { + runner.stop(); + } + if (channel != null) { + channel.stop(); + } + } + + @Override + public void onTrigger(final ProcessContext context, + final ProcessSession session) throws ProcessException { + if (source instanceof EventDrivenSource) { + onEventDrivenTrigger(context, session); + } else if (source instanceof PollableSource) { + onPollableTrigger((PollableSource) source, context, session); + } + } + + public void onPollableTrigger(final PollableSource pollableSource, + final ProcessContext context, final ProcessSession session) + throws ProcessException { + try { + pollableSource.setChannelProcessor(new ChannelProcessor( + new NifiChannelSelector(new NifiChannel(session, SUCCESS)))); + pollableSource.start(); + pollableSource.process(); + pollableSource.stop(); + } catch (EventDeliveryException ex) { + throw new ProcessException("Error processing pollable source", ex); + } + } - Event event = channel.take(); - if (event != null) { - transferEvent(event, session, SUCCESS); + public void onEventDrivenTrigger(final ProcessContext context, final ProcessSession session) { + Transaction transaction = channel.getTransaction(); + transaction.begin(); + + try { + Event event = channel.take(); + if (event != null) { + transferEvent(event, session, SUCCESS); + } + transaction.commit(); + } catch (Throwable th) { + transaction.rollback(); + throw Throwables.propagate(th); + } finally { + transaction.close(); + } } - } } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/cf29029a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/test/java/org/apache/nifi/processors/flume/FlumeSinkProcessorTest.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/test/java/org/apache/nifi/processors/flume/FlumeSinkProcessorTest.java b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/test/java/org/apache/nifi/processors/flume/FlumeSinkProcessorTest.java index 4f2cef7..8d40cb6 100644 --- a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/test/java/org/apache/nifi/processors/flume/FlumeSinkProcessorTest.java +++ b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/test/java/org/apache/nifi/processors/flume/FlumeSinkProcessorTest.java @@ -18,7 +18,6 @@ package org.apache.nifi.processors.flume; import java.io.File; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; import java.io.FileInputStream; import java.io.FilenameFilter; @@ -40,7 +39,6 @@ import org.apache.nifi.util.TestRunners; import org.apache.nifi.util.file.FileUtils; import org.junit.Assert; -import org.junit.Ignore; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/cf29029a/nifi/pom.xml ---------------------------------------------------------------------- diff --git a/nifi/pom.xml b/nifi/pom.xml index 0c71ba8..422e1aa 100644 --- a/nifi/pom.xml +++ b/nifi/pom.xml @@ -805,6 +805,11 @@ <artifactId>nifi-geo-nar</artifactId> <version>0.2.0-incubating-SNAPSHOT</version> <type>nar</type> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-flume-nar</artifactId> + <version>0.0.1-incubating-SNAPSHOT</version> + <type>nar</type> </dependency> <dependency> <groupId>org.apache.nifi</groupId>