Added processors that can run Flume sources and Flume sinks.

Signed-off-by: Matt Gilman <matt.c.gil...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/b251ab44
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/b251ab44
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/b251ab44

Branch: refs/heads/develop
Commit: b251ab44258a840c5bacd973776b26dacbdbbbc2
Parents: 483b3dd
Author: Joey Echeverria <j...@cloudera.com>
Authored: Wed Jan 28 16:30:49 2015 -0800
Committer: Matt Gilman <matt.c.gil...@gmail.com>
Committed: Tue Jul 14 14:50:16 2015 -0400

----------------------------------------------------------------------
 .../nifi-flume-bundle/nifi-flume-nar/pom.xml    |  31 ++++
 .../nifi-flume-processors/pom.xml               | 126 +++++++++++++
 .../flume/AbstractFlumeProcessor.java           | 134 ++++++++++++++
 .../processors/flume/FlumeSinkProcessor.java    | 157 ++++++++++++++++
 .../processors/flume/FlumeSourceProcessor.java  | 178 +++++++++++++++++++
 .../nifi/processors/flume/NifiChannel.java      |  31 ++++
 .../processors/flume/NifiChannelSelector.java   |  55 ++++++
 .../nifi/processors/flume/NifiTransaction.java  |  40 +++++
 .../processors/flume/util/FlowFileEvent.java    | 114 ++++++++++++
 .../flume/util/FlowFileEventConstants.java      |  25 +++
 .../org.apache.nifi.processor.Processor         |  16 ++
 .../processors/flume/AbstractFlumeTest.java     |  35 ++++
 .../flume/FlumeSinkProcessorTest.java           | 154 ++++++++++++++++
 .../flume/FlumeSourceProcessorTest.java         | 140 +++++++++++++++
 .../src/test/resources/core-site-broken.xml     |  25 +++
 .../src/test/resources/core-site.xml            |  25 +++
 .../src/test/resources/testdata/records.txt     |   4 +
 nifi/nifi-nar-bundles/nifi-flume-bundle/pom.xml |  39 ++++
 nifi/nifi-nar-bundles/pom.xml                   |   1 +
 19 files changed, 1330 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b251ab44/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-nar/pom.xml
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-nar/pom.xml 
b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-nar/pom.xml
new file mode 100644
index 0000000..c5333b6
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-nar/pom.xml
@@ -0,0 +1,31 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+      http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-flume-bundle</artifactId>
+        <version>0.0.1-incubating-SNAPSHOT</version>
+    </parent>
+    <artifactId>nifi-flume-nar</artifactId>
+    <version>0.0.1-incubating-SNAPSHOT</version>
+    <packaging>nar</packaging>
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-flume-processors</artifactId>
+        </dependency>
+    </dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b251ab44/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/pom.xml
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/pom.xml 
b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/pom.xml
new file mode 100644
index 0000000..54636ca
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/pom.xml
@@ -0,0 +1,126 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+    <!--
+      Licensed to the Apache Software Foundation (ASF) under one or more
+      contributor license agreements.  See the NOTICE file distributed with
+      this work for additional information regarding copyright ownership.
+      The ASF licenses this file to You under the Apache License, Version 2.0
+      (the "License"); you may not use this file except in compliance with
+      the License.  You may obtain a copy of the License at
+          http://www.apache.org/licenses/LICENSE-2.0
+      Unless required by applicable law or agreed to in writing, software
+      distributed under the License is distributed on an "AS IS" BASIS,
+      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+      See the License for the specific language governing permissions and
+      limitations under the License.
+    -->
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-flume-bundle</artifactId>
+        <version>0.0.1-incubating-SNAPSHOT</version>
+    </parent>
+    <artifactId>nifi-flume-processors</artifactId>
+    <packaging>jar</packaging>
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-utils</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-processor-utils</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-flowfile-packager</artifactId>
+        </dependency>
+        <dependency> 
+            <groupId>org.apache.flume</groupId> 
+            <artifactId>flume-ng-sdk</artifactId> 
+            <version>1.5.2</version>
+        </dependency> 
+        <dependency> 
+            <groupId>org.apache.flume</groupId> 
+            <artifactId>flume-ng-core</artifactId> 
+            <version>1.5.2</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency> 
+
+        <!-- Flume Sources -->
+        <dependency> 
+            <groupId>org.apache.flume.flume-ng-sources</groupId> 
+            <artifactId>flume-twitter-source</artifactId> 
+            <version>1.5.2</version>
+        </dependency> 
+        <dependency> 
+            <groupId>org.apache.flume.flume-ng-sources</groupId> 
+            <artifactId>flume-jms-source</artifactId> 
+            <version>1.5.2</version>
+        </dependency> 
+        <dependency> 
+            <groupId>org.apache.flume.flume-ng-sources</groupId> 
+            <artifactId>flume-scribe-source</artifactId> 
+            <version>1.5.2</version>
+        </dependency> 
+
+        <!-- Flume Sinks -->
+        <dependency> 
+            <groupId>org.apache.flume.flume-ng-sinks</groupId> 
+            <artifactId>flume-hdfs-sink</artifactId> 
+            <version>1.5.2</version>
+        </dependency> 
+
+        <!-- HDFS sink dependencies -->
+        <dependency> 
+            <groupId>org.apache.hadoop</groupId> 
+            <artifactId>hadoop-common</artifactId> 
+            <scope>provided</scope> 
+        </dependency> 
+        <dependency> 
+            <groupId>org.apache.hadoop</groupId> 
+            <artifactId>hadoop-hdfs</artifactId> 
+            <scope>provided</scope> 
+        </dependency>
+        
+        <dependency> 
+            <groupId>org.apache.flume.flume-ng-sinks</groupId> 
+            <artifactId>flume-irc-sink</artifactId> 
+            <version>1.5.2</version>
+        </dependency> 
+        <dependency> 
+            <groupId>org.apache.flume.flume-ng-sinks</groupId> 
+            <artifactId>flume-ng-elasticsearch-sink</artifactId> 
+            <version>1.5.2</version>
+        </dependency> 
+        <dependency> 
+            <groupId>org.apache.flume.flume-ng-sinks</groupId> 
+            <artifactId>flume-ng-hbase-sink</artifactId> 
+            <version>1.5.2</version>
+        </dependency> 
+        <dependency> 
+            <groupId>org.apache.flume.flume-ng-sinks</groupId> 
+            <artifactId>flume-ng-morphline-solr-sink</artifactId> 
+            <version>1.5.2</version>
+        </dependency> 
+
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-mock</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-simple</artifactId>
+          <scope>test</scope>
+        </dependency>
+    </dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b251ab44/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/AbstractFlumeProcessor.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/AbstractFlumeProcessor.java
 
