NIFI-238: Add Kite processors.

Includes (a short usage sketch follows this list):
* StoreInKiteDataset - stores Avro files in a Kite dataset
* ConvertCSVToAvro - converts CSV to Avro for storage
* ConvertJSONToAvro - converts JSON to Avro for storage
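
A minimal sketch of driving the new CSV converter with NiFi's TestRunner
(it mirrors TestCSVToAvroProcessor below; the schema literal is an
illustrative example, not part of this commit):

    TestRunner runner = TestRunners.newTestRunner(ConvertCSVToAvro.class);
    runner.setProperty(ConvertCSVToAvro.SCHEMA,
        "{\"type\":\"record\",\"name\":\"Test\",\"fields\":["
        + "{\"name\":\"id\",\"type\":\"long\"},"
        + "{\"name\":\"color\",\"type\":\"string\"}]}");
    runner.enqueue("1,green\n2,grey".getBytes());
    runner.run();
    runner.assertAllFlowFilesTransferred("success", 1);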


Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/82424173
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/82424173
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/82424173

Branch: refs/heads/batch-stream
Commit: 8242417388bdfcd63cc80214b1dd1dd9c29382f5
Parents: b8ade5b
Author: Ryan Blue <b...@apache.org>
Authored: Thu Feb 12 18:53:49 2015 -0800
Committer: Ryan Blue <b...@apache.org>
Committed: Thu Feb 19 15:34:09 2015 -0800

----------------------------------------------------------------------
 .../nifi-kite-bundle/nifi-kite-nar/pom.xml      |  35 +++
 .../nifi-kite-processors/pom.xml                | 173 +++++++++++++
 .../processors/kite/AbstractKiteProcessor.java  | 217 ++++++++++++++++
 .../apache/nifi/processors/kite/AvroUtil.java   |  43 ++++
 .../nifi/processors/kite/ConvertCSVToAvro.java  | 258 +++++++++++++++++++
 .../nifi/processors/kite/ConvertJSONToAvro.java | 157 +++++++++++
 .../nifi/processors/kite/JSONFileReader.java    | 114 ++++++++
 .../processors/kite/StoreInKiteDataset.java     | 168 ++++++++++++
 .../data/spi/filesystem/CSVFileReaderFixed.java | 172 +++++++++++++
 .../org.apache.nifi.processor.Processor         |  17 ++
 .../processors/kite/TestCSVToAvroProcessor.java | 126 +++++++++
 .../kite/TestConfigurationProperty.java         |  77 ++++++
 .../nifi/processors/kite/TestGetSchema.java     |  94 +++++++
 .../kite/TestJSONToAvroProcessor.java           |  61 +++++
 .../kite/TestKiteProcessorsCluster.java         | 128 +++++++++
 .../kite/TestKiteStorageProcessor.java          | 167 ++++++++++++
 .../apache/nifi/processors/kite/TestUtil.java   | 103 ++++++++
 nifi/nifi-nar-bundles/nifi-kite-bundle/pom.xml  |  59 +++++
 18 files changed, 2169 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/82424173/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-nar/pom.xml
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-nar/pom.xml b/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-nar/pom.xml
new file mode 100644
index 0000000..dab7cb9
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-nar/pom.xml
@@ -0,0 +1,35 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+        http://www.apache.org/licenses/LICENSE-2.0
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+  -->
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.nifi</groupId>
+    <artifactId>nifi-kite-bundle</artifactId>
+    <version>0.0.2-incubating-SNAPSHOT</version>
+  </parent>
+
+  <artifactId>nifi-kite-nar</artifactId>
+  <packaging>nar</packaging>
+
+  <name>Kite NAR</name>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.nifi</groupId>
+      <artifactId>nifi-kite-processors</artifactId>
+    </dependency>
+  </dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/82424173/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/pom.xml
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/pom.xml b/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/pom.xml
new file mode 100644
index 0000000..311b3bf
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/pom.xml
@@ -0,0 +1,173 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+        http://www.apache.org/licenses/LICENSE-2.0
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+  -->
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.nifi</groupId>
+    <artifactId>nifi-kite-bundle</artifactId>
+    <version>0.0.2-incubating-SNAPSHOT</version>
+  </parent>
+
+  <artifactId>nifi-kite-processors</artifactId>
+  <packaging>jar</packaging>
+  <name>Kite Hadoop Processors</name>
+
+  <properties>
+    <kite.version>0.18.0</kite.version>
+    <guava.version>11.0.2</guava.version>
+    <junit.version>4.10</junit.version>
+    <findbugs-annotations.version>1.3.9-1</findbugs-annotations.version>
+  </properties>
+
+  <dependencies>
+    <!-- NiFi -->
+
+    <dependency>
+      <groupId>org.apache.nifi</groupId>
+      <artifactId>nifi-api</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.nifi</groupId>
+      <artifactId>nifi-utils</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.nifi</groupId>
+      <artifactId>nifi-processor-utils</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.nifi</groupId>
+      <artifactId>nifi-flowfile-packager</artifactId>
+    </dependency>
+
+    <!-- Kite -->
+
+    <dependency>
+      <groupId>org.kitesdk</groupId>
+      <artifactId>kite-data-core</artifactId>
+      <version>${kite.version}</version>
+      <exclusions>
+        <exclusion>
+          <!-- Use findbugs-annotations instead -->
+          <groupId>com.google.code.findbugs</groupId>
+          <artifactId>jsr305</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
+    <dependency>
+      <groupId>org.kitesdk</groupId>
+      <artifactId>kite-data-hive</artifactId>
+      <version>${kite.version}</version>
+      <exclusions>
+        <exclusion>
+          <!-- Use findbugs-annotations instead -->
+          <groupId>com.google.code.findbugs</groupId>
+          <artifactId>jsr305</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
+    <dependency>
+      <groupId>org.kitesdk</groupId>
+      <artifactId>kite-data-hbase</artifactId>
+      <version>${kite.version}</version>
+      <exclusions>
+        <exclusion>
+          <!-- Use findbugs-annotations instead -->
+          <groupId>com.google.code.findbugs</groupId>
+          <artifactId>jsr305</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
+    <dependency>
+      <groupId>org.kitesdk</groupId>
+      <artifactId>kite-hadoop-dependencies</artifactId>
+      <type>pom</type>
+      <version>${kite.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.kitesdk</groupId>
+      <artifactId>kite-hbase-dependencies</artifactId>
+      <type>pom</type>
+      <scope>provided</scope>
+      <optional>true</optional>
+      <version>${kite.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+      <version>${guava.version}</version>
+      <scope>compile</scope>
+    </dependency>
+
+    <dependency>
+      <!-- avoid warnings by bundling annotations -->
+      <groupId>com.github.stephenc.findbugs</groupId>
+      <artifactId>findbugs-annotations</artifactId>
+      <scope>compile</scope>
+      <version>${findbugs-annotations.version}</version>
+    </dependency>
+
+    <!-- Test dependencies -->
+
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+      <version>${junit.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.nifi</groupId>
+      <artifactId>nifi-mock</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.kitesdk</groupId>
+      <artifactId>kite-minicluster</artifactId>
+      <version>${kite.version}</version>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>com.sun.jersey</groupId>
+      <artifactId>jersey-servlet</artifactId>
+      <version>1.14</version>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.kitesdk</groupId>
+      <artifactId>kite-data-core</artifactId>
+      <version>${kite.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.kitesdk</groupId>
+      <artifactId>kite-hadoop-test-dependencies</artifactId>
+      <type>pom</type>
+      <scope>test</scope>
+      <version>${kite.version}</version>
+    </dependency>
+
+  </dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/82424173/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/AbstractKiteProcessor.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/AbstractKiteProcessor.java b/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/AbstractKiteProcessor.java
new file mode 100644
index 0000000..c510489
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/AbstractKiteProcessor.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.nifi.processors.kite;
+
+import com.google.common.base.Splitter;
+import com.google.common.collect.ImmutableList;
+import com.google.common.io.Resources;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.List;
+import org.apache.avro.Schema;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.Validator;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.kitesdk.data.DatasetNotFoundException;
+import org.kitesdk.data.Datasets;
+import org.kitesdk.data.SchemaNotFoundException;
+import org.kitesdk.data.URIBuilder;
+import org.kitesdk.data.spi.DefaultConfiguration;
+
+abstract class AbstractKiteProcessor extends AbstractProcessor {
+
+  private static final Splitter COMMA = Splitter.on(',').trimResults();
+  protected static final Validator FILES_EXIST = new Validator() {
+    @Override
+    public ValidationResult validate(String subject, String configFiles,
+                                     ValidationContext context) {
+      if (configFiles != null && !configFiles.isEmpty()) {
+        for (String file : COMMA.split(configFiles)) {
+          ValidationResult result = StandardValidators.FILE_EXISTS_VALIDATOR
+              .validate(subject, file, context);
+          if (!result.isValid()) {
+            return result;
+          }
+        }
+      }
+      return new ValidationResult.Builder()
+          .subject(subject)
+          .input(configFiles)
+          .explanation("Files exist")
+          .valid(true)
+          .build();
+    }
+  };
+
+  protected static final PropertyDescriptor CONF_XML_FILES =
+      new PropertyDescriptor.Builder()
+          .name("Hadoop configuration files")
+          .description("A comma-separated list of Hadoop configuration files")
+          .addValidator(FILES_EXIST)
+          .build();
+
+  protected static final Validator RECOGNIZED_URI = new Validator() {
+    @Override
+    public ValidationResult validate(String subject, String uri,
+                                     ValidationContext context) {
+      String message = "not set";
+      boolean isValid = true;
+      if (uri == null || uri.isEmpty()) {
+        isValid = false;
+      } else {
+        try {
+          new URIBuilder(URI.create(uri)).build();
+        } catch (RuntimeException e) {
+          message = e.getMessage();
+          isValid = false;
+        }
+      }
+      return new ValidationResult.Builder()
+          .subject(subject)
+          .input(uri)
+          .explanation("Dataset URI is invalid: " + message)
+          .valid(isValid)
+          .build();
+    }
+  };
+
+  /**
+   * Resolves a {@link Schema} for the given string, either a URI or a JSON
+   * literal.
+   */
+  protected static Schema getSchema(String uriOrLiteral, Configuration conf) {
+    URI uri;
+    try {
+      uri = new URI(uriOrLiteral);
+    } catch (URISyntaxException e) {
+      // try to parse the schema as a literal
+      return parseSchema(uriOrLiteral);
+    }
+
+    try {
+      if ("dataset".equals(uri.getScheme()) || "view".equals(uri.getScheme())) 
{
+        return Datasets.load(uri).getDataset().getDescriptor().getSchema();
+      } else if ("resource".equals(uri.getScheme())) {
+        InputStream in = Resources.getResource(uri.getSchemeSpecificPart())
+            .openStream();
+        return parseSchema(uri, in);
+      } else {
+        // try to open the file
+        Path schemaPath = new Path(uri);
+        FileSystem fs = schemaPath.getFileSystem(conf);
+        return parseSchema(uri, fs.open(schemaPath));
+      }
+
+    } catch (DatasetNotFoundException e) {
+      throw new SchemaNotFoundException(
+          "Cannot read schema of missing dataset: " + uri, e);
+    } catch (IOException e) {
+      throw new SchemaNotFoundException(
+          "Failed while reading " + uri + ": " + e.getMessage(), e);
+    }
+  }
+
+  private static Schema parseSchema(String literal) {
+    try {
+      return new Schema.Parser().parse(literal);
+    } catch (RuntimeException e) {
+      throw new SchemaNotFoundException(
+          "Failed to parse schema: " + literal, e);
+    }
+  }
+
+  private static Schema parseSchema(URI uri, InputStream in) throws IOException {
+    try {
+      return new Schema.Parser().parse(in);
+    } catch (RuntimeException e) {
+      throw new SchemaNotFoundException("Failed to parse schema at " + uri, e);
+    }
+  }
+
+  protected static final Validator SCHEMA_VALIDATOR = new Validator() {
+    @Override
+    public ValidationResult validate(String subject, String uri, ValidationContext context) {
+      Configuration conf = getConfiguration(
+          context.getProperty(CONF_XML_FILES).getValue());
+
+      String error = null;
+      try {
+        getSchema(uri, conf);
+      } catch (SchemaNotFoundException e) {
+        error = e.getMessage();
+      }
+      return new ValidationResult.Builder()
+          .subject(subject)
+          .input(uri)
+          .explanation(error)
+          .valid(error == null)
+          .build();
+    }
+  };
+
+  protected static final List<PropertyDescriptor> ABSTRACT_KITE_PROPS =
+      ImmutableList.<PropertyDescriptor>builder()
+          .add(CONF_XML_FILES)
+          .build();
+
+  static List<PropertyDescriptor> getProperties() {
+    return ABSTRACT_KITE_PROPS;
+  }
+
+  @OnScheduled
+  protected void setDefaultConfiguration(ProcessContext context)
+      throws IOException {
+    DefaultConfiguration.set(getConfiguration(
+        context.getProperty(CONF_XML_FILES).getValue()));
+  }
+
+  protected static Configuration getConfiguration(String configFiles) {
+    Configuration conf = DefaultConfiguration.get();
+
+    if (configFiles == null || configFiles.isEmpty()) {
+      return conf;
+    }
+
+    for (String file : COMMA.split(configFiles)) {
+      // process each resource only once
+      if (conf.getResource(file) == null) {
+        // use Path instead of String to get the file from the FS
+        conf.addResource(new Path(file));
+      }
+    }
+
+    return conf;
+  }
+
+  @Override
+  protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+    return ABSTRACT_KITE_PROPS;
+  }
+}
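
Note on schema resolution: getSchema() above accepts the Record schema
property in several forms, dispatching on the URI scheme. The values
below are hypothetical examples of each branch:

    dataset:hdfs://namenode:8020/datasets/default/events   (schema of a dataset)
    view:hive:default/events?color=green                   (schema of the view's dataset)
    resource:schemas/event.avsc                            (classpath resource)
    hdfs:/schemas/event.avsc                               (any Hadoop-readable path)
    {"type":"record","name":"Event","fields":[{"name":"id","type":"long"}]}

The last form is not a valid URI, so the URISyntaxException branch parses
it as an inline JSON schema literal.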

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/82424173/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/AvroUtil.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/AvroUtil.java b/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/AvroUtil.java
new file mode 100644
index 0000000..0d0b04d
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/AvroUtil.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.nifi.processors.kite;
+
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaBuilder;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.DatumWriter;
+
+import static org.apache.avro.generic.GenericData.StringType;
+
+class AvroUtil {
+
+  @SuppressWarnings("unchecked")
+  public static <D> DatumWriter<D> newDatumWriter(Schema schema, Class<D> dClass) {
+    return (DatumWriter<D>) GenericData.get().createDatumWriter(schema);
+  }
+
+  @SuppressWarnings("unchecked")
+  public static <D> DatumReader<D> newDatumReader(Schema schema, Class<D> dClass) {
+    return (DatumReader<D>) GenericData.get().createDatumReader(schema);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/82424173/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertCSVToAvro.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertCSVToAvro.java b/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertCSVToAvro.java
new file mode 100644
index 0000000..18240c3
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertCSVToAvro.java
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.nifi.processors.kite;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.List;
+import java.util.Set;
+import org.apache.avro.Schema;
+import org.apache.avro.file.CodecFactory;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericData.Record;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.Validator;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.FlowFileAccessException;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.StreamCallback;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.kitesdk.data.DatasetException;
+import org.kitesdk.data.DatasetIOException;
+import org.kitesdk.data.DatasetRecordException;
+import org.kitesdk.data.spi.DefaultConfiguration;
+import org.kitesdk.data.spi.filesystem.CSVFileReaderFixed;
+import org.kitesdk.data.spi.filesystem.CSVProperties;
+
+import static org.apache.nifi.processor.util.StandardValidators.createLongValidator;
+
+@Tags({"kite", "csv", "avro"})
+@CapabilityDescription(
+    "Converts CSV files to Avro according to an Avro Schema")
+public class ConvertCSVToAvro extends AbstractKiteProcessor {
+  private static CSVProperties DEFAULTS = new CSVProperties.Builder().build();
+
+  private static Validator CHAR_VALIDATOR = new Validator() {
+    @Override
+    public ValidationResult validate(String subject, String input,
+                                     ValidationContext context) {
+      return new ValidationResult.Builder()
+          .subject(subject)
+          .input(input)
+          .explanation("Only single characters are supported")
+          .valid(input.length() == 1)
+          .build();
+    }
+  };
+
+  private static Relationship SUCCESS = new Relationship.Builder()
+      .name("success")
+      .description("FlowFile content has been successfully saved")
+      .build();
+
+  private static Relationship FAILURE = new Relationship.Builder()
+      .name("failure")
+      .description("FlowFile content could not be processed")
+      .build();
+
+  @VisibleForTesting
+  static final PropertyDescriptor SCHEMA =
+      new PropertyDescriptor.Builder()
+          .name("Record schema")
+          .description(
+              "Outgoing Avro schema for each record created from a CSV row")
+          .addValidator(SCHEMA_VALIDATOR)
+          .required(true)
+          .build();
+
+  @VisibleForTesting
+  static final PropertyDescriptor CHARSET =
+      new PropertyDescriptor.Builder()
+          .name("CSV charset")
+          .description("Character set for CSV files")
+          .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
+          .defaultValue(DEFAULTS.charset)
+          .build();
+
+  @VisibleForTesting
+  static final PropertyDescriptor DELIMITER =
+      new PropertyDescriptor.Builder()
+          .name("CSV delimiter")
+          .description("Delimiter character for CSV records")
+          .addValidator(CHAR_VALIDATOR)
+          .defaultValue(DEFAULTS.delimiter)
+          .build();
+
+  @VisibleForTesting
+  static final PropertyDescriptor QUOTE =
+      new PropertyDescriptor.Builder()
+          .name("CSV quote character")
+          .description("Quote character for CSV values")
+          .addValidator(CHAR_VALIDATOR)
+          .defaultValue(DEFAULTS.quote)
+          .build();
+
+  @VisibleForTesting
+  static final PropertyDescriptor ESCAPE =
+      new PropertyDescriptor.Builder()
+          .name("CSV escape character")
+          .description("Escape character for CSV values")
+          .addValidator(CHAR_VALIDATOR)
+          .defaultValue(DEFAULTS.escape)
+          .build();
+
+  @VisibleForTesting
+  static final PropertyDescriptor HAS_HEADER =
+      new PropertyDescriptor.Builder()
+          .name("Use CSV header line")
+          .description("Whether to use the first line as a header")
+          .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+          .defaultValue(String.valueOf(DEFAULTS.useHeader))
+          .build();
+
+  @VisibleForTesting
+  static final PropertyDescriptor LINES_TO_SKIP =
+      new PropertyDescriptor.Builder()
+          .name("Lines to skip")
+          .description("Number of lines to skip before reading header or data")
+          .addValidator(createLongValidator(0L, Integer.MAX_VALUE, true))
+          .defaultValue(String.valueOf(DEFAULTS.linesToSkip))
+          .build();
+
+  private static final List<PropertyDescriptor> PROPERTIES =
+      ImmutableList.<PropertyDescriptor>builder()
+          .addAll(AbstractKiteProcessor.getProperties())
+          .add(SCHEMA)
+          .add(CHARSET)
+          .add(DELIMITER)
+          .add(QUOTE)
+          .add(ESCAPE)
+          .add(HAS_HEADER)
+          .add(LINES_TO_SKIP)
+          .build();
+
+  private static final Set<Relationship> RELATIONSHIPS =
+      ImmutableSet.<Relationship>builder()
+          .add(SUCCESS)
+          .add(FAILURE)
+          .build();
+
+  // Immutable configuration
+  @VisibleForTesting
+  volatile CSVProperties props;
+
+  @Override
+  protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+    return PROPERTIES;
+  }
+
+  @Override
+  public Set<Relationship> getRelationships() {
+    return RELATIONSHIPS;
+  }
+
+  @OnScheduled
+  public void createCSVProperties(ProcessContext context) throws IOException {
+    super.setDefaultConfiguration(context);
+
+    this.props = new CSVProperties.Builder()
+        .charset(context.getProperty(CHARSET).getValue())
+        .delimiter(context.getProperty(DELIMITER).getValue())
+        .quote(context.getProperty(QUOTE).getValue())
+        .escape(context.getProperty(ESCAPE).getValue())
+        .hasHeader(context.getProperty(HAS_HEADER).asBoolean())
+        .linesToSkip(context.getProperty(LINES_TO_SKIP).asInteger())
+        .build();
+  }
+
+  @Override
+  public void onTrigger(ProcessContext context, final ProcessSession session)
+      throws ProcessException {
+    FlowFile flowFile = session.get();
+    if (flowFile == null) {
+      return;
+    }
+
+    final Schema schema = getSchema(
+        context.getProperty(SCHEMA).getValue(),
+        DefaultConfiguration.get());
+
+    final DataFileWriter<Record> writer = new DataFileWriter<>(
+        AvroUtil.newDatumWriter(schema, Record.class));
+    writer.setCodec(CodecFactory.snappyCodec());
+
+    try {
+      flowFile = session.write(flowFile, new StreamCallback() {
+        @Override
+        public void process(InputStream in, OutputStream out) throws IOException {
+          long written = 0L;
+          long errors = 0L;
+          try (CSVFileReaderFixed<Record> reader = new CSVFileReaderFixed<>(
+              in, props, schema, Record.class)) {
+            reader.initialize();
+            try (DataFileWriter<Record> w = writer.create(schema, out)) {
+              while (reader.hasNext()) {
+                try {
+                  Record record = reader.next();
+                  w.append(record);
+                  written += 1;
+                } catch (DatasetRecordException e) {
+                  errors += 1;
+                }
+              }
+            }
+          }
+          session.adjustCounter("Converted records", written,
+              false /* update only if file transfer is successful */);
+          session.adjustCounter("Conversion errors", errors,
+              false /* update only if file transfer is successful */);
+        }
+      });
+
+      session.transfer(flowFile, SUCCESS);
+
+      //session.getProvenanceReporter().send(flowFile, target.getUri().toString());
+    } catch (ProcessException | DatasetIOException e) {
+      getLogger().error("Failed reading or writing", e);
+      session.transfer(flowFile, FAILURE);
+
+    } catch (DatasetException e) {
+      getLogger().error("Failed to read FlowFile", e);
+      session.transfer(flowFile, FAILURE);
+
+    } catch (Throwable t) {
+      getLogger().error("Unknown Throwable", t);
+      session.rollback(true); // penalize just in case
+      context.yield();
+    }
+  }
+}
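
The CSV parsing properties above map one-to-one onto Kite's CSVProperties;
a sketch of overriding them on a test runner (the values are examples only):

    TestRunner runner = TestRunners.newTestRunner(ConvertCSVToAvro.class);
    runner.setProperty(ConvertCSVToAvro.DELIMITER, "|");
    runner.setProperty(ConvertCSVToAvro.QUOTE, "'");
    runner.setProperty(ConvertCSVToAvro.ESCAPE, "\\");
    runner.setProperty(ConvertCSVToAvro.HAS_HEADER, "true");
    runner.setProperty(ConvertCSVToAvro.LINES_TO_SKIP, "1");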

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/82424173/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertJSONToAvro.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertJSONToAvro.java b/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertJSONToAvro.java
new file mode 100644
index 0000000..5200f19
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertJSONToAvro.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.nifi.processors.kite;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.List;
+import java.util.Set;
+import org.apache.avro.Schema;
+import org.apache.avro.file.CodecFactory;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericData.Record;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.FlowFileAccessException;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.StreamCallback;
+import org.kitesdk.data.DatasetException;
+import org.kitesdk.data.DatasetIOException;
+import org.kitesdk.data.DatasetRecordException;
+import org.kitesdk.data.spi.DefaultConfiguration;
+
+@Tags({"kite", "json", "avro"})
+@CapabilityDescription(
+    "Converts JSON files to Avro according to an Avro Schema")
+public class ConvertJSONToAvro extends AbstractKiteProcessor {
+
+  private static Relationship SUCCESS = new Relationship.Builder()
+      .name("success")
+      .description("FlowFile content has been successfully saved")
+      .build();
+
+  private static Relationship FAILURE = new Relationship.Builder()
+      .name("failure")
+      .description("FlowFile content could not be processed")
+      .build();
+
+  @VisibleForTesting
+  static final PropertyDescriptor SCHEMA =
+      new PropertyDescriptor.Builder()
+          .name("Record schema")
+          .description(
+              "Outgoing Avro schema for each record created from a JSON 
object")
+          .addValidator(SCHEMA_VALIDATOR)
+          .required(true)
+          .build();
+
+  private static final List<PropertyDescriptor> PROPERTIES =
+      ImmutableList.<PropertyDescriptor>builder()
+          .addAll(AbstractKiteProcessor.getProperties())
+          .add(SCHEMA)
+          .build();
+
+  private static final Set<Relationship> RELATIONSHIPS =
+      ImmutableSet.<Relationship>builder()
+          .add(SUCCESS)
+          .add(FAILURE)
+          .build();
+
+  public ConvertJSONToAvro() {
+  }
+
+  @Override
+  protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+    return PROPERTIES;
+  }
+
+  @Override
+  public Set<Relationship> getRelationships() {
+    return RELATIONSHIPS;
+  }
+
+  @Override
+  public void onTrigger(ProcessContext context, final ProcessSession session)
+      throws ProcessException {
+    FlowFile flowFile = session.get();
+    if (flowFile == null) {
+      return;
+    }
+
+    final Schema schema = getSchema(
+        context.getProperty(SCHEMA).getValue(),
+        DefaultConfiguration.get());
+
+    final DataFileWriter<Record> writer = new DataFileWriter<>(
+        AvroUtil.newDatumWriter(schema, Record.class));
+    writer.setCodec(CodecFactory.snappyCodec());
+
+    try {
+      flowFile = session.write(flowFile, new StreamCallback() {
+        @Override
+        public void process(InputStream in, OutputStream out) throws IOException {
+          long written = 0L;
+          long errors = 0L;
+          try (JSONFileReader<Record> reader = new JSONFileReader<>(
+              in, schema, Record.class)) {
+            reader.initialize();
+            try (DataFileWriter<Record> w = writer.create(schema, out)) {
+              while (reader.hasNext()) {
+                try {
+                  Record record = reader.next();
+                  w.append(record);
+                  written += 1;
+                } catch (DatasetRecordException e) {
+                  errors += 1;
+                }
+              }
+            }
+            session.adjustCounter("Converted records", written,
+                false /* update only if file transfer is successful */);
+            session.adjustCounter("Conversion errors", errors,
+                false /* update only if file transfer is successful */);
+          }
+        }
+      });
+
+      session.transfer(flowFile, SUCCESS);
+
+      //session.getProvenanceReporter().send(flowFile, target.getUri().toString());
+    } catch (ProcessException | DatasetIOException e) {
+      getLogger().error("Failed reading or writing", e);
+      session.transfer(flowFile, FAILURE);
+
+    } catch (DatasetException e) {
+      getLogger().error("Failed to read FlowFile", e);
+      session.transfer(flowFile, FAILURE);
+
+    }
+  }
+
+}
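
ConvertJSONToAvro expects one JSON object per record in the incoming
content. For a schema like the Test schema used in the unit tests
(required long id, required string color, optional double price),
input of this shape should convert cleanly, with missing optional
fields taking their schema defaults:

    {"id": 1, "color": "green"}
    {"id": 2, "color": "grey", "price": 12.95}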

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/82424173/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/JSONFileReader.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/JSONFileReader.java b/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/JSONFileReader.java
new file mode 100644
index 0000000..7bb7f3d
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/JSONFileReader.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.nifi.processors.kite;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterators;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Iterator;
+import javax.annotation.Nullable;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.kitesdk.data.DatasetIOException;
+import org.kitesdk.data.spi.AbstractDatasetReader;
+import org.kitesdk.data.spi.DataModelUtil;
+import org.kitesdk.data.spi.JsonUtil;
+import org.kitesdk.data.spi.ReaderWriterState;
+
+/**
+ * This is a temporary addition. The version in 0.18.0 throws an NPE when the
+ * InputStream constructor is used.
+ */
+class JSONFileReader<E> extends AbstractDatasetReader<E> {
+
+  private final GenericData model;
+  private final Schema schema;
+
+  private InputStream incoming = null;
+
+  // state
+  private ReaderWriterState state = ReaderWriterState.NEW;
+  private Iterator<E> iterator;
+
+  public JSONFileReader(InputStream incoming, Schema schema, Class<E> type) {
+    this.incoming = incoming;
+    this.schema = schema;
+    this.model = DataModelUtil.getDataModelForType(type);
+    this.state = ReaderWriterState.NEW;
+  }
+
+  @Override
+  public void initialize() {
+    Preconditions.checkState(state.equals(ReaderWriterState.NEW),
+        "A reader may not be opened more than once - current state:%s", state);
+    Preconditions.checkArgument(Schema.Type.RECORD.equals(schema.getType()),
+        "Schemas for JSON files should be record");
+
+    this.iterator = Iterators.transform(JsonUtil.parser(incoming),
+        new Function<JsonNode, E>() {
+          @Override
+          @SuppressWarnings("unchecked")
+          public E apply(@Nullable JsonNode node) {
+            return (E) JsonUtil.convertToAvro(model, node, schema);
+          }
+        });
+
+    this.state = ReaderWriterState.OPEN;
+  }
+
+  @Override
+  public boolean hasNext() {
+    Preconditions.checkState(state.equals(ReaderWriterState.OPEN),
+        "Attempt to read from a file in state:%s", state);
+    return iterator.hasNext();
+  }
+
+  @Override
+  public E next() {
+    Preconditions.checkState(state.equals(ReaderWriterState.OPEN),
+        "Attempt to read from a file in state:%s", state);
+    return iterator.next();
+  }
+
+  @Override
+  public void close() {
+    if (!state.equals(ReaderWriterState.OPEN)) {
+      return;
+    }
+
+    iterator = null;
+    try {
+      incoming.close();
+    } catch (IOException e) {
+      throw new DatasetIOException("Unable to close reader path", e);
+    }
+
+    state = ReaderWriterState.CLOSED;
+  }
+
+  @Override
+  public boolean isOpen() {
+    return (this.state == ReaderWriterState.OPEN);
+  }
+
+}
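
Like other Kite readers, JSONFileReader moves through the
NEW -> OPEN -> CLOSED lifecycle. A standalone usage sketch (the input
stream and schema here are placeholders):

    try (JSONFileReader<Record> reader =
             new JSONFileReader<>(in, schema, Record.class)) {
      reader.initialize(); // NEW -> OPEN; required before hasNext()/next()
      while (reader.hasNext()) {
        Record record = reader.next();
        // process the record
      }
    } // close() transitions OPEN -> CLOSED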

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/82424173/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/StoreInKiteDataset.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/StoreInKiteDataset.java b/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/StoreInKiteDataset.java
new file mode 100644
index 0000000..223cacc
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/StoreInKiteDataset.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.nifi.processors.kite;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileStream;
+import org.apache.avro.generic.GenericData.Record;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.InputStreamCallback;
+import org.apache.nifi.util.StopWatch;
+import org.kitesdk.data.DatasetIOException;
+import org.kitesdk.data.DatasetWriter;
+import org.kitesdk.data.Datasets;
+import org.kitesdk.data.IncompatibleSchemaException;
+import org.kitesdk.data.ValidationException;
+import org.kitesdk.data.View;
+import org.kitesdk.data.spi.SchemaValidationUtil;
+
+@Tags({"kite", "avro", "parquet", "hive", "hdfs", "hbase"})
+@CapabilityDescription("Stores Avro records in a Kite dataset")
+public class StoreInKiteDataset extends AbstractKiteProcessor {
+  private static Relationship SUCCESS = new Relationship.Builder()
+      .name("success")
+      .description("FlowFile content has been successfully saved")
+      .build();
+
+  private static Relationship INCOMPATIBLE = new Relationship.Builder()
+      .name("incompatible")
+      .description("FlowFile content is not compatible with the target 
dataset")
+      .build();
+
+  private static Relationship FAILURE = new Relationship.Builder()
+      .name("failure")
+      .description("FlowFile content could not be processed")
+      .build();
+
+  public static final PropertyDescriptor KITE_DATASET_URI =
+      new PropertyDescriptor.Builder()
+          .name("Target dataset URI")
+          .description(
+              "URI that identifies a Kite dataset where data will be stored")
+          .addValidator(RECOGNIZED_URI)
+          .expressionLanguageSupported(true)
+          .required(true)
+          .build();
+
+  private static final List<PropertyDescriptor> PROPERTIES =
+      ImmutableList.<PropertyDescriptor>builder()
+          .addAll(AbstractKiteProcessor.getProperties())
+          .add(KITE_DATASET_URI)
+          .build();
+
+  private static final Set<Relationship> RELATIONSHIPS =
+      ImmutableSet.<Relationship>builder()
+          .add(SUCCESS)
+          .add(INCOMPATIBLE)
+          .add(FAILURE)
+          .build();
+
+  @Override
+  protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+    return PROPERTIES;
+  }
+
+  @Override
+  public Set<Relationship> getRelationships() {
+    return RELATIONSHIPS;
+  }
+
+  @Override
+  public void onTrigger(ProcessContext context, final ProcessSession session)
+      throws ProcessException {
+    FlowFile flowFile = session.get();
+    if (flowFile == null) {
+      return;
+    }
+
+    final View<Record> target = load(context, flowFile);
+    final Schema schema = target.getDataset().getDescriptor().getSchema();
+
+    try {
+      StopWatch timer = new StopWatch(true);
+      session.read(flowFile, new InputStreamCallback() {
+        @Override
+        public void process(InputStream in) throws IOException {
+          try (DataFileStream<Record> stream = new DataFileStream<>(
+              in, AvroUtil.newDatumReader(schema, Record.class))) {
+            IncompatibleSchemaException.check(
+                SchemaValidationUtil.canRead(stream.getSchema(), schema),
+                "Incompatible file schema %s, expected %s",
+                stream.getSchema(), schema);
+
+            long written = 0L;
+            try (DatasetWriter<Record> writer = target.newWriter()) {
+              for (Record record : stream) {
+                writer.write(record);
+                written += 1;
+              }
+            } finally {
+              session.adjustCounter("Stored records", written,
+                  true /* cannot roll back the write */);
+            }
+          }
+        }
+      });
+      timer.stop();
+
+      session.getProvenanceReporter().send(flowFile,
+          target.getUri().toString(),
+          timer.getDuration(TimeUnit.MILLISECONDS),
+          true /* cannot roll back the write */ );
+
+      session.transfer(flowFile, SUCCESS);
+
+    } catch (ProcessException | DatasetIOException e) {
+      getLogger().error("Failed to read FlowFile", e);
+      session.transfer(flowFile, FAILURE);
+
+    } catch (ValidationException e) {
+      getLogger().error(e.getMessage());
+      getLogger().debug("Incompatible schema error", e);
+      session.transfer(flowFile, INCOMPATIBLE);
+
+    } catch (Throwable t) {
+      getLogger().error("Unknown Throwable", t);
+      session.rollback(true); // penalize just in case
+      context.yield();
+    }
+  }
+
+  private View<Record> load(ProcessContext context, FlowFile file) {
+    String uri = context.getProperty(KITE_DATASET_URI)
+        .evaluateAttributeExpressions(file)
+        .getValue();
+    return Datasets.load(uri, Record.class);
+  }
+}
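
The target dataset URI supports the NiFi expression language, so the
destination can vary per FlowFile. Example values (hypothetical):

    dataset:hive:default/events
    dataset:hdfs://namenode:8020/datasets/default/events
    dataset:hive:default/${kite.dataset.name}

The last form resolves the kite.dataset.name FlowFile attribute at
trigger time, via evaluateAttributeExpressions(file) in load() above.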

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/82424173/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/kitesdk/data/spi/filesystem/CSVFileReaderFixed.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/kitesdk/data/spi/filesystem/CSVFileReaderFixed.java b/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/kitesdk/data/spi/filesystem/CSVFileReaderFixed.java
new file mode 100644
index 0000000..d6c2006
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/kitesdk/data/spi/filesystem/CSVFileReaderFixed.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.kitesdk.data.spi.filesystem;
+
+import au.com.bytecode.opencsv.CSVReader;
+import java.io.InputStream;
+import com.google.common.collect.Lists;
+import java.util.List;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.kitesdk.data.DatasetDescriptor;
+import org.kitesdk.data.DatasetIOException;
+import org.kitesdk.data.spi.AbstractDatasetReader;
+import org.kitesdk.data.spi.DescriptorUtil;
+import org.kitesdk.data.spi.EntityAccessor;
+import org.kitesdk.data.spi.ReaderWriterState;
+import com.google.common.base.Preconditions;
+import org.apache.avro.Schema;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.kitesdk.data.spi.filesystem.CSVProperties;
+import org.kitesdk.data.spi.filesystem.CSVUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.NoSuchElementException;
+
+import static org.kitesdk.data.spi.filesystem.FileSystemProperties.REUSE_RECORDS;
+
+/**
+ * This is a temporary addition. The version in 0.18.0 throws an NPE when the
+ * InputStream constructor is used.
+ */
+public class CSVFileReaderFixed<E> extends AbstractDatasetReader<E> {
+
+  private final CSVProperties props;
+  private final Schema schema;
+  private final boolean reuseRecords;
+
+  private final Class<E> recordClass;
+
+  private CSVReader reader = null;
+  private CSVRecordBuilder<E> builder;
+
+  private InputStream incoming = null;
+
+  // state
+  private ReaderWriterState state = ReaderWriterState.NEW;
+  private boolean hasNext = false;
+  private String[] next = null;
+  private E record = null;
+
+  public CSVFileReaderFixed(InputStream incoming, CSVProperties props,
+                       Schema schema, Class<E> type) {
+    this.incoming = incoming;
+    this.schema = schema;
+    this.recordClass = type;
+    this.state = ReaderWriterState.NEW;
+    this.props = props;
+    this.reuseRecords = false;
+
+    Preconditions.checkArgument(Schema.Type.RECORD.equals(schema.getType()),
+        "Schemas for CSV files must be records of primitive types");
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public void initialize() {
+    Preconditions.checkState(state.equals(ReaderWriterState.NEW),
+        "A reader may not be opened more than once - current state:%s", state);
+
+    this.reader = CSVUtil.newReader(incoming, props);
+
+    List<String> header = null;
+    if (props.useHeader) {
+      this.hasNext = advance();
+      header = Lists.newArrayList(next);
+    } else if (props.header != null) {
+      try {
+        header = Lists.newArrayList(
+            CSVUtil.newParser(props).parseLine(props.header));
+      } catch (IOException e) {
+        throw new DatasetIOException(
+            "Failed to parse header from properties: " + props.header, e);
+      }
+    }
+
+    this.builder = new CSVRecordBuilder<E>(schema, recordClass, header);
+
+    // initialize by reading the first record
+    this.hasNext = advance();
+
+    this.state = ReaderWriterState.OPEN;
+  }
+
+  @Override
+  public boolean hasNext() {
+    Preconditions.checkState(state.equals(ReaderWriterState.OPEN),
+        "Attempt to read from a file in state:%s", state);
+    return hasNext;
+  }
+
+  @Override
+  public E next() {
+    Preconditions.checkState(state.equals(ReaderWriterState.OPEN),
+        "Attempt to read from a file in state:%s", state);
+
+    if (!hasNext) {
+      throw new NoSuchElementException();
+    }
+
+    try {
+      if (reuseRecords) {
+        this.record = builder.makeRecord(next, record);
+        return record;
+      } else {
+        return builder.makeRecord(next, null);
+      }
+    } finally {
+      this.hasNext = advance();
+    }
+  }
+
+  private boolean advance() {
+    try {
+      next = reader.readNext();
+    } catch (IOException e) {
+      throw new DatasetIOException("Could not read record", e);
+    }
+    return (next != null);
+  }
+
+  @Override
+  public void close() {
+    if (!state.equals(ReaderWriterState.OPEN)) {
+      return;
+    }
+
+    try {
+      reader.close();
+    } catch (IOException e) {
+      throw new DatasetIOException("Unable to close reader", e);
+    }
+
+    state = ReaderWriterState.CLOSED;
+  }
+
+  @Override
+  public boolean isOpen() {
+    return (this.state == ReaderWriterState.OPEN);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/82424173/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
new file mode 100644
index 0000000..6de5612
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -0,0 +1,17 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+org.apache.nifi.processors.kite.StoreInKiteDataset
+org.apache.nifi.processors.kite.ConvertCSVToAvro
+org.apache.nifi.processors.kite.ConvertJSONToAvro
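
NiFi discovers the processors listed here through java.util.ServiceLoader,
so registering a new processor only requires appending its class name to
this file. A simplified sketch of the framework-side lookup:

    ServiceLoader<Processor> loader = ServiceLoader.load(Processor.class);
    for (Processor processor : loader) {
      // each discovered processor becomes available on the canvas
    }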

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/82424173/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestCSVToAvroProcessor.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestCSVToAvroProcessor.java b/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestCSVToAvroProcessor.java
new file mode 100644
index 0000000..dbe3b81
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestCSVToAvroProcessor.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.nifi.processors.kite;
+
+import java.io.IOException;
+import java.nio.charset.Charset;
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaBuilder;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Assert;
+import org.junit.Test;
+
+import static org.apache.nifi.processors.kite.TestUtil.streamFor;
+
+public class TestCSVToAvroProcessor {
+
+  public static final Schema SCHEMA = SchemaBuilder.record("Test").fields()
+      .requiredLong("id")
+      .requiredString("color")
+      .optionalDouble("price")
+      .endRecord();
+
+  public static final String CSV_CONTENT = "" +
+      "1,green\n" +
+      ",blue,\n" + // invalid, ID is missing
+      "2,grey,12.95";
+
+  @Test
+  public void testBasicConversion() throws IOException {
+    TestRunner runner = TestRunners.newTestRunner(ConvertCSVToAvro.class);
+    runner.assertNotValid();
+    runner.setProperty(ConvertCSVToAvro.SCHEMA, SCHEMA.toString());
+    runner.assertValid();
+
+    runner.enqueue(streamFor(CSV_CONTENT));
+    runner.run();
+
+    long converted = runner.getCounterValue("Converted records");
+    long errors = runner.getCounterValue("Conversion errors");
+    Assert.assertEquals("Should convert 2 rows", 2, converted);
+    Assert.assertEquals("Should reject 1 row", 1, errors);
+
+    runner.assertAllFlowFilesTransferred("success", 1);
+  }
+
+  @Test
+  public void testAlternateCharset() throws IOException {
+    TestRunner runner = TestRunners.newTestRunner(ConvertCSVToAvro.class);
+    runner.setProperty(ConvertCSVToAvro.SCHEMA, SCHEMA.toString());
+    runner.setProperty(ConvertCSVToAvro.CHARSET, "utf16");
+    runner.assertValid();
+
+    runner.enqueue(streamFor(CSV_CONTENT, Charset.forName("UTF-16")));
+    runner.run();
+
+    long converted = runner.getCounterValue("Converted records");
+    long errors = runner.getCounterValue("Conversion errors");
+    Assert.assertEquals("Should convert 2 rows", 2, converted);
+    Assert.assertEquals("Should reject 1 row", 1, errors);
+
+    runner.assertAllFlowFilesTransferred("success", 1);
+  }
+
+  @Test
+  public void testCSVProperties() throws IOException {
+    TestRunner runner = TestRunners.newTestRunner(ConvertCSVToAvro.class);
+    ConvertCSVToAvro processor = new ConvertCSVToAvro();
+    ProcessContext context = runner.getProcessContext();
+
+    // check defaults
+    processor.createCSVProperties(context);
+    Assert.assertEquals("Charset should match",
+        "utf8", processor.props.charset);
+    Assert.assertEquals("Delimiter should match",
+        ",", processor.props.delimiter);
+    Assert.assertEquals("Quote should match",
+        "\"", processor.props.quote);
+    Assert.assertEquals("Escape should match",
+        "\\", processor.props.escape);
+    Assert.assertEquals("Header flag should match",
+        false, processor.props.useHeader);
+    Assert.assertEquals("Lines to skip should match",
+        0, processor.props.linesToSkip);
+
+    runner.setProperty(ConvertCSVToAvro.CHARSET, "utf16");
+    runner.setProperty(ConvertCSVToAvro.DELIMITER, "|");
+    runner.setProperty(ConvertCSVToAvro.QUOTE, "'");
+    runner.setProperty(ConvertCSVToAvro.ESCAPE, "\u2603");
+    runner.setProperty(ConvertCSVToAvro.HAS_HEADER, "true");
+    runner.setProperty(ConvertCSVToAvro.LINES_TO_SKIP, "2");
+
+    // check updates
+    processor.createCSVProperties(context);
+    Assert.assertEquals("Charset should match",
+        "utf16", processor.props.charset);
+    Assert.assertEquals("Delimiter should match",
+        "|", processor.props.delimiter);
+    Assert.assertEquals("Quote should match",
+        "'", processor.props.quote);
+    Assert.assertEquals("Escape should match",
+        "\u2603", processor.props.escape);
+    Assert.assertEquals("Header flag should match",
+        true, processor.props.useHeader);
+    Assert.assertEquals("Lines to skip should match",
+        2, processor.props.linesToSkip);
+  }
+}
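
The counters asserted above ("Converted records", "Conversion errors") are populated on the processor side through ProcessSession.adjustCounter. A hypothetical minimal processor showing that pattern follows; it sketches the mechanism the assertions exercise, not the actual ConvertCSVToAvro implementation, and its names and values are placeholders.

    import java.util.Collections;
    import java.util.Set;
    import org.apache.nifi.flowfile.FlowFile;
    import org.apache.nifi.processor.AbstractProcessor;
    import org.apache.nifi.processor.ProcessContext;
    import org.apache.nifi.processor.ProcessSession;
    import org.apache.nifi.processor.Relationship;
    import org.apache.nifi.processor.exception.ProcessException;

    // Hypothetical sketch: where test counters come from. A processor
    // increments named counters via ProcessSession.adjustCounter, and the
    // TestRunner exposes them through getCounterValue.
    public class CountingSketchProcessor extends AbstractProcessor {

      static final Relationship SUCCESS = new Relationship.Builder()
          .name("success")
          .description("Successfully converted content")
          .build();

      @Override
      public Set<Relationship> getRelationships() {
        return Collections.singleton(SUCCESS);
      }

      @Override
      public void onTrigger(ProcessContext context, ProcessSession session)
          throws ProcessException {
        FlowFile flowFile = session.get();
        if (flowFile == null) {
          return;
        }
        long converted = 2; // placeholder totals, not real conversion logic
        long errors = 1;
        session.adjustCounter("Converted records", converted, false);
        session.adjustCounter("Conversion errors", errors, false);
        session.transfer(flowFile, SUCCESS);
      }
    }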

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/82424173/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestConfigurationProperty.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestConfigurationProperty.java b/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestConfigurationProperty.java
new file mode 100644
index 0000000..7b1019d
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestConfigurationProperty.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.nifi.processors.kite;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.kitesdk.data.spi.DefaultConfiguration;
+
+public class TestConfigurationProperty {
+
+  @Rule
+  public final TemporaryFolder temp = new TemporaryFolder();
+  public File confLocation;
+
+  @Before
+  public void saveConfiguration() throws IOException {
+    Configuration conf = new Configuration(false);
+    conf.setBoolean("nifi.config.canary", true);
+
+    confLocation = temp.newFile("nifi-conf.xml");
+    FileOutputStream out = new FileOutputStream(confLocation);
+    conf.writeXml(out);
+    out.close();
+  }
+
+  @Test
+  public void testConfigurationCanary() throws IOException {
+    TestRunner runner = TestRunners.newTestRunner(StoreInKiteDataset.class);
+    runner.setProperty(
+        AbstractKiteProcessor.CONF_XML_FILES, confLocation.toString());
+
+    Assert.assertFalse("Should not contain canary value",
+        DefaultConfiguration.get().getBoolean("nifi.config.canary", false));
+
+    AbstractKiteProcessor processor = new StoreInKiteDataset();
+    ProcessContext context = runner.getProcessContext();
+    processor.setDefaultConfiguration(context);
+
+    Assert.assertTrue("Should contain canary value",
+        DefaultConfiguration.get().getBoolean("nifi.config.canary", false));
+  }
+
+  @Test
+  public void testFilesMustExist() throws IOException {
+    TestRunner runner = TestRunners.newTestRunner(StoreInKiteDataset.class);
+    runner.setProperty(
+        AbstractKiteProcessor.CONF_XML_FILES, temp.newFile().toString());
+    runner.assertNotValid();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/82424173/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestGetSchema.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestGetSchema.java b/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestGetSchema.java
new file mode 100644
index 0000000..18b356d
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestGetSchema.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.nifi.processors.kite;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaBuilder;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.kitesdk.data.DatasetDescriptor;
+import org.kitesdk.data.Datasets;
+import org.kitesdk.data.spi.DefaultConfiguration;
+
+import static org.apache.nifi.processors.kite.TestUtil.bytesFor;
+
+public class TestGetSchema {
+
+  public static final Schema SCHEMA = SchemaBuilder.record("Test").fields()
+      .requiredLong("id")
+      .requiredString("color")
+      .optionalDouble("price")
+      .endRecord();
+
+  @Rule
+  public TemporaryFolder temp = new TemporaryFolder();
+
+  @Test
+  public void testSchemaFromFileSystem() throws IOException {
+    File schemaFile = temp.newFile("schema.avsc");
+    FileOutputStream out = new FileOutputStream(schemaFile);
+    out.write(bytesFor(SCHEMA.toString(), Charset.forName("utf8")));
+    out.close();
+
+    Schema schema = AbstractKiteProcessor.getSchema(
+        schemaFile.toString(), DefaultConfiguration.get());
+
+    Assert.assertEquals("Schema from file should match", SCHEMA, schema);
+  }
+
+  @Test
+  public void testSchemaFromKiteURIs() throws IOException {
+    String location = temp.newFolder("ns", "temp").toString();
+    String datasetUri = "dataset:file:" + location;
+    DatasetDescriptor descriptor = new DatasetDescriptor.Builder()
+        .schema(SCHEMA)
+        .build();
+
+    Datasets.create(datasetUri, descriptor);
+
+    Schema schema = AbstractKiteProcessor.getSchema(
+        datasetUri, DefaultConfiguration.get());
+    Assert.assertEquals("Schema from dataset URI should match", SCHEMA, 
schema);
+
+    schema = AbstractKiteProcessor.getSchema(
+        "view:file:" + location + "?color=orange", DefaultConfiguration.get());
+    Assert.assertEquals("Schema from view URI should match", SCHEMA, schema);
+  }
+
+  @Test
+  public void testSchemaFromResourceURI() throws IOException {
+    DatasetDescriptor descriptor = new DatasetDescriptor.Builder()
+        .schemaUri("resource:schema/user.avsc") // in kite-data-core test-jar
+        .build();
+    Schema expected = descriptor.getSchema();
+
+    Schema schema = AbstractKiteProcessor.getSchema(
+        "resource:schema/user.avsc", DefaultConfiguration.get());
+
+    Assert.assertEquals("Schema from resource URI should match",
+        expected, schema);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/82424173/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestJSONToAvroProcessor.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestJSONToAvroProcessor.java b/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestJSONToAvroProcessor.java
new file mode 100644
index 0000000..434b969
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestJSONToAvroProcessor.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.nifi.processors.kite;
+
+import java.io.IOException;
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaBuilder;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Assert;
+import org.junit.Test;
+
+import static org.apache.nifi.processors.kite.TestUtil.streamFor;
+
+public class TestJSONToAvroProcessor {
+  public static final Schema SCHEMA = SchemaBuilder.record("Test").fields()
+      .requiredLong("id")
+      .requiredString("color")
+      .optionalDouble("price")
+      .endRecord();
+
+  public static final String JSON_CONTENT = "" +
+      "{\"id\": 1,\"color\": \"green\"}" +
+      "{\"id\": \"120V\", \"color\": \"blue\"}\n" + // invalid, ID is a string
+      "{\"id\": 2, \"color\": \"grey\", \"price\": 12.95 }";
+
+  @Test
+  public void testBasicConversion() throws IOException {
+    TestRunner runner = TestRunners.newTestRunner(ConvertJSONToAvro.class);
+    runner.assertNotValid();
+    runner.setProperty(ConvertJSONToAvro.SCHEMA, SCHEMA.toString());
+    runner.assertValid();
+
+    runner.enqueue(streamFor(JSON_CONTENT));
+    runner.run();
+
+    long converted = runner.getCounterValue("Converted records");
+    long errors = runner.getCounterValue("Conversion errors");
+    Assert.assertEquals("Should convert 2 rows", 2, converted);
+    Assert.assertEquals("Should reject 1 row", 1, errors);
+
+    runner.assertAllFlowFilesTransferred("success", 1);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/82424173/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestKiteProcessorsCluster.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestKiteProcessorsCluster.java b/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestKiteProcessorsCluster.java
new file mode 100644
index 0000000..a191e5d
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestKiteProcessorsCluster.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.nifi.processors.kite;
+
+import com.google.common.collect.Lists;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.charset.Charset;
+import java.util.List;
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaBuilder;
+import org.apache.avro.generic.GenericData.Record;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.kitesdk.data.Dataset;
+import org.kitesdk.data.DatasetDescriptor;
+import org.kitesdk.data.Datasets;
+import org.kitesdk.data.spi.DefaultConfiguration;
+import org.kitesdk.minicluster.HdfsService;
+import org.kitesdk.minicluster.HiveService;
+import org.kitesdk.minicluster.MiniCluster;
+
+import static org.apache.nifi.processors.kite.TestUtil.USER_SCHEMA;
+import static org.apache.nifi.processors.kite.TestUtil.bytesFor;
+import static org.apache.nifi.processors.kite.TestUtil.streamFor;
+import static org.apache.nifi.processors.kite.TestUtil.user;
+
+public class TestKiteProcessorsCluster {
+
+  public static MiniCluster cluster = null;
+  public static DatasetDescriptor descriptor = new DatasetDescriptor.Builder()
+      .schema(USER_SCHEMA)
+      .build();
+
+  @BeforeClass
+  public static void startCluster() throws IOException, InterruptedException {
+    long rand = Math.abs((long) (Math.random() * 1000000));
+    cluster = new MiniCluster.Builder()
+        .workDir("/tmp/minicluster-" + rand)
+        .clean(true)
+        .addService(HdfsService.class)
+        .addService(HiveService.class)
+        .bindIP("127.0.0.1")
+        .hiveMetastorePort(9083)
+        .build();
+    cluster.start();
+  }
+
+  @AfterClass
+  public static void stopCluster() throws IOException, InterruptedException {
+    if (cluster != null) {
+      cluster.stop();
+      cluster = null;
+    }
+  }
+
+  @Test
+  public void testBasicStoreToHive() throws IOException {
+    String datasetUri = "dataset:hive:ns/test";
+
+    Dataset<Record> dataset = Datasets.create(datasetUri, descriptor, 
Record.class);
+
+    TestRunner runner = TestRunners.newTestRunner(StoreInKiteDataset.class);
+    runner.assertNotValid();
+
+    runner.setProperty(StoreInKiteDataset.KITE_DATASET_URI, datasetUri);
+    runner.assertValid();
+
+    List<Record> users = Lists.newArrayList(
+        user("a", "a...@example.com"),
+        user("b", "b...@example.com"),
+        user("c", "c...@example.com")
+    );
+
+    runner.enqueue(streamFor(users));
+    runner.run();
+
+    runner.assertAllFlowFilesTransferred("success", 1);
+    List<Record> stored = Lists.newArrayList(
+        (Iterable<Record>) dataset.newReader());
+    Assert.assertEquals("Records should match", users, stored);
+
+    Datasets.delete(datasetUri);
+  }
+
+  @Test
+  public void testSchemaFromDistributedFileSystem() throws IOException {
+    Schema expected = SchemaBuilder.record("Test").fields()
+        .requiredLong("id")
+        .requiredString("color")
+        .optionalDouble("price")
+        .endRecord();
+
+    Path schemaPath = new Path("hdfs:/tmp/schema.avsc");
+    FileSystem fs = schemaPath.getFileSystem(DefaultConfiguration.get());
+    OutputStream out = fs.create(schemaPath);
+    out.write(bytesFor(expected.toString(), Charset.forName("utf8")));
+    out.close();
+
+    Schema schema = AbstractKiteProcessor.getSchema(
+        schemaPath.toString(), DefaultConfiguration.get());
+
+    Assert.assertEquals("Schema from file should match", expected, schema);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/82424173/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestKiteStorageProcessor.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestKiteStorageProcessor.java b/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestKiteStorageProcessor.java
new file mode 100644
index 0000000..714c3d2
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestKiteStorageProcessor.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.nifi.processors.kite;
+
+import com.google.common.collect.Lists;
+import java.io.IOException;
+import java.util.List;
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaBuilder;
+import org.apache.avro.generic.GenericData.Record;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.kitesdk.data.Dataset;
+import org.kitesdk.data.DatasetDescriptor;
+import org.kitesdk.data.Datasets;
+
+import static org.apache.nifi.processors.kite.TestUtil.invalidStreamFor;
+import static org.apache.nifi.processors.kite.TestUtil.streamFor;
+import static org.apache.nifi.processors.kite.TestUtil.user;
+
+public class TestKiteStorageProcessor {
+
+  @Rule
+  public TemporaryFolder temp = new TemporaryFolder();
+
+  private String datasetUri = null;
+  private Dataset<Record> dataset = null;
+
+  @Before
+  public void createDataset() throws Exception {
+    DatasetDescriptor descriptor = new DatasetDescriptor.Builder()
+        .schema(TestUtil.USER_SCHEMA)
+        .build();
+    this.datasetUri = "dataset:file:" + temp.newFolder("ns", "temp").toString();
+    this.dataset = Datasets.create(datasetUri, descriptor, Record.class);
+  }
+
+  @After
+  public void deleteDataset() throws Exception {
+    Datasets.delete(datasetUri);
+  }
+
+  @Test
+  public void testBasicStore() throws IOException {
+    TestRunner runner = TestRunners.newTestRunner(StoreInKiteDataset.class);
+    runner.assertNotValid();
+
+    runner.setProperty(StoreInKiteDataset.KITE_DATASET_URI, datasetUri);
+    runner.assertValid();
+
+    List<Record> users = Lists.newArrayList(
+        user("a", "a...@example.com"),
+        user("b", "b...@example.com"),
+        user("c", "c...@example.com")
+    );
+
+    runner.enqueue(streamFor(users));
+    runner.run();
+
+    runner.assertAllFlowFilesTransferred("success", 1);
+    runner.assertQueueEmpty();
+    Assert.assertEquals("Should store 3 values",
+        3, (long) runner.getCounterValue("Stored records"));
+
+    List<Record> stored = Lists.newArrayList(
+        (Iterable<Record>) dataset.newReader());
+    Assert.assertEquals("Records should match", users, stored);
+  }
+
+  @Test
+  public void testViewURI() {
+    TestRunner runner = TestRunners.newTestRunner(StoreInKiteDataset.class);
+    runner.setProperty(
+        StoreInKiteDataset.KITE_DATASET_URI, "view:hive:ns/table?year=2015");
+    runner.assertValid();
+  }
+
+  @Test
+  public void testInvalidURI() {
+    TestRunner runner = TestRunners.newTestRunner(StoreInKiteDataset.class);
+    runner.setProperty(
+        StoreInKiteDataset.KITE_DATASET_URI, "dataset:unknown");
+    runner.assertNotValid();
+  }
+
+  @Test
+  public void testUnreadableContent() throws IOException {
+    TestRunner runner = TestRunners.newTestRunner(StoreInKiteDataset.class);
+    runner.setProperty(StoreInKiteDataset.KITE_DATASET_URI, datasetUri);
+    runner.assertValid();
+
+    runner.enqueue(invalidStreamFor(user("a", "a...@example.com")));
+    runner.run();
+
+    runner.assertAllFlowFilesTransferred("failure", 1);
+  }
+
+  @Test
+  public void testCorruptedBlocks() throws IOException {
+    TestRunner runner = TestRunners.newTestRunner(StoreInKiteDataset.class);
+    runner.setProperty(StoreInKiteDataset.KITE_DATASET_URI, datasetUri);
+    runner.assertValid();
+
+    List<Record> records = Lists.newArrayList();
+    for (int i = 0; i < 10000; i += 1) {
+      String num = String.valueOf(i);
+      records.add(user(num, num + "@example.com"));
+    }
+
+    runner.enqueue(invalidStreamFor(records));
+    runner.run();
+
+    long stored = runner.getCounterValue("Stored records");
+    Assert.assertTrue("Should store some readable values",
+        0 < stored && stored < 10000);
+
+    runner.assertAllFlowFilesTransferred("success", 1);
+  }
+
+  @Test
+  public void testIncompatibleSchema() throws IOException {
+    Schema incompatible = SchemaBuilder.record("User").fields()
+        .requiredLong("id")
+        .requiredString("username")
+        .optionalString("email") // the dataset requires this field
+        .endRecord();
+
+    // this user has the email field and could be stored, but the schema is
+    // still incompatible so the entire stream is rejected
+    Record incompatibleUser = new Record(incompatible);
+    incompatibleUser.put("id", 1L);
+    incompatibleUser.put("username", "a");
+    incompatibleUser.put("email", "a...@example.com");
+
+    TestRunner runner = TestRunners.newTestRunner(StoreInKiteDataset.class);
+    runner.setProperty(StoreInKiteDataset.KITE_DATASET_URI, datasetUri);
+    runner.assertValid();
+
+    runner.enqueue(streamFor(incompatibleUser));
+    runner.run();
+
+    runner.assertAllFlowFilesTransferred("incompatible", 1);
+  }
+}
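
The incompatible-schema case above follows from Avro schema resolution: a dataset schema that requires "email" cannot read records written with "email" optional, so the entire file is routed to "incompatible". Below is a hedged sketch of that check, assuming Avro's SchemaValidatorBuilder (shipped with the Avro artifact, to the best of my knowledge) is available; the class name is illustrative.

    import java.util.Collections;
    import org.apache.avro.Schema;
    import org.apache.avro.SchemaBuilder;
    import org.apache.avro.SchemaValidationException;
    import org.apache.avro.SchemaValidatorBuilder;

    // Illustrative sketch: why the flow file lands on "incompatible". The
    // dataset's schema (mirroring TestUtil.USER_SCHEMA) requires "email",
    // so it cannot read data written with "email" optional.
    public class CheckCompatibility {
      public static void main(String[] args) {
        Schema datasetSchema = SchemaBuilder.record("User").fields()
            .requiredString("username")
            .requiredString("email")
            .endRecord();
        Schema written = SchemaBuilder.record("User").fields()
            .requiredLong("id")
            .requiredString("username")
            .optionalString("email")
            .endRecord();

        try {
          new SchemaValidatorBuilder().canReadStrategy().validateAll()
              .validate(datasetSchema, Collections.singletonList(written));
          System.out.println("compatible");
        } catch (SchemaValidationException e) {
          System.out.println("incompatible: " + e.getMessage());
        }
      }
    }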

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/82424173/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestUtil.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestUtil.java b/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestUtil.java
new file mode 100644
index 0000000..2eb30af
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestUtil.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.nifi.processors.kite;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.nio.CharBuffer;
+import java.nio.charset.CharacterCodingException;
+import java.nio.charset.Charset;
+import java.nio.charset.CharsetEncoder;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaBuilder;
+import org.apache.avro.file.CodecFactory;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericData.Record;
+
+public class TestUtil {
+  public static final Schema USER_SCHEMA = SchemaBuilder.record("User").fields()
+      .requiredString("username")
+      .requiredString("email")
+      .endRecord();
+
+  public static Record user(String username, String email) {
+    Record user = new Record(USER_SCHEMA);
+    user.put("username", username);
+    user.put("email", email);
+    return user;
+  }
+
+  public static InputStream streamFor(Record... records) throws IOException {
+    return streamFor(Arrays.asList(records));
+  }
+
+  public static InputStream streamFor(List<Record> records) throws IOException {
+    return new ByteArrayInputStream(bytesFor(records));
+  }
+
+  public static InputStream invalidStreamFor(Record... records) throws IOException {
+    return invalidStreamFor(Arrays.asList(records));
+  }
+
+  public static InputStream invalidStreamFor(List<Record> records) throws IOException {
+    // purposely truncate the content
+    byte[] bytes = bytesFor(records);
+    return new ByteArrayInputStream(bytes, 0, bytes.length / 2);
+  }
+
+  private static byte[] bytesFor(List<Record> records) throws IOException {
+    ByteArrayOutputStream out = new ByteArrayOutputStream();
+    DataFileWriter<Record> writer = new DataFileWriter<>(
+        AvroUtil.newDatumWriter(records.get(0).getSchema(), Record.class));
+    writer.setCodec(CodecFactory.snappyCodec());
+    writer = writer.create(records.get(0).getSchema(), out);
+
+    for (Record record : records) {
+      writer.append(record);
+    }
+
+    writer.flush();
+
+    return out.toByteArray();
+  }
+
+  public static InputStream streamFor(String content) throws CharacterCodingException {
+    return streamFor(content, Charset.forName("utf8"));
+  }
+
+  public static InputStream streamFor(String content, Charset charset) throws CharacterCodingException {
+    return new ByteArrayInputStream(bytesFor(content, charset));
+  }
+
+  public static byte[] bytesFor(String content, Charset charset) throws CharacterCodingException {
+    CharBuffer chars = CharBuffer.wrap(content);
+    CharsetEncoder encoder = charset.newEncoder();
+    ByteBuffer buffer = encoder.encode(chars);
+    byte[] bytes = new byte[buffer.remaining()];
+    buffer.get(bytes);
+    return bytes;
+  }
+
+}
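
For the reverse direction, the snappy-compressed Avro bytes produced by streamFor can be consumed with Avro's standard DataFileStream. A small hedged sketch follows; the class name and the sample user values are placeholders, and snappy-java must be on the classpath, as it already is for the writer.

    import java.io.InputStream;
    import org.apache.avro.file.DataFileStream;
    import org.apache.avro.generic.GenericData.Record;
    import org.apache.avro.generic.GenericDatumReader;
    import org.apache.nifi.processors.kite.TestUtil;

    // Illustrative sketch: read back the snappy-compressed Avro bytes that
    // TestUtil.streamFor produces, using Avro's DataFileStream.
    public class ReadBack {
      public static void main(String[] args) throws Exception {
        InputStream in = TestUtil.streamFor(
            TestUtil.user("demo", "demo@example.invalid")); // placeholder user
        DataFileStream<Record> stream =
            new DataFileStream<>(in, new GenericDatumReader<Record>());
        try {
          for (Record record : stream) {
            System.out.println(record);
          }
        } finally {
          stream.close();
        }
      }
    }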
