Added processors that can run Flume sources and Flume sinks. Signed-off-by: Matt Gilman <matt.c.gil...@gmail.com>
Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/b251ab44 Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/b251ab44 Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/b251ab44 Branch: refs/heads/develop Commit: b251ab44258a840c5bacd973776b26dacbdbbbc2 Parents: 483b3dd Author: Joey Echeverria <j...@cloudera.com> Authored: Wed Jan 28 16:30:49 2015 -0800 Committer: Matt Gilman <matt.c.gil...@gmail.com> Committed: Tue Jul 14 14:50:16 2015 -0400 ---------------------------------------------------------------------- .../nifi-flume-bundle/nifi-flume-nar/pom.xml | 31 ++++ .../nifi-flume-processors/pom.xml | 126 +++++++++++++ .../flume/AbstractFlumeProcessor.java | 134 ++++++++++++++ .../processors/flume/FlumeSinkProcessor.java | 157 ++++++++++++++++ .../processors/flume/FlumeSourceProcessor.java | 178 +++++++++++++++++++ .../nifi/processors/flume/NifiChannel.java | 31 ++++ .../processors/flume/NifiChannelSelector.java | 55 ++++++ .../nifi/processors/flume/NifiTransaction.java | 40 +++++ .../processors/flume/util/FlowFileEvent.java | 114 ++++++++++++ .../flume/util/FlowFileEventConstants.java | 25 +++ .../org.apache.nifi.processor.Processor | 16 ++ .../processors/flume/AbstractFlumeTest.java | 35 ++++ .../flume/FlumeSinkProcessorTest.java | 154 ++++++++++++++++ .../flume/FlumeSourceProcessorTest.java | 140 +++++++++++++++ .../src/test/resources/core-site-broken.xml | 25 +++ .../src/test/resources/core-site.xml | 25 +++ .../src/test/resources/testdata/records.txt | 4 + nifi/nifi-nar-bundles/nifi-flume-bundle/pom.xml | 39 ++++ nifi/nifi-nar-bundles/pom.xml | 1 + 19 files changed, 1330 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b251ab44/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-nar/pom.xml ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-nar/pom.xml b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-nar/pom.xml new file mode 100644 index 0000000..c5333b6 --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-nar/pom.xml @@ -0,0 +1,31 @@ +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-flume-bundle</artifactId> + <version>0.0.1-incubating-SNAPSHOT</version> + </parent> + <artifactId>nifi-flume-nar</artifactId> + <version>0.0.1-incubating-SNAPSHOT</version> + <packaging>nar</packaging> + <dependencies> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-flume-processors</artifactId> + </dependency> + </dependencies> +</project> http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b251ab44/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/pom.xml ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/pom.xml b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/pom.xml new file mode 100644 index 0000000..54636ca --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/pom.xml @@ -0,0 +1,126 @@ +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + --> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-flume-bundle</artifactId> + <version>0.0.1-incubating-SNAPSHOT</version> + </parent> + <artifactId>nifi-flume-processors</artifactId> + <packaging>jar</packaging> + <dependencies> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-api</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-utils</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-processor-utils</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-flowfile-packager</artifactId> + </dependency> + <dependency> + <groupId>org.apache.flume</groupId> + <artifactId>flume-ng-sdk</artifactId> + <version>1.5.2</version> + </dependency> + <dependency> + <groupId>org.apache.flume</groupId> + <artifactId>flume-ng-core</artifactId> + <version>1.5.2</version> + <exclusions> + <exclusion> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + </exclusion> + </exclusions> + </dependency> + + <!-- Flume Sources --> + <dependency> + <groupId>org.apache.flume.flume-ng-sources</groupId> + <artifactId>flume-twitter-source</artifactId> + <version>1.5.2</version> + </dependency> + <dependency> + <groupId>org.apache.flume.flume-ng-sources</groupId> + <artifactId>flume-jms-source</artifactId> + <version>1.5.2</version> + </dependency> + <dependency> + <groupId>org.apache.flume.flume-ng-sources</groupId> + <artifactId>flume-scribe-source</artifactId> + <version>1.5.2</version> + </dependency> + + <!-- Flume Sinks --> + <dependency> + <groupId>org.apache.flume.flume-ng-sinks</groupId> + <artifactId>flume-hdfs-sink</artifactId> + <version>1.5.2</version> + </dependency> + + <!-- HDFS sink dependencies --> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-common</artifactId> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-hdfs</artifactId> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>org.apache.flume.flume-ng-sinks</groupId> + <artifactId>flume-irc-sink</artifactId> + <version>1.5.2</version> + </dependency> + <dependency> + <groupId>org.apache.flume.flume-ng-sinks</groupId> + <artifactId>flume-ng-elasticsearch-sink</artifactId> + <version>1.5.2</version> + </dependency> + <dependency> + <groupId>org.apache.flume.flume-ng-sinks</groupId> + <artifactId>flume-ng-hbase-sink</artifactId> + <version>1.5.2</version> + </dependency> + <dependency> + <groupId>org.apache.flume.flume-ng-sinks</groupId> + <artifactId>flume-ng-morphline-solr-sink</artifactId> + <version>1.5.2</version> + </dependency> + + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-mock</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-simple</artifactId> + <scope>test</scope> + </dependency> + </dependencies> +</project> http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b251ab44/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/AbstractFlumeProcessor.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/AbstractFlumeProcessor.java b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/AbstractFlumeProcessor.java new file mode 100644 index 0000000..5c608d5 --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/AbstractFlumeProcessor.java @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.flume; + +import com.google.common.collect.Maps; +import java.io.IOException; +import java.io.OutputStream; +import java.io.StringReader; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import org.apache.flume.Context; +import org.apache.flume.Event; +import org.apache.flume.SinkFactory; +import org.apache.flume.Source; +import org.apache.flume.SourceFactory; +import org.apache.flume.sink.DefaultSinkFactory; +import org.apache.flume.source.DefaultSourceFactory; + +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.components.Validator; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.io.OutputStreamCallback; +import static org.apache.nifi.processors.flume.FlumeSourceProcessor.FLUME_CONFIG; +import org.apache.nifi.processors.flume.util.FlowFileEvent; + +/** + * This is a base class that is helpful when building processors interacting + * with Flume. + */ +public abstract class AbstractFlumeProcessor extends AbstractProcessor { + protected static final SourceFactory SOURCE_FACTORY = new DefaultSourceFactory(); + protected static final SinkFactory SINK_FACTORY = new DefaultSinkFactory(); + + protected static Event flowFileToEvent(FlowFile flowFile, ProcessSession session) { + return new FlowFileEvent(flowFile, session); + } + + protected static void transferEvent(final Event event, ProcessSession session, + Relationship relationship) { + FlowFile flowFile = session.create(); + flowFile = session.putAllAttributes(flowFile, event.getHeaders()); + + flowFile = session.write(flowFile, new OutputStreamCallback() { + @Override + public void process(final OutputStream out) throws IOException { + out.write(event.getBody()); + } + }); + + session.getProvenanceReporter().create(flowFile); + session.transfer(flowFile, relationship); + } + + protected static Validator createSourceValidator() { + return new Validator() { + @Override + public ValidationResult validate(final String subject, final String value, final ValidationContext context) { + String reason = null; + try { + FlumeSourceProcessor.SOURCE_FACTORY.create("NiFi Source", value); + } catch (Exception ex) { + reason = ex.getLocalizedMessage(); + reason = Character.toLowerCase(reason.charAt(0)) + reason.substring(1); + } + return new ValidationResult.Builder().subject(subject).input(value).explanation(reason).valid(reason == null).build(); + } + }; + } + + protected static Validator createSinkValidator() { + return new Validator() { + @Override + public ValidationResult validate(final String subject, final String value, final ValidationContext context) { + String reason = null; + try { + FlumeSinkProcessor.SINK_FACTORY.create("NiFi Sink", value); + } catch (Exception ex) { + reason = ex.getLocalizedMessage(); + reason = Character.toLowerCase(reason.charAt(0)) + reason.substring(1); + } + return new ValidationResult.Builder().subject(subject).input(value).explanation(reason).valid(reason == null).build(); + } + }; + } + + protected static Context getFlumeContext(String flumeConfig, String prefix) { + Properties flumeProperties = new Properties(); + if (flumeConfig != null) { + try { + flumeProperties.load(new StringReader(flumeConfig)); + } catch (IOException ex) { + throw new RuntimeException(ex); + } + } + Map<String, String> parameters = Maps.newHashMap(); + for (String property : flumeProperties.stringPropertyNames()) { + parameters.put(property, flumeProperties.getProperty(property)); + } + return new Context(new Context(parameters).getSubProperties(prefix)); + } + + protected static Context getFlumeSourceContext(String flumeConfig, + String agentName, String sourceName) { + return getFlumeContext(flumeConfig, agentName + ".sources." + sourceName + "."); + } + + protected static Context getFlumeSinkContext(String flumeConfig, + String agentName, String sinkName) { + return getFlumeContext(flumeConfig, agentName + ".sinks." + sinkName + "."); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b251ab44/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/FlumeSinkProcessor.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/FlumeSinkProcessor.java b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/FlumeSinkProcessor.java new file mode 100644 index 0000000..4603d18 --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/FlumeSinkProcessor.java @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.flume; + +import com.google.common.base.Throwables; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; +import java.util.List; +import java.util.Set; +import org.apache.flume.Context; +import org.apache.flume.EventDeliveryException; +import org.apache.flume.Sink; +import org.apache.flume.Transaction; +import org.apache.flume.channel.MemoryChannel; +import org.apache.flume.conf.Configurables; + +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.Validator; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessorInitializationContext; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.SchedulingContext; +import org.apache.nifi.processor.annotation.CapabilityDescription; +import org.apache.nifi.processor.annotation.OnScheduled; +import org.apache.nifi.processor.annotation.OnUnscheduled; +import org.apache.nifi.processor.annotation.Tags; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.flume.util.FlowFileEvent; + +/** + * This processor runs a Flume sink + */ +@Tags({"flume", "hadoop", "get", "sink" }) +@CapabilityDescription("Generate FlowFile data from a Flume sink") +public class FlumeSinkProcessor extends AbstractFlumeProcessor { + + private Sink sink; + private MemoryChannel channel; + + public static final PropertyDescriptor SINK_TYPE = new PropertyDescriptor.Builder() + .name("Sink Type") + .description("The fully-qualified name of the Sink class") + .required(true) + .addValidator(createSinkValidator()) + .build(); + public static final PropertyDescriptor AGENT_NAME = new PropertyDescriptor.Builder() + .name("Agent Name") + .description("The name of the agent used in the Flume sink configuration") + .required(true) + .defaultValue("tier1") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + public static final PropertyDescriptor SOURCE_NAME = new PropertyDescriptor.Builder() + .name("Sink Name") + .description("The name of the sink used in the Flume sink configuration") + .required(true) + .defaultValue("sink-1") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + public static final PropertyDescriptor FLUME_CONFIG = new PropertyDescriptor.Builder() + .name("Flume Configuration") + .description("The Flume configuration for the sink copied from the flume.properties file") + .required(true) + .defaultValue("") + .addValidator(Validator.VALID) + .build(); + + public static final Relationship SUCCESS = new Relationship.Builder().name("success").build(); + public static final Relationship FAILURE = new Relationship.Builder().name("failure").build(); + + private List<PropertyDescriptor> descriptors; + private Set<Relationship> relationships; + + + @Override + protected void init(final ProcessorInitializationContext context) { + this.descriptors = ImmutableList.of(SINK_TYPE, AGENT_NAME, SOURCE_NAME, FLUME_CONFIG); + this.relationships = ImmutableSet.of(); + } + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return descriptors; + } + + @Override + public Set<Relationship> getRelationships() { + return relationships; + } + + @OnScheduled + public void onScheduled(final SchedulingContext context) { + channel = new MemoryChannel(); + Configurables.configure(channel, new Context()); + channel.start(); + + sink = SINK_FACTORY.create(context.getProperty(SOURCE_NAME).getValue(), + context.getProperty(SINK_TYPE).getValue()); + sink.setChannel(channel); + + String flumeConfig = context.getProperty(FLUME_CONFIG).getValue(); + String agentName = context.getProperty(AGENT_NAME).getValue(); + String sinkName = context.getProperty(SOURCE_NAME).getValue(); + Configurables.configure(sink, + getFlumeSinkContext(flumeConfig, agentName, sinkName) ); + + sink.start(); + } + + @OnUnscheduled + public void unScheduled() { + sink.stop(); + channel.stop(); + } + + @Override + public void onTrigger(final ProcessContext context, + final ProcessSession session) throws ProcessException { + FlowFile flowFile = session.get(); + + Transaction transaction = channel.getTransaction(); + try { + transaction.begin(); + channel.put(new FlowFileEvent(flowFile, session)); + transaction.commit(); + } catch (Throwable th) { + transaction.rollback(); + throw Throwables.propagate(th); + } finally { + transaction.close(); + } + + try { + sink.process(); + session.transfer(flowFile, SUCCESS); + } catch (EventDeliveryException ex) { + session.transfer(flowFile, FAILURE); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b251ab44/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/FlumeSourceProcessor.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/FlumeSourceProcessor.java b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/FlumeSourceProcessor.java new file mode 100644 index 0000000..8b8388c --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/FlumeSourceProcessor.java @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.flume; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; +import java.util.List; +import java.util.Set; +import org.apache.flume.Context; +import org.apache.flume.Event; +import org.apache.flume.EventDeliveryException; +import org.apache.flume.EventDrivenSource; +import org.apache.flume.PollableSource; +import org.apache.flume.Source; +import org.apache.flume.SourceRunner; +import org.apache.flume.Transaction; +import org.apache.flume.channel.ChannelProcessor; +import org.apache.flume.channel.MemoryChannel; +import org.apache.flume.conf.Configurables; +import org.apache.flume.source.EventDrivenSourceRunner; + +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.Validator; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessorInitializationContext; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.SchedulingContext; +import org.apache.nifi.processor.annotation.CapabilityDescription; +import org.apache.nifi.processor.annotation.OnScheduled; +import org.apache.nifi.processor.annotation.OnUnscheduled; +import org.apache.nifi.processor.annotation.Tags; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; + +/** + * This processor runs a Flume source + */ +@Tags({"flume", "hadoop", "get", "source" }) +@CapabilityDescription("Generate FlowFile data from a Flume source") +public class FlumeSourceProcessor extends AbstractFlumeProcessor { + + private Source source; + private SourceRunner runner; + private MemoryChannel channel; + + public static final PropertyDescriptor SOURCE_TYPE = new PropertyDescriptor.Builder() + .name("Source Type") + .description("The fully-qualified name of the Source class") + .required(true) + .addValidator(createSourceValidator()) + .build(); + public static final PropertyDescriptor AGENT_NAME = new PropertyDescriptor.Builder() + .name("Agent Name") + .description("The name of the agent used in the Flume source configuration") + .required(true) + .defaultValue("tier1") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + public static final PropertyDescriptor SOURCE_NAME = new PropertyDescriptor.Builder() + .name("Source Name") + .description("The name of the source used in the Flume source configuration") + .required(true) + .defaultValue("src-1") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + public static final PropertyDescriptor FLUME_CONFIG = new PropertyDescriptor.Builder() + .name("Flume Configuration") + .description("The Flume configuration for the source copied from the flume.properties file") + .required(true) + .defaultValue("") + .addValidator(Validator.VALID) + .build(); + + public static final Relationship SUCCESS = new Relationship.Builder().name("success").build(); + + private List<PropertyDescriptor> descriptors; + private Set<Relationship> relationships; + + + @Override + protected void init(final ProcessorInitializationContext context) { + this.descriptors = ImmutableList.of(SOURCE_TYPE, AGENT_NAME, SOURCE_NAME, FLUME_CONFIG); + this.relationships = ImmutableSet.of(SUCCESS); + } + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return descriptors; + } + + @Override + public Set<Relationship> getRelationships() { + return relationships; + } + + @OnScheduled + public void onScheduled(final SchedulingContext context) { + source = SOURCE_FACTORY.create( + context.getProperty(SOURCE_NAME).getValue(), + context.getProperty(SOURCE_TYPE).getValue()); + + String flumeConfig = context.getProperty(FLUME_CONFIG).getValue(); + String agentName = context.getProperty(AGENT_NAME).getValue(); + String sourceName = context.getProperty(SOURCE_NAME).getValue(); + Configurables.configure(source, + getFlumeSourceContext(flumeConfig, agentName, sourceName) ); + + if (source instanceof EventDrivenSource) { + runner = new EventDrivenSourceRunner(); + channel = new MemoryChannel(); + Configurables.configure(channel, new Context()); + channel.start(); + source.setChannelProcessor(new ChannelProcessor(new NifiChannelSelector(channel))); + runner.setSource(source); + runner.start(); + } + } + + @OnUnscheduled + public void unScheduled() { + if (runner != null) { + runner.stop(); + } + if (channel != null) { + channel.stop(); + } + } + + @Override + public void onTrigger(final ProcessContext context, + final ProcessSession session) throws ProcessException { + if (source instanceof EventDrivenSource) { + onEventDrivenTrigger(context, session); + } else if (source instanceof PollableSource) { + onPollableTrigger((PollableSource)source, context, session); + } + } + + public void onPollableTrigger(final PollableSource pollableSource, + final ProcessContext context, final ProcessSession session) + throws ProcessException { + try { + pollableSource.setChannelProcessor(new ChannelProcessor( + new NifiChannelSelector(new NifiChannel(session, SUCCESS)))); + pollableSource.start(); + pollableSource.process(); + pollableSource.stop(); + } catch (EventDeliveryException ex) { + throw new ProcessException("Error processing pollable source", ex); + } + } + + public void onEventDrivenTrigger(final ProcessContext context, final ProcessSession session) { + Transaction transaction = channel.getTransaction(); + transaction.begin(); + + Event event = channel.take(); + if (event != null) { + transferEvent(event, session, SUCCESS); + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b251ab44/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/NifiChannel.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/NifiChannel.java b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/NifiChannel.java new file mode 100644 index 0000000..ac8dbe2 --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/NifiChannel.java @@ -0,0 +1,31 @@ + +package org.apache.nifi.processors.flume; + +import org.apache.flume.Context; +import org.apache.flume.channel.BasicChannelSemantics; +import org.apache.flume.channel.BasicTransactionSemantics; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; + + +public class NifiChannel extends BasicChannelSemantics { + private final ProcessSession session; + private final Relationship relationship; + + public NifiChannel(ProcessSession session, Relationship relationship) { + this.session = session; + this.relationship = relationship; + } + + @Override + protected BasicTransactionSemantics createTransaction() { + return new NifiTransaction(session, relationship); + } + + @Override + public void configure(Context context) { + throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates. + } + + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b251ab44/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/NifiChannelSelector.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/NifiChannelSelector.java b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/NifiChannelSelector.java new file mode 100644 index 0000000..792678b --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/NifiChannelSelector.java @@ -0,0 +1,55 @@ + +package org.apache.nifi.processors.flume; + +import com.google.common.collect.ImmutableList; +import java.util.List; +import org.apache.flume.Channel; +import org.apache.flume.ChannelSelector; +import org.apache.flume.Context; +import org.apache.flume.Event; + + +public class NifiChannelSelector implements ChannelSelector { + private String name; + private final List<Channel> requiredChannels; + private final List<Channel> optionalChannels; + + public NifiChannelSelector(Channel channel) { + requiredChannels = ImmutableList.of(channel); + optionalChannels = ImmutableList.of(); + } + + @Override + public List<Channel> getRequiredChannels(Event event) { + return requiredChannels; + } + + @Override + public List<Channel> getOptionalChannels(Event event) { + return optionalChannels; + } + + @Override + public List<Channel> getAllChannels() { + return requiredChannels; + } + + @Override + public void setChannels(List<Channel> channels) { + throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates. + } + + @Override + public String getName() { + return name; + } + + @Override + public void setName(String name) { + this.name = name; + } + + @Override + public void configure(Context context) { + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b251ab44/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/NifiTransaction.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/NifiTransaction.java b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/NifiTransaction.java new file mode 100644 index 0000000..3d6a647 --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/NifiTransaction.java @@ -0,0 +1,40 @@ + +package org.apache.nifi.processors.flume; + +import org.apache.flume.Event; +import org.apache.flume.channel.BasicTransactionSemantics; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; + + +class NifiTransaction extends BasicTransactionSemantics { + private final ProcessSession session; + private final Relationship relationship; + + public NifiTransaction(ProcessSession session, Relationship relationship) { + this.session = session; + this.relationship = relationship; + } + + @Override + protected void doPut(Event event) throws InterruptedException { + AbstractFlumeProcessor.transferEvent(event, session, relationship); + } + + @Override + protected Event doTake() throws InterruptedException { + throw new UnsupportedOperationException("Only put supported"); + } + + @Override + protected void doCommit() throws InterruptedException { + session.commit(); + } + + @Override + protected void doRollback() throws InterruptedException { + session.rollback(); + } + + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b251ab44/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/util/FlowFileEvent.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/util/FlowFileEvent.java b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/util/FlowFileEvent.java new file mode 100644 index 0000000..c3531ca --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/util/FlowFileEvent.java @@ -0,0 +1,114 @@ + +package org.apache.nifi.processors.flume.util; + +import com.google.common.collect.Maps; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.Arrays; +import java.util.Map; +import org.apache.flume.Event; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.io.InputStreamCallback; + +import static org.apache.nifi.processors.flume.util.FlowFileEventConstants.*; +import org.apache.nifi.stream.io.BufferedInputStream; +import org.apache.nifi.stream.io.StreamUtils; + +public class FlowFileEvent implements Event { + + private final FlowFile flowFile; + private final ProcessSession session; + + private final Map<String, String> headers; + private boolean headersLoaded; + + private final Object bodyLock; + private byte[] body; + private boolean bodyLoaded; + + public FlowFileEvent(FlowFile flowFile, ProcessSession session) { + this.flowFile = flowFile; + this.session = session; + + headers = Maps.newHashMap(); + bodyLock = new Object(); + bodyLoaded = false; + } + + @Override + public Map<String, String> getHeaders() { + if (!headersLoaded) { + synchronized (headers) { + if (headersLoaded) { + return headers; + } + + headers.putAll(flowFile.getAttributes()); + headers.put(ENTRY_DATE_HEADER, Long.toString(flowFile.getEntryDate())); + headers.put(ID_HEADER, Long.toString(flowFile.getId())); + headers.put(LAST_QUEUE_DATE_HEADER, Long.toString(flowFile.getLastQueueDate())); + int i = 0; + for (String lineageIdentifier : flowFile.getLineageIdentifiers()) { + headers.put(LINEAGE_IDENTIFIERS_HEADER + "." + i, lineageIdentifier); + i++; + } + headers.put(LINEAGE_START_DATE_HEADER, Long.toString(flowFile.getLineageStartDate())); + headers.put(SIZE_HEADER, Long.toString(flowFile.getSize())); + + headersLoaded = true; + } + } + return headers; + } + + @Override + public void setHeaders(Map<String, String> headers) { + synchronized (this.headers) { + this.headers.clear(); + this.headers.putAll(headers); + headersLoaded = true; + } + } + + @Override + public byte[] getBody() { + if (bodyLoaded) { + return body; + } + + synchronized (bodyLock ) { + if (!bodyLoaded) { + if (flowFile.getSize() > Integer.MAX_VALUE) { + throw new RuntimeException("Can't get body of Event because the backing FlowFile is too large (" + flowFile.getSize() + " bytes)"); + } + + final ByteArrayOutputStream baos = new ByteArrayOutputStream((int) flowFile.getSize()); + session.read(flowFile, new InputStreamCallback() { + + @Override + public void process(InputStream in) throws IOException { + try (BufferedInputStream input = new BufferedInputStream(in)) { + StreamUtils.copy(in, baos); + } + baos.close(); + } + }); + + body = baos.toByteArray(); + bodyLoaded = true; + } + } + + return body; + } + + @Override + public void setBody(byte[] body) { + synchronized (bodyLock) { + this.body = Arrays.copyOf(body, body.length); + bodyLoaded = true; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b251ab44/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/util/FlowFileEventConstants.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/util/FlowFileEventConstants.java b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/util/FlowFileEventConstants.java new file mode 100644 index 0000000..c13f0ef --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/util/FlowFileEventConstants.java @@ -0,0 +1,25 @@ + +package org.apache.nifi.processors.flume.util; + + +public class FlowFileEventConstants { + + // FlowFile#getEntryDate(); + public static final String ENTRY_DATE_HEADER = "nifi.entry.date"; + + // FlowFile#getId(); + public static final String ID_HEADER = "nifi.id"; + + // FlowFile#getLastQueueDate(); + public static final String LAST_QUEUE_DATE_HEADER = "nifi.last.queue.date"; + + // FlowFile#getLineageIdentifiers(); + public static final String LINEAGE_IDENTIFIERS_HEADER = "nifi.lineage.identifiers"; + + // FlowFile#getLineageStartDate(); + public static final String LINEAGE_START_DATE_HEADER = "nifi.lineage.start.date"; + + // FlowFile#getSize(); + public static final String SIZE_HEADER = "nifi.size"; + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b251ab44/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor new file mode 100644 index 0000000..fae8727 --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +org.apache.nifi.processors.flume.FlumeSourceProcessor +org.apache.nifi.processors.flume.FlumeSinkProcessor http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b251ab44/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/test/java/org/apache/nifi/processors/flume/AbstractFlumeTest.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/test/java/org/apache/nifi/processors/flume/AbstractFlumeTest.java b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/test/java/org/apache/nifi/processors/flume/AbstractFlumeTest.java new file mode 100644 index 0000000..87b056a --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/test/java/org/apache/nifi/processors/flume/AbstractFlumeTest.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.flume; + +import org.junit.BeforeClass; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class AbstractFlumeTest { + + private static Logger logger; + + @BeforeClass + public static void setUpClass() { + System.setProperty("org.slf4j.simpleLogger.defaultLogLevel", "info"); + System.setProperty("org.slf4j.simpleLogger.showDateTime", "true"); + System.setProperty("org.slf4j.simpleLogger.log.nifi.processors.flume", "debug"); + logger = LoggerFactory.getLogger(AbstractFlumeTest.class); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b251ab44/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/test/java/org/apache/nifi/processors/flume/FlumeSinkProcessorTest.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/test/java/org/apache/nifi/processors/flume/FlumeSinkProcessorTest.java b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/test/java/org/apache/nifi/processors/flume/FlumeSinkProcessorTest.java new file mode 100644 index 0000000..4f2cef7 --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/test/java/org/apache/nifi/processors/flume/FlumeSinkProcessorTest.java @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.flume; + +import java.io.File; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.io.FileInputStream; +import java.io.FilenameFilter; +import java.io.IOException; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import org.apache.commons.io.filefilter.HiddenFileFilter; +import org.apache.flume.sink.NullSink; +import org.apache.flume.source.AvroSource; + +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.util.MockProcessContext; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.apache.nifi.util.file.FileUtils; + +import org.junit.Assert; +import org.junit.Ignore; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class FlumeSinkProcessorTest { + + private static final Logger logger = + LoggerFactory.getLogger(FlumeSinkProcessorTest.class); + + @Test + public void testValidators() { + TestRunner runner = TestRunners.newTestRunner(FlumeSinkProcessor.class); + Collection<ValidationResult> results; + ProcessContext pc; + + results = new HashSet<>(); + runner.enqueue(new byte[0]); + pc = runner.getProcessContext(); + if (pc instanceof MockProcessContext) { + results = ((MockProcessContext) pc).validate(); + } + Assert.assertEquals(1, results.size()); + for (ValidationResult vr : results) { + logger.error(vr.toString()); + Assert.assertTrue(vr.toString().contains("is invalid because Sink Type is required")); + } + + // non-existent class + results = new HashSet<>(); + runner.setProperty(FlumeSinkProcessor.SINK_TYPE, "invalid.class.name"); + runner.enqueue(new byte[0]); + pc = runner.getProcessContext(); + if (pc instanceof MockProcessContext) { + results = ((MockProcessContext) pc).validate(); + } + Assert.assertEquals(1, results.size()); + for (ValidationResult vr : results) { + logger.error(vr.toString()); + Assert.assertTrue(vr.toString().contains("is invalid because unable to load sink")); + } + + // class doesn't implement Sink + results = new HashSet<>(); + runner.setProperty(FlumeSinkProcessor.SINK_TYPE, AvroSource.class.getName()); + runner.enqueue(new byte[0]); + pc = runner.getProcessContext(); + if (pc instanceof MockProcessContext) { + results = ((MockProcessContext) pc).validate(); + } + Assert.assertEquals(1, results.size()); + for (ValidationResult vr : results) { + logger.error(vr.toString()); + Assert.assertTrue(vr.toString().contains("is invalid because unable to create sink")); + } + + results = new HashSet<>(); + runner.setProperty(FlumeSinkProcessor.SINK_TYPE, NullSink.class.getName()); + runner.enqueue(new byte[0]); + pc = runner.getProcessContext(); + if (pc instanceof MockProcessContext) { + results = ((MockProcessContext) pc).validate(); + } + Assert.assertEquals(0, results.size()); + } + + + @Test + public void testNullSink() throws IOException { + TestRunner runner = TestRunners.newTestRunner(FlumeSinkProcessor.class); + runner.setProperty(FlumeSinkProcessor.SINK_TYPE, NullSink.class.getName()); + FileInputStream fis = new FileInputStream("src/test/resources/testdata/records.txt"); + Map<String, String> attributes = new HashMap<>(); + attributes.put(CoreAttributes.FILENAME.key(), "records.txt"); + runner.enqueue(fis, attributes); + runner.run(); + fis.close(); + } + + @Test + public void testHdfsSink() throws IOException { + File destDir = new File("target/hdfs"); + if (destDir.exists()) { + FileUtils.deleteFilesInDir(destDir, null, logger); + } else { + destDir.mkdirs(); + } + + TestRunner runner = TestRunners.newTestRunner(FlumeSinkProcessor.class); + runner.setProperty(FlumeSinkProcessor.SINK_TYPE, "hdfs"); + runner.setProperty(FlumeSinkProcessor.FLUME_CONFIG, + "tier1.sinks.sink-1.hdfs.path = " + destDir.toURI().toString() + "\n" + + "tier1.sinks.sink-1.hdfs.fileType = DataStream\n" + + "tier1.sinks.sink-1.hdfs.serializer = TEXT\n" + + "tier1.sinks.sink-1.serializer.appendNewline = false" + ); + FileInputStream fis = new FileInputStream("src/test/resources/testdata/records.txt"); + Map<String, String> attributes = new HashMap<>(); + attributes.put(CoreAttributes.FILENAME.key(), "records.txt"); + runner.enqueue(fis, attributes); + runner.run(); + fis.close(); + + File[] files = destDir.listFiles((FilenameFilter)HiddenFileFilter.VISIBLE); + assertEquals("Unexpected number of destination files.", 1, files.length); + File dst = files[0]; + byte[] expectedMd5 = FileUtils.computeMd5Digest(new File("src/test/resources/testdata/records.txt")); + byte[] actualMd5 = FileUtils.computeMd5Digest(dst); + Assert.assertArrayEquals("Destination file doesn't match source data", expectedMd5, actualMd5); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b251ab44/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/test/java/org/apache/nifi/processors/flume/FlumeSourceProcessorTest.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/test/java/org/apache/nifi/processors/flume/FlumeSourceProcessorTest.java b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/test/java/org/apache/nifi/processors/flume/FlumeSourceProcessorTest.java new file mode 100644 index 0000000..bbcf116 --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/test/java/org/apache/nifi/processors/flume/FlumeSourceProcessorTest.java @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.flume; + + +import java.io.File; +import java.io.IOException; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import org.apache.flume.sink.NullSink; +import org.apache.flume.source.AvroSource; + +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.MockProcessContext; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.apache.nifi.util.file.FileUtils; + +import org.junit.Assert; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class FlumeSourceProcessorTest { + + private static final Logger logger = + LoggerFactory.getLogger(FlumeSourceProcessorTest.class); + + + @Test + public void testValidators() { + TestRunner runner = TestRunners.newTestRunner(FlumeSourceProcessor.class); + Collection<ValidationResult> results; + ProcessContext pc; + + results = new HashSet<>(); + runner.enqueue(new byte[0]); + pc = runner.getProcessContext(); + if (pc instanceof MockProcessContext) { + results = ((MockProcessContext) pc).validate(); + } + Assert.assertEquals(1, results.size()); + for (ValidationResult vr : results) { + logger.error(vr.toString()); + Assert.assertTrue(vr.toString().contains("is invalid because Source Type is required")); + } + + // non-existent class + results = new HashSet<>(); + runner.setProperty(FlumeSourceProcessor.SOURCE_TYPE, "invalid.class.name"); + runner.enqueue(new byte[0]); + pc = runner.getProcessContext(); + if (pc instanceof MockProcessContext) { + results = ((MockProcessContext) pc).validate(); + } + Assert.assertEquals(1, results.size()); + for (ValidationResult vr : results) { + logger.error(vr.toString()); + Assert.assertTrue(vr.toString().contains("is invalid because unable to load source")); + } + + // class doesn't implement Source + results = new HashSet<>(); + runner.setProperty(FlumeSourceProcessor.SOURCE_TYPE, NullSink.class.getName()); + runner.enqueue(new byte[0]); + pc = runner.getProcessContext(); + if (pc instanceof MockProcessContext) { + results = ((MockProcessContext) pc).validate(); + } + Assert.assertEquals(1, results.size()); + for (ValidationResult vr : results) { + logger.error(vr.toString()); + Assert.assertTrue(vr.toString().contains("is invalid because unable to create source")); + } + + results = new HashSet<>(); + runner.setProperty(FlumeSourceProcessor.SOURCE_TYPE, AvroSource.class.getName()); + runner.enqueue(new byte[0]); + pc = runner.getProcessContext(); + if (pc instanceof MockProcessContext) { + results = ((MockProcessContext) pc).validate(); + } + Assert.assertEquals(0, results.size()); + } + + @Test + public void testSequenceSource() { + TestRunner runner = TestRunners.newTestRunner(FlumeSourceProcessor.class); + runner.setProperty(FlumeSourceProcessor.SOURCE_TYPE, "seq"); + runner.run(); + List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(FlumeSourceProcessor.SUCCESS); + Assert.assertEquals(1, flowFiles.size()); + for (MockFlowFile flowFile : flowFiles) { + logger.error(flowFile.toString()); + Assert.assertEquals(1, flowFile.getSize()); + } + } + + @Test + public void testSourceWithConfig() throws IOException { + File spoolDirectory = new File("target/spooldir"); + if (spoolDirectory.exists()) { + FileUtils.deleteFilesInDir(spoolDirectory, null, logger); + } else { + spoolDirectory.mkdirs(); + } + File src = new File("src/test/resources/testdata/records.txt"); + File dst = new File(spoolDirectory, "records.txt"); + FileUtils.copyFile(src, dst, false, false, logger); + + TestRunner runner = TestRunners.newTestRunner(FlumeSourceProcessor.class); + runner.setProperty(FlumeSourceProcessor.SOURCE_TYPE, "spooldir"); + runner.setProperty(FlumeSinkProcessor.FLUME_CONFIG, + "tier1.sources.src-1.spoolDir = " + spoolDirectory.getAbsolutePath()); + runner.run(); + List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(FlumeSourceProcessor.SUCCESS); + Assert.assertEquals(1, flowFiles.size()); + for (MockFlowFile flowFile : flowFiles) { + Assert.assertEquals(8, flowFile.getSize()); + flowFile.assertContentEquals("record 1"); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b251ab44/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/test/resources/core-site-broken.xml ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/test/resources/core-site-broken.xml b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/test/resources/core-site-broken.xml new file mode 100644 index 0000000..e06a193 --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/test/resources/core-site-broken.xml @@ -0,0 +1,25 @@ +<?xml version="1.0"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<?xml-stylesheet type="text/xsl" href="configuration.xsl"?> + +<!-- Put site-specific property overrides in this file. --> + +<configuration> + <property> + <name>fs.default.name</name> + <value>hdfs://localhost:65535</value> + </property> +</configuration> http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b251ab44/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/test/resources/core-site.xml ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/test/resources/core-site.xml b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/test/resources/core-site.xml new file mode 100644 index 0000000..5e3b55c --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/test/resources/core-site.xml @@ -0,0 +1,25 @@ +<?xml version="1.0"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<?xml-stylesheet type="text/xsl" href="configuration.xsl"?> + +<!-- Put site-specific property overrides in this file. --> + +<configuration> + <property> + <name>fs.defaultFS</name> + <value>file:///</value> + </property> +</configuration> http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b251ab44/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/test/resources/testdata/records.txt ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/test/resources/testdata/records.txt b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/test/resources/testdata/records.txt new file mode 100644 index 0000000..5a809ee --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/test/resources/testdata/records.txt @@ -0,0 +1,4 @@ +record 1 +record 2 +record 3 +record 4 http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b251ab44/nifi/nifi-nar-bundles/nifi-flume-bundle/pom.xml ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-flume-bundle/pom.xml b/nifi/nifi-nar-bundles/nifi-flume-bundle/pom.xml new file mode 100644 index 0000000..dc9ec69 --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-flume-bundle/pom.xml @@ -0,0 +1,39 @@ +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + --> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-nar-bundles</artifactId> + <version>0.0.1-incubating-SNAPSHOT</version> + </parent> + <artifactId>nifi-flume-bundle</artifactId> + <version>0.0.1-incubating-SNAPSHOT</version> + <packaging>pom</packaging> + <description>A bundle of processors that run Flume sources/sinks</description> + <modules> + <module>nifi-flume-processors</module> + <module>nifi-flume-nar</module> + </modules> + <dependencyManagement> + <dependencies> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-flume-processors</artifactId> + <version>0.0.1-incubating-SNAPSHOT</version> + </dependency> + </dependencies> + </dependencyManagement> +</project> http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b251ab44/nifi/nifi-nar-bundles/pom.xml ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/pom.xml b/nifi/nifi-nar-bundles/pom.xml index b9be570..29a5e49 100644 --- a/nifi/nifi-nar-bundles/pom.xml +++ b/nifi/nifi-nar-bundles/pom.xml @@ -41,6 +41,7 @@ <module>nifi-hl7-bundle</module> <module>nifi-language-translation-bundle</module> <module>nifi-mongodb-bundle</module> + <module>nifi-flume-bundle</module> </modules> <dependencyManagement> <dependencies>