b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/AbstractFlumeProcessor.java
new file mode 100644
index 0000000..5c608d5
--- /dev/null
+++ 
b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/AbstractFlumeProcessor.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.flume;
+
+import com.google.common.collect.Maps;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.StringReader;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.SinkFactory;
+import org.apache.flume.Source;
+import org.apache.flume.SourceFactory;
+import org.apache.flume.sink.DefaultSinkFactory;
+import org.apache.flume.source.DefaultSourceFactory;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.Validator;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.io.OutputStreamCallback;
+import static 
org.apache.nifi.processors.flume.FlumeSourceProcessor.FLUME_CONFIG;
+import org.apache.nifi.processors.flume.util.FlowFileEvent;
+
+/**
+ * This is a base class that is helpful when building processors interacting
+ * with Flume.
+ */
+public abstract class AbstractFlumeProcessor extends AbstractProcessor {
+  protected static final SourceFactory SOURCE_FACTORY = new 
DefaultSourceFactory();
+  protected static final SinkFactory SINK_FACTORY = new DefaultSinkFactory();
+
+  protected static Event flowFileToEvent(FlowFile flowFile, ProcessSession 
session) {
+    return new FlowFileEvent(flowFile, session);
+  }
+
+  protected static void transferEvent(final Event event, ProcessSession 
session,
+      Relationship relationship) {
+    FlowFile flowFile = session.create();
+    flowFile = session.putAllAttributes(flowFile, event.getHeaders());
+
+    flowFile = session.write(flowFile, new OutputStreamCallback() {
+      @Override
+      public void process(final OutputStream out) throws IOException {
+        out.write(event.getBody());
+      }
+    });
+
+    session.getProvenanceReporter().create(flowFile);
+    session.transfer(flowFile, relationship);
+  }
+
+  protected static Validator createSourceValidator() {
+    return new Validator() {
+      @Override
+      public ValidationResult validate(final String subject, final String 
value, final ValidationContext context) {
+        String reason = null;
+        try {
+          FlumeSourceProcessor.SOURCE_FACTORY.create("NiFi Source", value);
+        } catch (Exception ex) {
+          reason = ex.getLocalizedMessage();
+          reason = Character.toLowerCase(reason.charAt(0)) + 
reason.substring(1);
+        }
+        return new 
ValidationResult.Builder().subject(subject).input(value).explanation(reason).valid(reason
 == null).build();
+      }
+    };
+  }
+
+  protected static Validator createSinkValidator() {
+    return new Validator() {
+      @Override
+      public ValidationResult validate(final String subject, final String 
value, final ValidationContext context) {
+        String reason = null;
+        try {
+          FlumeSinkProcessor.SINK_FACTORY.create("NiFi Sink", value);
+        } catch (Exception ex) {
+          reason = ex.getLocalizedMessage();
+          reason = Character.toLowerCase(reason.charAt(0)) + 
reason.substring(1);
+        }
+        return new 
ValidationResult.Builder().subject(subject).input(value).explanation(reason).valid(reason
 == null).build();
+      }
+    };
+  }
+
+  protected static Context getFlumeContext(String flumeConfig, String prefix) {
+    Properties flumeProperties = new Properties();
+    if (flumeConfig != null) {
+      try {
+        flumeProperties.load(new StringReader(flumeConfig));
+      } catch (IOException ex) {
+        throw new RuntimeException(ex);
+      }
+    }
+    Map<String, String> parameters = Maps.newHashMap();
+    for (String property : flumeProperties.stringPropertyNames()) {
+      parameters.put(property, flumeProperties.getProperty(property));
+    }
+    return new Context(new Context(parameters).getSubProperties(prefix));
+  }
+
+  protected static Context getFlumeSourceContext(String flumeConfig,
+      String agentName, String sourceName) {
+    return getFlumeContext(flumeConfig, agentName + ".sources." + sourceName + 
".");
+  }
+
+  protected static Context getFlumeSinkContext(String flumeConfig,
+      String agentName, String sinkName) {
+    return getFlumeContext(flumeConfig, agentName + ".sinks." + sinkName + 
".");
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b251ab44/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/FlumeSinkProcessor.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/FlumeSinkProcessor.java
 
b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/FlumeSinkProcessor.java
new file mode 100644
index 0000000..4603d18
--- /dev/null
+++ 
b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/FlumeSinkProcessor.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.flume;
+
+import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.flume.Context;
+import org.apache.flume.EventDeliveryException;
+import org.apache.flume.Sink;
+import org.apache.flume.Transaction;
+import org.apache.flume.channel.MemoryChannel;
+import org.apache.flume.conf.Configurables;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.Validator;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.SchedulingContext;
+import org.apache.nifi.processor.annotation.CapabilityDescription;
+import org.apache.nifi.processor.annotation.OnScheduled;
+import org.apache.nifi.processor.annotation.OnUnscheduled;
+import org.apache.nifi.processor.annotation.Tags;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.flume.util.FlowFileEvent;
+
+/**
+ * This processor runs a Flume sink 
+ */
+@Tags({"flume", "hadoop", "get", "sink" })
+@CapabilityDescription("Generate FlowFile data from a Flume sink")
+public class FlumeSinkProcessor extends AbstractFlumeProcessor {
+
+  private Sink sink;
+  private MemoryChannel channel;
+
+  public static final PropertyDescriptor SINK_TYPE = new 
PropertyDescriptor.Builder()
+      .name("Sink Type")
+      .description("The fully-qualified name of the Sink class")
+      .required(true)
+      .addValidator(createSinkValidator())
+      .build();
+  public static final PropertyDescriptor AGENT_NAME = new 
PropertyDescriptor.Builder()
+      .name("Agent Name")
+      .description("The name of the agent used in the Flume sink 
configuration")
+      .required(true)
+      .defaultValue("tier1")
+      .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+      .build();
+  public static final PropertyDescriptor SOURCE_NAME = new 
PropertyDescriptor.Builder()
+      .name("Sink Name")
+      .description("The name of the sink used in the Flume sink configuration")
+      .required(true)
+      .defaultValue("sink-1")
+      .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+      .build();
+  public static final PropertyDescriptor FLUME_CONFIG = new 
PropertyDescriptor.Builder()
+      .name("Flume Configuration")
+      .description("The Flume configuration for the sink copied from the 
flume.properties file")
+      .required(true)
+      .defaultValue("")
+      .addValidator(Validator.VALID)
+      .build();
+
+  public static final Relationship SUCCESS = new 
Relationship.Builder().name("success").build();
+  public static final Relationship FAILURE = new 
Relationship.Builder().name("failure").build();
+
+  private List<PropertyDescriptor> descriptors;
+  private Set<Relationship> relationships;
+
+
+  @Override
+  protected void init(final ProcessorInitializationContext context) {
+    this.descriptors = ImmutableList.of(SINK_TYPE, AGENT_NAME, SOURCE_NAME, 
FLUME_CONFIG);
+    this.relationships = ImmutableSet.of();
+  }
+
+  @Override
+  protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+    return descriptors;
+  }
+
+  @Override
+  public Set<Relationship> getRelationships() {
+    return relationships;
+  }
+
+  @OnScheduled
+  public void onScheduled(final SchedulingContext context) {
+    channel = new MemoryChannel();
+    Configurables.configure(channel, new Context());
+    channel.start();
+
+    sink = SINK_FACTORY.create(context.getProperty(SOURCE_NAME).getValue(),
+        context.getProperty(SINK_TYPE).getValue());
+    sink.setChannel(channel);
+
+    String flumeConfig = context.getProperty(FLUME_CONFIG).getValue();
+    String agentName = context.getProperty(AGENT_NAME).getValue();
+    String sinkName = context.getProperty(SOURCE_NAME).getValue();
+    Configurables.configure(sink,
+        getFlumeSinkContext(flumeConfig, agentName, sinkName) );
+
+    sink.start();
+  }
+
+  @OnUnscheduled
+  public void unScheduled() {
+    sink.stop();
+    channel.stop();
+  }
+
+  @Override
+  public void onTrigger(final ProcessContext context,
+      final ProcessSession session) throws ProcessException {
+    FlowFile flowFile = session.get();
+
+    Transaction transaction = channel.getTransaction();
+    try {
+      transaction.begin();
+      channel.put(new FlowFileEvent(flowFile, session));
+      transaction.commit();
+    } catch (Throwable th) {
+      transaction.rollback();
+      throw Throwables.propagate(th);
+    } finally {
+      transaction.close();
+    }
+
+    try {
+      sink.process();
+      session.transfer(flowFile, SUCCESS);
+    } catch (EventDeliveryException ex) {
+      session.transfer(flowFile, FAILURE);
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b251ab44/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/FlumeSourceProcessor.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/FlumeSourceProcessor.java
 
b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/FlumeSourceProcessor.java
new file mode 100644
index 0000000..8b8388c
--- /dev/null
+++ 
b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/FlumeSourceProcessor.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.flume;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.EventDeliveryException;
+import org.apache.flume.EventDrivenSource;
+import org.apache.flume.PollableSource;
+import org.apache.flume.Source;
+import org.apache.flume.SourceRunner;
+import org.apache.flume.Transaction;
+import org.apache.flume.channel.ChannelProcessor;
+import org.apache.flume.channel.MemoryChannel;
+import org.apache.flume.conf.Configurables;
+import org.apache.flume.source.EventDrivenSourceRunner;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.Validator;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.SchedulingContext;
+import org.apache.nifi.processor.annotation.CapabilityDescription;
+import org.apache.nifi.processor.annotation.OnScheduled;
+import org.apache.nifi.processor.annotation.OnUnscheduled;
+import org.apache.nifi.processor.annotation.Tags;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+
+/**
+ * This processor runs a Flume source 
+ */
+@Tags({"flume", "hadoop", "get", "source" })
+@CapabilityDescription("Generate FlowFile data from a Flume source")
+public class FlumeSourceProcessor extends AbstractFlumeProcessor {
+  
+  private Source source;
+  private SourceRunner runner;
+  private MemoryChannel channel;
+
+  public static final PropertyDescriptor SOURCE_TYPE = new 
PropertyDescriptor.Builder()
+      .name("Source Type")
+      .description("The fully-qualified name of the Source class")
+      .required(true)
+      .addValidator(createSourceValidator())
+      .build();
+  public static final PropertyDescriptor AGENT_NAME = new 
PropertyDescriptor.Builder()
+      .name("Agent Name")
+      .description("The name of the agent used in the Flume source 
configuration")
+      .required(true)
+      .defaultValue("tier1")
+      .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+      .build();
+  public static final PropertyDescriptor SOURCE_NAME = new 
PropertyDescriptor.Builder()
+      .name("Source Name")
+      .description("The name of the source used in the Flume source 
configuration")
+      .required(true)
+      .defaultValue("src-1")
+      .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+      .build();
+  public static final PropertyDescriptor FLUME_CONFIG = new 
PropertyDescriptor.Builder()
+      .name("Flume Configuration")
+      .description("The Flume configuration for the source copied from the 
flume.properties file")
+      .required(true)
+      .defaultValue("")
+      .addValidator(Validator.VALID)
+      .build();
+
+  public static final Relationship SUCCESS = new 
Relationship.Builder().name("success").build();
+
+  private List<PropertyDescriptor> descriptors;
+  private Set<Relationship> relationships;
+
+
+  @Override
+  protected void init(final ProcessorInitializationContext context) {
+    this.descriptors = ImmutableList.of(SOURCE_TYPE, AGENT_NAME, SOURCE_NAME, 
FLUME_CONFIG);
+    this.relationships = ImmutableSet.of(SUCCESS);
+  }
+
+  @Override
+  protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+    return descriptors;
+  }
+
+  @Override
+  public Set<Relationship> getRelationships() {
+    return relationships;
+  }
+
+  @OnScheduled
+  public void onScheduled(final SchedulingContext context) {
+    source = SOURCE_FACTORY.create(
+        context.getProperty(SOURCE_NAME).getValue(),
+        context.getProperty(SOURCE_TYPE).getValue());
+    
+    String flumeConfig = context.getProperty(FLUME_CONFIG).getValue();
+    String agentName = context.getProperty(AGENT_NAME).getValue();
+    String sourceName = context.getProperty(SOURCE_NAME).getValue();
+    Configurables.configure(source,
+        getFlumeSourceContext(flumeConfig, agentName, sourceName) );
+
+    if (source instanceof EventDrivenSource) {
+      runner = new EventDrivenSourceRunner();
+      channel = new MemoryChannel();
+      Configurables.configure(channel, new Context());
+      channel.start();
+      source.setChannelProcessor(new ChannelProcessor(new 
NifiChannelSelector(channel)));
+      runner.setSource(source);
+      runner.start();
+    } 
+  }
+
+  @OnUnscheduled
+  public void unScheduled() {
+    if (runner != null) {
+      runner.stop();
+    }
+    if (channel != null) {
+      channel.stop();
+    }
+  }
+
+  @Override
+  public void onTrigger(final ProcessContext context,
+      final ProcessSession session) throws ProcessException {
+    if (source instanceof EventDrivenSource) {
+      onEventDrivenTrigger(context, session);
+    } else if (source instanceof PollableSource) {
+      onPollableTrigger((PollableSource)source, context, session);
+    }
+  }
+
+  public void onPollableTrigger(final PollableSource pollableSource,
+      final ProcessContext context, final ProcessSession session)
+      throws ProcessException {
+    try {
+      pollableSource.setChannelProcessor(new ChannelProcessor(
+          new NifiChannelSelector(new NifiChannel(session, SUCCESS))));
+      pollableSource.start();
+      pollableSource.process();
+      pollableSource.stop();
+    } catch (EventDeliveryException ex) {
+      throw new ProcessException("Error processing pollable source", ex);
+    }
+  }
+
+  public void onEventDrivenTrigger(final ProcessContext context, final 
ProcessSession session) {
+    Transaction transaction = channel.getTransaction();
+    transaction.begin();
+
+    Event event = channel.take();
+    if (event != null) {
+      transferEvent(event, session, SUCCESS);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b251ab44/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/NifiChannel.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/NifiChannel.java
 
b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/NifiChannel.java
new file mode 100644
index 0000000..ac8dbe2
--- /dev/null
+++ 
b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/NifiChannel.java
@@ -0,0 +1,31 @@
+
+package org.apache.nifi.processors.flume;
+
+import org.apache.flume.Context;
+import org.apache.flume.channel.BasicChannelSemantics;
+import org.apache.flume.channel.BasicTransactionSemantics;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+
+
+public class NifiChannel extends BasicChannelSemantics {
+  private final ProcessSession session;
+  private final Relationship relationship;
+
+  public NifiChannel(ProcessSession session, Relationship relationship) {
+    this.session = session;
+    this.relationship = relationship;
+  }
+
+  @Override
+  protected BasicTransactionSemantics createTransaction() {
+    return new NifiTransaction(session, relationship);
+  }
+
+  @Override
+  public void configure(Context context) {
+    throw new UnsupportedOperationException("Not supported yet."); //To change 
body of generated methods, choose Tools | Templates.
+  }
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b251ab44/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/NifiChannelSelector.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/NifiChannelSelector.java
 
b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/NifiChannelSelector.java
new file mode 100644
index 0000000..792678b
--- /dev/null
+++ 
b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/NifiChannelSelector.java
@@ -0,0 +1,55 @@
+
+package org.apache.nifi.processors.flume;
+
+import com.google.common.collect.ImmutableList;
+import java.util.List;
+import org.apache.flume.Channel;
+import org.apache.flume.ChannelSelector;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+
+
+public class NifiChannelSelector implements ChannelSelector {
+  private String name;
+  private final List<Channel> requiredChannels;
+  private final List<Channel> optionalChannels;
+
+  public NifiChannelSelector(Channel channel) {
+    requiredChannels = ImmutableList.of(channel);
+    optionalChannels = ImmutableList.of();
+  }
+
+  @Override
+  public List<Channel> getRequiredChannels(Event event) {
+    return requiredChannels;
+  }
+
+  @Override
+  public List<Channel> getOptionalChannels(Event event) {
+    return optionalChannels;
+  }
+
+  @Override
+  public List<Channel> getAllChannels() {
+    return requiredChannels;
+  }
+
+  @Override
+  public void setChannels(List<Channel> channels) {
+    throw new UnsupportedOperationException("Not supported yet."); //To change 
body of generated methods, choose Tools | Templates.
+  }
+
+  @Override
+  public String getName() {
+    return name;
+  }
+
+  @Override
+  public void setName(String name) {
+    this.name = name;
+  }
+
+  @Override
+  public void configure(Context context) {
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b251ab44/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/NifiTransaction.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/NifiTransaction.java
 
b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/NifiTransaction.java
new file mode 100644
index 0000000..3d6a647
--- /dev/null
+++ 
b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/NifiTransaction.java
@@ -0,0 +1,40 @@
+
+package org.apache.nifi.processors.flume;
+
+import org.apache.flume.Event;
+import org.apache.flume.channel.BasicTransactionSemantics;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+
+
+class NifiTransaction extends BasicTransactionSemantics {
+  private final ProcessSession session;
+  private final Relationship relationship;
+
+  public NifiTransaction(ProcessSession session, Relationship relationship) {
+    this.session = session;
+    this.relationship = relationship;
+  }
+
+  @Override
+  protected void doPut(Event event) throws InterruptedException {
+    AbstractFlumeProcessor.transferEvent(event, session, relationship);
+  }
+
+  @Override
+  protected Event doTake() throws InterruptedException {
+    throw new UnsupportedOperationException("Only put supported");
+  }
+
+  @Override
+  protected void doCommit() throws InterruptedException {
+    session.commit();
+  }
+
+  @Override
+  protected void doRollback() throws InterruptedException {
+    session.rollback();
+  }
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b251ab44/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/util/FlowFileEvent.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/util/FlowFileEvent.java
 
b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/util/FlowFileEvent.java
new file mode 100644
index 0000000..c3531ca
--- /dev/null
+++ 
b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/util/FlowFileEvent.java
@@ -0,0 +1,114 @@
+
+package org.apache.nifi.processors.flume.util;
+
+import com.google.common.collect.Maps;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Arrays;
+import java.util.Map;
+import org.apache.flume.Event;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.io.InputStreamCallback;
+
+import static org.apache.nifi.processors.flume.util.FlowFileEventConstants.*;
+import org.apache.nifi.stream.io.BufferedInputStream;
+import org.apache.nifi.stream.io.StreamUtils;
+
+public class FlowFileEvent implements Event {
+
+  private final FlowFile flowFile;
+  private final ProcessSession session;
+
+  private final Map<String, String> headers;
+  private boolean headersLoaded;
+
+  private final Object bodyLock;
+  private byte[] body;
+  private boolean bodyLoaded;
+
+  public FlowFileEvent(FlowFile flowFile, ProcessSession session) {
+    this.flowFile = flowFile;
+    this.session = session;
+
+    headers = Maps.newHashMap();
+    bodyLock = new Object();
+    bodyLoaded = false;
+  }
+
+  @Override
+  public Map<String, String> getHeaders() {
+    if (!headersLoaded) {
+      synchronized (headers) {
+        if (headersLoaded) {
+          return headers;
+        }
+
+        headers.putAll(flowFile.getAttributes());
+        headers.put(ENTRY_DATE_HEADER, Long.toString(flowFile.getEntryDate()));
+        headers.put(ID_HEADER, Long.toString(flowFile.getId()));
+        headers.put(LAST_QUEUE_DATE_HEADER, 
Long.toString(flowFile.getLastQueueDate()));
+        int i = 0;
+        for (String lineageIdentifier : flowFile.getLineageIdentifiers()) {
+          headers.put(LINEAGE_IDENTIFIERS_HEADER + "." + i, lineageIdentifier);
+          i++;
+        }
+        headers.put(LINEAGE_START_DATE_HEADER, 
Long.toString(flowFile.getLineageStartDate()));
+        headers.put(SIZE_HEADER, Long.toString(flowFile.getSize()));
+        
+        headersLoaded = true;
+      }
+    }
+    return headers;
+  }
+
+  @Override
+  public void setHeaders(Map<String, String> headers) {
+    synchronized (this.headers) {
+      this.headers.clear();
+      this.headers.putAll(headers);
+      headersLoaded = true;
+    }
+  }
+
+  @Override
+  public byte[] getBody() {
+    if (bodyLoaded) {
+      return body;
+    }
+
+    synchronized (bodyLock ) {
+      if (!bodyLoaded) {
+        if (flowFile.getSize() > Integer.MAX_VALUE) {
+          throw new RuntimeException("Can't get body of Event because the 
backing FlowFile is too large (" + flowFile.getSize() + " bytes)");
+        }
+    
+        final ByteArrayOutputStream baos = new ByteArrayOutputStream((int) 
flowFile.getSize());
+        session.read(flowFile, new InputStreamCallback() {
+
+          @Override
+          public void process(InputStream in) throws IOException {
+            try (BufferedInputStream input = new BufferedInputStream(in)) {
+              StreamUtils.copy(in, baos);
+            }
+            baos.close();
+          }
+        });
+
+        body = baos.toByteArray();
+        bodyLoaded = true;
+      }
+    }
+
+    return body;
+  }
+
+  @Override
+  public void setBody(byte[] body) {
+    synchronized (bodyLock) {
+      this.body = Arrays.copyOf(body, body.length);
+      bodyLoaded = true;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b251ab44/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/util/FlowFileEventConstants.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/util/FlowFileEventConstants.java
 
b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/util/FlowFileEventConstants.java
new file mode 100644
index 0000000..c13f0ef
--- /dev/null
+++ 
b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/util/FlowFileEventConstants.java
@@ -0,0 +1,25 @@
+
+package org.apache.nifi.processors.flume.util;
+
+
+public class FlowFileEventConstants {
+
+  // FlowFile#getEntryDate();
+  public static final String ENTRY_DATE_HEADER = "nifi.entry.date";
+
+  // FlowFile#getId();
+  public static final String ID_HEADER = "nifi.id";
+
+  // FlowFile#getLastQueueDate();
+  public static final String LAST_QUEUE_DATE_HEADER = "nifi.last.queue.date";
+
+  // FlowFile#getLineageIdentifiers();
+  public static final String LINEAGE_IDENTIFIERS_HEADER = 
"nifi.lineage.identifiers";
+
+  // FlowFile#getLineageStartDate();
+  public static final String LINEAGE_START_DATE_HEADER = 
"nifi.lineage.start.date";
+
+  // FlowFile#getSize();
+  public static final String SIZE_HEADER = "nifi.size";
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b251ab44/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
 
b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
new file mode 100644
index 0000000..fae8727
--- /dev/null
+++ 
b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+org.apache.nifi.processors.flume.FlumeSourceProcessor
+org.apache.nifi.processors.flume.FlumeSinkProcessor

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b251ab44/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/test/java/org/apache/nifi/processors/flume/AbstractFlumeTest.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/test/java/org/apache/nifi/processors/flume/AbstractFlumeTest.java
 
b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/test/java/org/apache/nifi/processors/flume/AbstractFlumeTest.java
new file mode 100644
index 0000000..87b056a
--- /dev/null
+++ 
b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/test/java/org/apache/nifi/processors/flume/AbstractFlumeTest.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.flume;
+
+import org.junit.BeforeClass;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class AbstractFlumeTest {
+
+    private static Logger logger;
+
+    @BeforeClass
+    public static void setUpClass() {
+        System.setProperty("org.slf4j.simpleLogger.defaultLogLevel", "info");
+        System.setProperty("org.slf4j.simpleLogger.showDateTime", "true");
+        System.setProperty("org.slf4j.simpleLogger.log.nifi.processors.flume", 
"debug");
+        logger = LoggerFactory.getLogger(AbstractFlumeTest.class);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b251ab44/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/test/java/org/apache/nifi/processors/flume/FlumeSinkProcessorTest.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/test/java/org/apache/nifi/processors/flume/FlumeSinkProcessorTest.java
 
b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/test/java/org/apache/nifi/processors/flume/FlumeSinkProcessorTest.java
new file mode 100644
index 0000000..4f2cef7
--- /dev/null
+++ 
b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/test/java/org/apache/nifi/processors/flume/FlumeSinkProcessorTest.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.flume;
+
+import java.io.File;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.FileInputStream;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import org.apache.commons.io.filefilter.HiddenFileFilter;
+import org.apache.flume.sink.NullSink;
+import org.apache.flume.source.AvroSource;
+
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.util.MockProcessContext;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.apache.nifi.util.file.FileUtils;
+
+import org.junit.Assert;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class FlumeSinkProcessorTest {
+
+  private static final Logger logger =
+      LoggerFactory.getLogger(FlumeSinkProcessorTest.class);
+  
+    @Test
+    public void testValidators() {
+        TestRunner runner = 
TestRunners.newTestRunner(FlumeSinkProcessor.class);
+        Collection<ValidationResult> results;
+        ProcessContext pc;
+
+        results = new HashSet<>();
+        runner.enqueue(new byte[0]);
+        pc = runner.getProcessContext();
+        if (pc instanceof MockProcessContext) {
+            results = ((MockProcessContext) pc).validate();
+        }
+        Assert.assertEquals(1, results.size());
+        for (ValidationResult vr : results) {
+            logger.error(vr.toString());
+            Assert.assertTrue(vr.toString().contains("is invalid because Sink 
Type is required"));
+        }
+
+        // non-existent class
+        results = new HashSet<>();
+        runner.setProperty(FlumeSinkProcessor.SINK_TYPE, "invalid.class.name");
+        runner.enqueue(new byte[0]);
+        pc = runner.getProcessContext();
+        if (pc instanceof MockProcessContext) {
+            results = ((MockProcessContext) pc).validate();
+        }
+        Assert.assertEquals(1, results.size());
+        for (ValidationResult vr : results) {
+            logger.error(vr.toString());
+            Assert.assertTrue(vr.toString().contains("is invalid because 
unable to load sink"));
+        }
+
+        // class doesn't implement Sink
+        results = new HashSet<>();
+        runner.setProperty(FlumeSinkProcessor.SINK_TYPE, 
AvroSource.class.getName());
+        runner.enqueue(new byte[0]);
+        pc = runner.getProcessContext();
+        if (pc instanceof MockProcessContext) {
+            results = ((MockProcessContext) pc).validate();
+        }
+        Assert.assertEquals(1, results.size());
+        for (ValidationResult vr : results) {
+            logger.error(vr.toString());
+            Assert.assertTrue(vr.toString().contains("is invalid because 
unable to create sink"));
+        }
+
+        results = new HashSet<>();
+        runner.setProperty(FlumeSinkProcessor.SINK_TYPE, 
NullSink.class.getName());
+        runner.enqueue(new byte[0]);
+        pc = runner.getProcessContext();
+        if (pc instanceof MockProcessContext) {
+            results = ((MockProcessContext) pc).validate();
+        }
+        Assert.assertEquals(0, results.size());
+    }
+
+
+    @Test
+    public void testNullSink() throws IOException {
+        TestRunner runner = 
TestRunners.newTestRunner(FlumeSinkProcessor.class);
+        runner.setProperty(FlumeSinkProcessor.SINK_TYPE, 
NullSink.class.getName());
+        FileInputStream fis = new 
FileInputStream("src/test/resources/testdata/records.txt");
+        Map<String, String> attributes = new HashMap<>();
+        attributes.put(CoreAttributes.FILENAME.key(), "records.txt");
+        runner.enqueue(fis, attributes);
+        runner.run();
+        fis.close();
+    }
+    
+    @Test
+    public void testHdfsSink() throws IOException {
+        File destDir = new File("target/hdfs");
+        if (destDir.exists()) {
+          FileUtils.deleteFilesInDir(destDir, null, logger);
+        } else {
+          destDir.mkdirs();
+        }
+
+        TestRunner runner = 
TestRunners.newTestRunner(FlumeSinkProcessor.class);
+        runner.setProperty(FlumeSinkProcessor.SINK_TYPE, "hdfs");
+        runner.setProperty(FlumeSinkProcessor.FLUME_CONFIG,
+            "tier1.sinks.sink-1.hdfs.path = " + destDir.toURI().toString() + 
"\n" +
+            "tier1.sinks.sink-1.hdfs.fileType = DataStream\n" +
+            "tier1.sinks.sink-1.hdfs.serializer = TEXT\n" +
+            "tier1.sinks.sink-1.serializer.appendNewline = false"
+        );
+        FileInputStream fis = new 
FileInputStream("src/test/resources/testdata/records.txt");
+        Map<String, String> attributes = new HashMap<>();
+        attributes.put(CoreAttributes.FILENAME.key(), "records.txt");
+        runner.enqueue(fis, attributes);
+        runner.run();
+        fis.close();
+
+        File[] files = 
destDir.listFiles((FilenameFilter)HiddenFileFilter.VISIBLE);
+        assertEquals("Unexpected number of destination files.", 1, 
files.length);
+        File dst = files[0];
+        byte[] expectedMd5 = FileUtils.computeMd5Digest(new 
File("src/test/resources/testdata/records.txt"));
+        byte[] actualMd5 = FileUtils.computeMd5Digest(dst);
+        Assert.assertArrayEquals("Destination file doesn't match source data", 
expectedMd5, actualMd5);
+    }
+ 
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b251ab44/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/test/java/org/apache/nifi/processors/flume/FlumeSourceProcessorTest.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/test/java/org/apache/nifi/processors/flume/FlumeSourceProcessorTest.java
 
b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/test/java/org/apache/nifi/processors/flume/FlumeSourceProcessorTest.java
new file mode 100644
index 0000000..bbcf116
--- /dev/null
+++ 
b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/test/java/org/apache/nifi/processors/flume/FlumeSourceProcessorTest.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.flume;
+
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import org.apache.flume.sink.NullSink;
+import org.apache.flume.source.AvroSource;
+
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.MockProcessContext;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.apache.nifi.util.file.FileUtils;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class FlumeSourceProcessorTest {
+
+  private static final Logger logger =
+      LoggerFactory.getLogger(FlumeSourceProcessorTest.class);
+
+
+    @Test
+    public void testValidators() {
+        TestRunner runner = 
TestRunners.newTestRunner(FlumeSourceProcessor.class);
+        Collection<ValidationResult> results;
+        ProcessContext pc;
+
+        results = new HashSet<>();
+        runner.enqueue(new byte[0]);
+        pc = runner.getProcessContext();
+        if (pc instanceof MockProcessContext) {
+            results = ((MockProcessContext) pc).validate();
+        }
+        Assert.assertEquals(1, results.size());
+        for (ValidationResult vr : results) {
+            logger.error(vr.toString());
+            Assert.assertTrue(vr.toString().contains("is invalid because 
Source Type is required"));
+        }
+
+        // non-existent class
+        results = new HashSet<>();
+        runner.setProperty(FlumeSourceProcessor.SOURCE_TYPE, 
"invalid.class.name");
+        runner.enqueue(new byte[0]);
+        pc = runner.getProcessContext();
+        if (pc instanceof MockProcessContext) {
+            results = ((MockProcessContext) pc).validate();
+        }
+        Assert.assertEquals(1, results.size());
+        for (ValidationResult vr : results) {
+            logger.error(vr.toString());
+            Assert.assertTrue(vr.toString().contains("is invalid because 
unable to load source"));
+        }
+
+        // class doesn't implement Source
+        results = new HashSet<>();
+        runner.setProperty(FlumeSourceProcessor.SOURCE_TYPE, 
NullSink.class.getName());
+        runner.enqueue(new byte[0]);
+        pc = runner.getProcessContext();
+        if (pc instanceof MockProcessContext) {
+            results = ((MockProcessContext) pc).validate();
+        }
+        Assert.assertEquals(1, results.size());
+        for (ValidationResult vr : results) {
+            logger.error(vr.toString());
+            Assert.assertTrue(vr.toString().contains("is invalid because 
unable to create source"));
+        }
+
+        results = new HashSet<>();
+        runner.setProperty(FlumeSourceProcessor.SOURCE_TYPE, 
AvroSource.class.getName());
+        runner.enqueue(new byte[0]);
+        pc = runner.getProcessContext();
+        if (pc instanceof MockProcessContext) {
+            results = ((MockProcessContext) pc).validate();
+        }
+        Assert.assertEquals(0, results.size());
+    }
+
+    @Test
+    public void testSequenceSource() {
+        TestRunner runner = 
TestRunners.newTestRunner(FlumeSourceProcessor.class);
+        runner.setProperty(FlumeSourceProcessor.SOURCE_TYPE, "seq");
+        runner.run();
+        List<MockFlowFile> flowFiles = 
runner.getFlowFilesForRelationship(FlumeSourceProcessor.SUCCESS);
+        Assert.assertEquals(1, flowFiles.size());
+        for (MockFlowFile flowFile : flowFiles) {
+          logger.error(flowFile.toString());
+            Assert.assertEquals(1, flowFile.getSize());
+        }
+    }
+
+    @Test
+    public void testSourceWithConfig() throws IOException {
+        File spoolDirectory = new File("target/spooldir");
+        if (spoolDirectory.exists()) {
+          FileUtils.deleteFilesInDir(spoolDirectory, null, logger);
+        } else {
+          spoolDirectory.mkdirs();
+        }
+        File src = new File("src/test/resources/testdata/records.txt");
+        File dst = new File(spoolDirectory, "records.txt");
+        FileUtils.copyFile(src, dst, false, false, logger);
+
+        TestRunner runner = 
TestRunners.newTestRunner(FlumeSourceProcessor.class);
+        runner.setProperty(FlumeSourceProcessor.SOURCE_TYPE, "spooldir");
+        runner.setProperty(FlumeSinkProcessor.FLUME_CONFIG,
+            "tier1.sources.src-1.spoolDir = " + 
spoolDirectory.getAbsolutePath());
+        runner.run();
+        List<MockFlowFile> flowFiles = 
runner.getFlowFilesForRelationship(FlumeSourceProcessor.SUCCESS);
+        Assert.assertEquals(1, flowFiles.size());
+        for (MockFlowFile flowFile : flowFiles) {
+            Assert.assertEquals(8, flowFile.getSize());
+            flowFile.assertContentEquals("record 1");
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b251ab44/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/test/resources/core-site-broken.xml
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/test/resources/core-site-broken.xml
 
b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/test/resources/core-site-broken.xml
new file mode 100644
index 0000000..e06a193
--- /dev/null
+++ 
b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/test/resources/core-site-broken.xml
@@ -0,0 +1,25 @@
+<?xml version="1.0"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+      http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+
+<!-- Put site-specific property overrides in this file. -->
+
+<configuration>
+    <property>
+        <name>fs.default.name</name>
+        <value>hdfs://localhost:65535</value>
+    </property>
+</configuration>

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b251ab44/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/test/resources/core-site.xml
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/test/resources/core-site.xml
 
b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/test/resources/core-site.xml
new file mode 100644
index 0000000..5e3b55c
--- /dev/null
+++ 
b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/test/resources/core-site.xml
@@ -0,0 +1,25 @@
+<?xml version="1.0"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+      http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+
+<!-- Put site-specific property overrides in this file. -->
+
+<configuration>
+    <property>
+        <name>fs.defaultFS</name>
+        <value>file:///</value>
+    </property>
+</configuration>

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b251ab44/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/test/resources/testdata/records.txt
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/test/resources/testdata/records.txt
 
b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/test/resources/testdata/records.txt
new file mode 100644
index 0000000..5a809ee
--- /dev/null
+++ 
b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/test/resources/testdata/records.txt
@@ -0,0 +1,4 @@
+record 1
+record 2
+record 3
+record 4

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b251ab44/nifi/nifi-nar-bundles/nifi-flume-bundle/pom.xml
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-flume-bundle/pom.xml 
b/nifi/nifi-nar-bundles/nifi-flume-bundle/pom.xml
new file mode 100644
index 0000000..dc9ec69
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-flume-bundle/pom.xml
@@ -0,0 +1,39 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+    <!--
+      Licensed to the Apache Software Foundation (ASF) under one or more
+      contributor license agreements.  See the NOTICE file distributed with
+      this work for additional information regarding copyright ownership.
+      The ASF licenses this file to You under the Apache License, Version 2.0
+      (the "License"); you may not use this file except in compliance with
+      the License.  You may obtain a copy of the License at
+          http://www.apache.org/licenses/LICENSE-2.0
+      Unless required by applicable law or agreed to in writing, software
+      distributed under the License is distributed on an "AS IS" BASIS,
+      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+      See the License for the specific language governing permissions and
+      limitations under the License.
+    -->
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-nar-bundles</artifactId>
+        <version>0.0.1-incubating-SNAPSHOT</version>
+    </parent>
+    <artifactId>nifi-flume-bundle</artifactId>
+    <version>0.0.1-incubating-SNAPSHOT</version>
+    <packaging>pom</packaging>
+    <description>A bundle of processors that run Flume 
sources/sinks</description>
+    <modules>
+        <module>nifi-flume-processors</module>
+        <module>nifi-flume-nar</module>
+    </modules>
+    <dependencyManagement>
+        <dependencies>
+            <dependency>
+                <groupId>org.apache.nifi</groupId>
+                <artifactId>nifi-flume-processors</artifactId>
+                <version>0.0.1-incubating-SNAPSHOT</version>
+            </dependency>
+        </dependencies>
+    </dependencyManagement>
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b251ab44/nifi/nifi-nar-bundles/pom.xml
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/pom.xml b/nifi/nifi-nar-bundles/pom.xml
index b9be570..29a5e49 100644
--- a/nifi/nifi-nar-bundles/pom.xml
+++ b/nifi/nifi-nar-bundles/pom.xml
@@ -41,6 +41,7 @@
         <module>nifi-hl7-bundle</module>
         <module>nifi-language-translation-bundle</module>
         <module>nifi-mongodb-bundle</module>
+        <module>nifi-flume-bundle</module>
     </modules>
     <dependencyManagement>
         <dependencies>

Reply via email to