NIFI-238: Add Kite processors. Includes: * KiteStorageProcessor - store Avro files in a Kite dataset * KiteCSVToAvroProcessor - convert CSV to Avro for storage * KiteJSONToAvroProcessor - convert JSON to Avro for storage
Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/82424173 Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/82424173 Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/82424173 Branch: refs/heads/batch-stream Commit: 8242417388bdfcd63cc80214b1dd1dd9c29382f5 Parents: b8ade5b Author: Ryan Blue <b...@apache.org> Authored: Thu Feb 12 18:53:49 2015 -0800 Committer: Ryan Blue <b...@apache.org> Committed: Thu Feb 19 15:34:09 2015 -0800 ---------------------------------------------------------------------- .../nifi-kite-bundle/nifi-kite-nar/pom.xml | 35 +++ .../nifi-kite-processors/pom.xml | 173 +++++++++++++ .../processors/kite/AbstractKiteProcessor.java | 217 ++++++++++++++++ .../apache/nifi/processors/kite/AvroUtil.java | 43 ++++ .../nifi/processors/kite/ConvertCSVToAvro.java | 258 +++++++++++++++++++ .../nifi/processors/kite/ConvertJSONToAvro.java | 157 +++++++++++ .../nifi/processors/kite/JSONFileReader.java | 114 ++++++++ .../processors/kite/StoreInKiteDataset.java | 168 ++++++++++++ .../data/spi/filesystem/CSVFileReaderFixed.java | 172 +++++++++++++ .../org.apache.nifi.processor.Processor | 17 ++ .../processors/kite/TestCSVToAvroProcessor.java | 126 +++++++++ .../kite/TestConfigurationProperty.java | 77 ++++++ .../nifi/processors/kite/TestGetSchema.java | 94 +++++++ .../kite/TestJSONToAvroProcessor.java | 61 +++++ .../kite/TestKiteProcessorsCluster.java | 128 +++++++++ .../kite/TestKiteStorageProcessor.java | 167 ++++++++++++ .../apache/nifi/processors/kite/TestUtil.java | 103 ++++++++ nifi/nifi-nar-bundles/nifi-kite-bundle/pom.xml | 59 +++++ 18 files changed, 2169 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/82424173/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-nar/pom.xml ---------------------------------------------------------------------- diff 
--git a/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-nar/pom.xml b/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-nar/pom.xml new file mode 100644 index 0000000..dab7cb9 --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-nar/pom.xml @@ -0,0 +1,35 @@ +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+ --> + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-kite-bundle</artifactId> + <version>0.0.2-incubating-SNAPSHOT</version> + </parent> + + <artifactId>nifi-kite-nar</artifactId> + <packaging>nar</packaging> + + <name>Kite NAR</name> + + <dependencies> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-kite-processors</artifactId> + </dependency> + </dependencies> +</project> http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/82424173/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/pom.xml ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/pom.xml b/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/pom.xml new file mode 100644 index 0000000..311b3bf --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/pom.xml @@ -0,0 +1,173 @@ +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+ --> + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-kite-bundle</artifactId> + <version>0.0.2-incubating-SNAPSHOT</version> + </parent> + + <artifactId>nifi-kite-processors</artifactId> + <packaging>jar</packaging> + <name>Kite Hadoop Processors</name> + + <properties> + <kite.version>0.18.0</kite.version> + <guava.version>11.0.2</guava.version> + <junit.version>4.10</junit.version> + <findbugs-annotations.version>1.3.9-1</findbugs-annotations.version> + </properties> + + <dependencies> + <!-- NiFi --> + + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-api</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-utils</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-processor-utils</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-flowfile-packager</artifactId> + </dependency> + + <!-- Kite --> + + <dependency> + <groupId>org.kitesdk</groupId> + <artifactId>kite-data-core</artifactId> + <version>${kite.version}</version> + <exclusions> + <exclusion> + <!-- Use findbugs-annotations instead --> + <groupId>com.google.code.findbugs</groupId> + <artifactId>jsr305</artifactId> + </exclusion> + </exclusions> + </dependency> + + <dependency> + <groupId>org.kitesdk</groupId> + <artifactId>kite-data-hive</artifactId> + <version>${kite.version}</version> + <exclusions> + <exclusion> + <!-- Use findbugs-annotations instead --> + <groupId>com.google.code.findbugs</groupId> + <artifactId>jsr305</artifactId> + </exclusion> + </exclusions> + </dependency> + + <dependency> + <groupId>org.kitesdk</groupId> + <artifactId>kite-data-hbase</artifactId> + <version>${kite.version}</version> + <exclusions> + <exclusion> + <!-- Use findbugs-annotations instead --> + <groupId>com.google.code.findbugs</groupId> + <artifactId>jsr305</artifactId> + </exclusion> + 
</exclusions> + </dependency> + + <dependency> + <groupId>org.kitesdk</groupId> + <artifactId>kite-hadoop-dependencies</artifactId> + <type>pom</type> + <version>${kite.version}</version> + </dependency> + + <dependency> + <groupId>org.kitesdk</groupId> + <artifactId>kite-hbase-dependencies</artifactId> + <type>pom</type> + <scope>provided</scope> + <optional>true</optional> + <version>${kite.version}</version> + </dependency> + + <dependency> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + <version>${guava.version}</version> + <scope>compile</scope> + </dependency> + + <dependency> + <!-- avoid warnings by bundling annotations --> + <groupId>com.github.stephenc.findbugs</groupId> + <artifactId>findbugs-annotations</artifactId> + <scope>compile</scope> + <version>${findbugs-annotations.version}</version> + </dependency> + + <!-- Test dependencies --> + + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + <version>${junit.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-mock</artifactId> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.kitesdk</groupId> + <artifactId>kite-minicluster</artifactId> + <version>${kite.version}</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>com.sun.jersey</groupId> + <artifactId>jersey-servlet</artifactId> + <version>1.14</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.kitesdk</groupId> + <artifactId>kite-data-core</artifactId> + <version>${kite.version}</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.kitesdk</groupId> + <artifactId>kite-hadoop-test-dependencies</artifactId> + <type>pom</type> + <scope>test</scope> + <version>${kite.version}</version> + </dependency> + + </dependencies> +</project> 
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/82424173/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/AbstractKiteProcessor.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/AbstractKiteProcessor.java b/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/AbstractKiteProcessor.java new file mode 100644 index 0000000..c510489 --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/AbstractKiteProcessor.java @@ -0,0 +1,217 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.nifi.processors.kite; + +import com.google.common.base.Splitter; +import com.google.common.collect.ImmutableList; +import com.google.common.io.Resources; +import java.io.IOException; +import java.io.InputStream; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.List; +import org.apache.avro.Schema; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.components.Validator; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.util.StandardValidators; +import org.kitesdk.data.DatasetNotFoundException; +import org.kitesdk.data.Datasets; +import org.kitesdk.data.SchemaNotFoundException; +import org.kitesdk.data.URIBuilder; +import org.kitesdk.data.spi.DefaultConfiguration; + +abstract class AbstractKiteProcessor extends AbstractProcessor { + + private static final Splitter COMMA = Splitter.on(',').trimResults(); + protected static final Validator FILES_EXIST = new Validator() { + @Override + public ValidationResult validate(String subject, String configFiles, + ValidationContext context) { + if (configFiles != null && !configFiles.isEmpty()) { + for (String file : COMMA.split(configFiles)) { + ValidationResult result = StandardValidators.FILE_EXISTS_VALIDATOR + .validate(subject, file, context); + if (!result.isValid()) { + return result; + } + } + } + return new ValidationResult.Builder() + .subject(subject) + .input(configFiles) + .explanation("Files exist") + .valid(true) + .build(); + } + }; + + protected static final PropertyDescriptor CONF_XML_FILES = + new PropertyDescriptor.Builder() + .name("Hadoop configuration 
files") + .description("A comma-separated list of Hadoop configuration files") + .addValidator(FILES_EXIST) + .build(); + + protected static final Validator RECOGNIZED_URI = new Validator() { + @Override + public ValidationResult validate(String subject, String uri, + ValidationContext context) { + String message = "not set"; + boolean isValid = true; + if (uri == null || uri.isEmpty()) { + isValid = false; + } else { + try { + new URIBuilder(URI.create(uri)).build(); + } catch (RuntimeException e) { + message = e.getMessage(); + isValid = false; + } + } + return new ValidationResult.Builder() + .subject(subject) + .input(uri) + .explanation("Dataset URI is invalid: " + message) + .valid(isValid) + .build(); + } + }; + + /** + * Resolves a {@link Schema} for the given string, either a URI or a JSON + * literal. + */ + protected static Schema getSchema(String uriOrLiteral, Configuration conf) { + URI uri; + try { + uri = new URI(uriOrLiteral); + } catch (URISyntaxException e) { + // try to parse the schema as a literal + return parseSchema(uriOrLiteral); + } + + try { + if ("dataset".equals(uri.getScheme()) || "view".equals(uri.getScheme())) { + return Datasets.load(uri).getDataset().getDescriptor().getSchema(); + } else if ("resource".equals(uri.getScheme())) { + InputStream in = Resources.getResource(uri.getSchemeSpecificPart()) + .openStream(); + return parseSchema(uri, in); + } else { + // try to open the file + Path schemaPath = new Path(uri); + FileSystem fs = schemaPath.getFileSystem(conf); + return parseSchema(uri, fs.open(schemaPath)); + } + + } catch (DatasetNotFoundException e) { + throw new SchemaNotFoundException( + "Cannot read schema of missing dataset: " + uri, e); + } catch (IOException e) { + throw new SchemaNotFoundException( + "Failed while reading " + uri + ": " + e.getMessage(), e); + } + } + + private static Schema parseSchema(String literal) { + try { + return new Schema.Parser().parse(literal); + } catch (RuntimeException e) { + throw new 
SchemaNotFoundException( + "Failed to parse schema: " + literal, e); + } + } + + private static Schema parseSchema(URI uri, InputStream in) throws IOException { + try { + return new Schema.Parser().parse(in); + } catch (RuntimeException e) { + throw new SchemaNotFoundException("Failed to parse schema at " + uri, e); + } + } + + protected static final Validator SCHEMA_VALIDATOR = new Validator() { + @Override + public ValidationResult validate(String subject, String uri, ValidationContext context) { + Configuration conf = getConfiguration( + context.getProperty(CONF_XML_FILES).getValue()); + + String error = null; + try { + getSchema(uri, conf); + } catch (SchemaNotFoundException e) { + error = e.getMessage(); + } + return new ValidationResult.Builder() + .subject(subject) + .input(uri) + .explanation(error) + .valid(error == null) + .build(); + } + }; + + protected static final List<PropertyDescriptor> ABSTRACT_KITE_PROPS = + ImmutableList.<PropertyDescriptor>builder() + .add(CONF_XML_FILES) + .build(); + + static List<PropertyDescriptor> getProperties() { + return ABSTRACT_KITE_PROPS; + } + + @OnScheduled + protected void setDefaultConfiguration(ProcessContext context) + throws IOException { + DefaultConfiguration.set(getConfiguration( + context.getProperty(CONF_XML_FILES).getValue())); + } + + protected static Configuration getConfiguration(String configFiles) { + Configuration conf = DefaultConfiguration.get(); + + if (configFiles == null || configFiles.isEmpty()) { + return conf; + } + + for (String file : COMMA.split(configFiles)) { + // process each resource only once + if (conf.getResource(file) == null) { + // use Path instead of String to get the file from the FS + conf.addResource(new Path(file)); + } + } + + return conf; + } + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return ABSTRACT_KITE_PROPS; + } +} 
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/82424173/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/AvroUtil.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/AvroUtil.java b/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/AvroUtil.java new file mode 100644 index 0000000..0d0b04d --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/AvroUtil.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.nifi.processors.kite; + +import org.apache.avro.Schema; +import org.apache.avro.SchemaBuilder; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.IndexedRecord; +import org.apache.avro.io.DatumReader; +import org.apache.avro.io.DatumWriter; + +import static org.apache.avro.generic.GenericData.StringType; + +class AvroUtil { + + @SuppressWarnings("unchecked") + public static <D> DatumWriter<D> newDatumWriter(Schema schema, Class<D> dClass) { + return (DatumWriter<D>) GenericData.get().createDatumWriter(schema); + } + + @SuppressWarnings("unchecked") + public static <D> DatumReader<D> newDatumReader(Schema schema, Class<D> dClass) { + return (DatumReader<D>) GenericData.get().createDatumReader(schema); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/82424173/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertCSVToAvro.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertCSVToAvro.java b/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertCSVToAvro.java new file mode 100644 index 0000000..18240c3 --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertCSVToAvro.java @@ -0,0 +1,258 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.nifi.processors.kite; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.List; +import java.util.Set; +import org.apache.avro.Schema; +import org.apache.avro.file.CodecFactory; +import org.apache.avro.file.DataFileWriter; +import org.apache.avro.generic.GenericData.Record; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.components.Validator; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.FlowFileAccessException; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.StreamCallback; +import org.apache.nifi.processor.util.StandardValidators; +import org.kitesdk.data.DatasetException; +import org.kitesdk.data.DatasetIOException; +import org.kitesdk.data.DatasetRecordException; +import org.kitesdk.data.spi.DefaultConfiguration; +import 
org.kitesdk.data.spi.filesystem.CSVFileReaderFixed; +import org.kitesdk.data.spi.filesystem.CSVProperties; + +import static org.apache.nifi.processor.util.StandardValidators.createLongValidator; + +@Tags({"kite", "csv", "avro"}) +@CapabilityDescription( + "Converts CSV files to Avro according to an Avro Schema") +public class ConvertCSVToAvro extends AbstractKiteProcessor { + private static CSVProperties DEFAULTS = new CSVProperties.Builder().build(); + + private static Validator CHAR_VALIDATOR = new Validator() { + @Override + public ValidationResult validate(String subject, String input, + ValidationContext context) { + return new ValidationResult.Builder() + .subject(subject) + .input(input) + .explanation("Only single characters are supported") + .valid(input.length() == 1) + .build(); + } + }; + + private static Relationship SUCCESS = new Relationship.Builder() + .name("success") + .description("FlowFile content has been successfully saved") + .build(); + + private static Relationship FAILURE = new Relationship.Builder() + .name("failure") + .description("FlowFile content could not be processed") + .build(); + + @VisibleForTesting + static final PropertyDescriptor SCHEMA = + new PropertyDescriptor.Builder() + .name("Record schema") + .description( + "Outgoing Avro schema for each record created from a CSV row") + .addValidator(SCHEMA_VALIDATOR) + .required(true) + .build(); + + @VisibleForTesting + static final PropertyDescriptor CHARSET = + new PropertyDescriptor.Builder() + .name("CSV charset") + .description("Character set for CSV files") + .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR) + .defaultValue(DEFAULTS.charset) + .build(); + + @VisibleForTesting + static final PropertyDescriptor DELIMITER = + new PropertyDescriptor.Builder() + .name("CSV delimiter") + .description("Delimiter character for CSV records") + .addValidator(CHAR_VALIDATOR) + .defaultValue(DEFAULTS.delimiter) + .build(); + + @VisibleForTesting + static final PropertyDescriptor 
QUOTE = + new PropertyDescriptor.Builder() + .name("CSV quote character") + .description("Quote character for CSV values") + .addValidator(CHAR_VALIDATOR) + .defaultValue(DEFAULTS.quote) + .build(); + + @VisibleForTesting + static final PropertyDescriptor ESCAPE = + new PropertyDescriptor.Builder() + .name("CSV escape character") + .description("Escape character for CSV values") + .addValidator(CHAR_VALIDATOR) + .defaultValue(DEFAULTS.escape) + .build(); + + @VisibleForTesting + static final PropertyDescriptor HAS_HEADER = + new PropertyDescriptor.Builder() + .name("Use CSV header line") + .description("Whether to use the first line as a header") + .addValidator(StandardValidators.BOOLEAN_VALIDATOR) + .defaultValue(String.valueOf(DEFAULTS.useHeader)) + .build(); + + @VisibleForTesting + static final PropertyDescriptor LINES_TO_SKIP = + new PropertyDescriptor.Builder() + .name("Lines to skip") + .description("Number of lines to skip before reading header or data") + .addValidator(createLongValidator(0L, Integer.MAX_VALUE, true)) + .defaultValue(String.valueOf(DEFAULTS.linesToSkip)) + .build(); + + private static final List<PropertyDescriptor> PROPERTIES = + ImmutableList.<PropertyDescriptor>builder() + .addAll(AbstractKiteProcessor.getProperties()) + .add(SCHEMA) + .add(CHARSET) + .add(DELIMITER) + .add(QUOTE) + .add(ESCAPE) + .add(HAS_HEADER) + .add(LINES_TO_SKIP) + .build(); + + private static final Set<Relationship> RELATIONSHIPS = + ImmutableSet.<Relationship>builder() + .add(SUCCESS) + .add(FAILURE) + .build(); + + // Immutable configuration + @VisibleForTesting + volatile CSVProperties props; + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return PROPERTIES; + } + + @Override + public Set<Relationship> getRelationships() { + return RELATIONSHIPS; + } + + @OnScheduled + public void createCSVProperties(ProcessContext context) throws IOException { + super.setDefaultConfiguration(context); + + this.props = new 
CSVProperties.Builder() + .charset(context.getProperty(CHARSET).getValue()) + .delimiter(context.getProperty(DELIMITER).getValue()) + .quote(context.getProperty(QUOTE).getValue()) + .escape(context.getProperty(ESCAPE).getValue()) + .hasHeader(context.getProperty(HAS_HEADER).asBoolean()) + .linesToSkip(context.getProperty(LINES_TO_SKIP).asInteger()) + .build(); + } + + @Override + public void onTrigger(ProcessContext context, final ProcessSession session) + throws ProcessException { + FlowFile flowFile = session.get(); + if (flowFile == null) { + return; + } + + final Schema schema = getSchema( + context.getProperty(SCHEMA).getValue(), + DefaultConfiguration.get()); + + final DataFileWriter<Record> writer = new DataFileWriter<>( + AvroUtil.newDatumWriter(schema, Record.class)); + writer.setCodec(CodecFactory.snappyCodec()); + + try { + flowFile = session.write(flowFile, new StreamCallback() { + @Override + public void process(InputStream in, OutputStream out) throws IOException { + long written = 0L; + long errors = 0L; + try (CSVFileReaderFixed<Record> reader = new CSVFileReaderFixed<>( + in, props, schema, Record.class)) { + reader.initialize(); + try (DataFileWriter<Record> w = writer.create(schema, out)) { + while (reader.hasNext()) { + try { + Record record = reader.next(); + w.append(record); + written += 1; + } catch (DatasetRecordException e) { + errors += 1; + } + } + } + } + session.adjustCounter("Converted records", written, + false /* update only if file transfer is successful */); + session.adjustCounter("Conversion errors", errors, + false /* update only if file transfer is successful */); + } + }); + + session.transfer(flowFile, SUCCESS); + + //session.getProvenanceReporter().send(flowFile, target.getUri().toString()); + } catch (ProcessException | DatasetIOException e) { + getLogger().error("Failed reading or writing", e); + session.transfer(flowFile, FAILURE); + + } catch (DatasetException e) { + getLogger().error("Failed to read FlowFile", e); + 
session.transfer(flowFile, FAILURE); + + } catch (Throwable t) { + getLogger().error("Unknown Throwable", t); + session.rollback(true); // penalize just in case + context.yield(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/82424173/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertJSONToAvro.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertJSONToAvro.java b/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertJSONToAvro.java new file mode 100644 index 0000000..5200f19 --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertJSONToAvro.java @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
package org.apache.nifi.processors.kite;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.List;
import java.util.Set;
import org.apache.avro.Schema;
import org.apache.avro.file.CodecFactory;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData.Record;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.FlowFileAccessException;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.StreamCallback;
import org.kitesdk.data.DatasetException;
import org.kitesdk.data.DatasetIOException;
import org.kitesdk.data.DatasetRecordException;
import org.kitesdk.data.spi.DefaultConfiguration;

/**
 * Processor that converts each incoming JSON FlowFile into an Avro data file
 * (snappy-compressed) conforming to a configured Avro schema. Records that
 * cannot be converted are counted as errors and dropped; the FlowFile is
 * routed to failure only when reading or writing fails as a whole.
 */
@Tags({"kite", "json", "avro"})
@CapabilityDescription(
    "Converts JSON files to Avro according to an Avro Schema")
public class ConvertJSONToAvro extends AbstractKiteProcessor {

  // Relationships are constants: declare them final so they cannot be
  // reassigned after class initialization.
  private static final Relationship SUCCESS = new Relationship.Builder()
      .name("success")
      .description("FlowFile content has been successfully saved")
      .build();

  private static final Relationship FAILURE = new Relationship.Builder()
      .name("failure")
      .description("FlowFile content could not be processed")
      .build();

  @VisibleForTesting
  static final PropertyDescriptor SCHEMA =
      new PropertyDescriptor.Builder()
          .name("Record schema")
          .description(
              "Outgoing Avro schema for each record created from a JSON object")
          .addValidator(SCHEMA_VALIDATOR)
          .required(true)
          .build();

  private static final List<PropertyDescriptor> PROPERTIES =
      ImmutableList.<PropertyDescriptor>builder()
          .addAll(AbstractKiteProcessor.getProperties())
          .add(SCHEMA)
          .build();

  private static final Set<Relationship> RELATIONSHIPS =
      ImmutableSet.<Relationship>builder()
          .add(SUCCESS)
          .add(FAILURE)
          .build();

  public ConvertJSONToAvro() {
  }

  @Override
  protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    return PROPERTIES;
  }

  @Override
  public Set<Relationship> getRelationships() {
    return RELATIONSHIPS;
  }

  /**
   * Rewrites the FlowFile content in place: JSON objects in, an Avro data
   * file out. Per-record conversion failures increment the
   * "Conversion errors" counter but do not fail the FlowFile.
   */
  @Override
  public void onTrigger(ProcessContext context, final ProcessSession session)
      throws ProcessException {
    FlowFile flowFile = session.get();
    if (flowFile == null) {
      return;
    }

    // Resolve the target schema from the property value (literal schema,
    // file path, or Kite URI) using the shared default configuration.
    final Schema schema = getSchema(
        context.getProperty(SCHEMA).getValue(),
        DefaultConfiguration.get());

    // The writer holds no resources until create() is called inside the
    // callback; the nested try-with-resources there closes it.
    final DataFileWriter<Record> writer = new DataFileWriter<>(
        AvroUtil.newDatumWriter(schema, Record.class));
    writer.setCodec(CodecFactory.snappyCodec());

    try {
      flowFile = session.write(flowFile, new StreamCallback() {
        @Override
        public void process(InputStream in, OutputStream out) throws IOException {
          long written = 0L;
          long errors = 0L;
          try (JSONFileReader<Record> reader = new JSONFileReader<>(
              in, schema, Record.class)) {
            reader.initialize();
            try (DataFileWriter<Record> w = writer.create(schema, out)) {
              while (reader.hasNext()) {
                try {
                  Record record = reader.next();
                  w.append(record);
                  written += 1;
                } catch (DatasetRecordException e) {
                  // Bad record: count it and keep converting the rest.
                  errors += 1;
                }
              }
            }
            session.adjustCounter("Converted records", written,
                false /* update only if file transfer is successful */);
            session.adjustCounter("Conversion errors", errors,
                false /* update only if file transfer is successful */);
          }
        }
      });

      session.transfer(flowFile, SUCCESS);

    } catch (ProcessException | DatasetIOException e) {
      getLogger().error("Failed reading or writing", e);
      session.transfer(flowFile, FAILURE);

    } catch (DatasetException e) {
      getLogger().error("Failed to read FlowFile", e);
      session.transfer(flowFile, FAILURE);

    }
  }

}
package org.apache.nifi.processors.kite;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.collect.Iterators;
import java.io.IOException;
import java.io.InputStream;
import java.util.Iterator;
import javax.annotation.Nullable;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.kitesdk.data.DatasetIOException;
import org.kitesdk.data.spi.AbstractDatasetReader;
import org.kitesdk.data.spi.DataModelUtil;
import org.kitesdk.data.spi.JsonUtil;
import org.kitesdk.data.spi.ReaderWriterState;

/**
 * This is a temporary addition. The version in 0.18.0 throws a NPE when the
 * InputStream constructor is used.
 */
class JSONFileReader<E> extends AbstractDatasetReader<E> {

  private final GenericData model;
  private final Schema schema;

  private InputStream incoming = null;

  // reader lifecycle state
  private ReaderWriterState state = ReaderWriterState.NEW;
  private Iterator<E> iterator;

  public JSONFileReader(InputStream incoming, Schema schema, Class<E> type) {
    this.incoming = incoming;
    this.schema = schema;
    this.model = DataModelUtil.getDataModelForType(type);
    this.state = ReaderWriterState.NEW;
  }

  /**
   * Opens the reader: wraps the JSON parser in an iterator that converts
   * each node to an Avro entity. May only be called once.
   */
  @Override
  public void initialize() {
    Preconditions.checkState(state.equals(ReaderWriterState.NEW),
        "A reader may not be opened more than once - current state:%s", state);
    Preconditions.checkArgument(Schema.Type.RECORD.equals(schema.getType()),
        "Schemas for JSON files should be record");

    Iterator<JsonNode> nodes = JsonUtil.parser(incoming);
    Function<JsonNode, E> toAvro = new Function<JsonNode, E>() {
      @Override
      @SuppressWarnings("unchecked")
      public E apply(@Nullable JsonNode node) {
        return (E) JsonUtil.convertToAvro(model, node, schema);
      }
    };
    this.iterator = Iterators.transform(nodes, toAvro);

    this.state = ReaderWriterState.OPEN;
  }

  @Override
  public boolean hasNext() {
    requireOpen();
    return iterator.hasNext();
  }

  @Override
  public E next() {
    requireOpen();
    return iterator.next();
  }

  /** Closes the underlying stream; safe to call when not open. */
  @Override
  public void close() {
    if (!state.equals(ReaderWriterState.OPEN)) {
      return;
    }

    iterator = null;
    try {
      incoming.close();
    } catch (IOException e) {
      throw new DatasetIOException("Unable to close reader path", e);
    }

    state = ReaderWriterState.CLOSED;
  }

  @Override
  public boolean isOpen() {
    return (this.state == ReaderWriterState.OPEN);
  }

  // Shared guard for read operations: fails unless the reader is OPEN.
  private void requireOpen() {
    Preconditions.checkState(state.equals(ReaderWriterState.OPEN),
        "Attempt to read from a file in state:%s", state);
  }

}
package org.apache.nifi.processors.kite;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import java.io.IOException;
import java.io.InputStream;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileStream;
import org.apache.avro.generic.GenericData.Record;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.util.StopWatch;
import org.kitesdk.data.DatasetIOException;
import org.kitesdk.data.DatasetWriter;
import org.kitesdk.data.Datasets;
import org.kitesdk.data.IncompatibleSchemaException;
import org.kitesdk.data.ValidationException;
import org.kitesdk.data.View;
import org.kitesdk.data.spi.SchemaValidationUtil;

/**
 * Processor that reads Avro records from incoming FlowFiles and appends them
 * to a Kite dataset identified by a (possibly expression-language) URI.
 *
 * NOTE: writes to the dataset cannot be rolled back, so the stored-record
 * counter and provenance event are emitted immediately rather than deferred
 * to session commit.
 */
@Tags({"kite", "avro", "parquet", "hive", "hdfs", "hbase"})
@CapabilityDescription("Stores Avro records in a Kite dataset")
public class StoreInKiteDataset extends AbstractKiteProcessor {
  // Relationships are constants: declare them final so they cannot be
  // reassigned after class initialization.
  private static final Relationship SUCCESS = new Relationship.Builder()
      .name("success")
      .description("FlowFile content has been successfully saved")
      .build();

  private static final Relationship INCOMPATIBLE = new Relationship.Builder()
      .name("incompatible")
      .description("FlowFile content is not compatible with the target dataset")
      .build();

  private static final Relationship FAILURE = new Relationship.Builder()
      .name("failure")
      .description("FlowFile content could not be processed")
      .build();

  public static final PropertyDescriptor KITE_DATASET_URI =
      new PropertyDescriptor.Builder()
          .name("Target dataset URI")
          .description(
              "URI that identifies a Kite dataset where data will be stored")
          .addValidator(RECOGNIZED_URI)
          .expressionLanguageSupported(true)
          .required(true)
          .build();

  private static final List<PropertyDescriptor> PROPERTIES =
      ImmutableList.<PropertyDescriptor>builder()
          .addAll(AbstractKiteProcessor.getProperties())
          .add(KITE_DATASET_URI)
          .build();

  private static final Set<Relationship> RELATIONSHIPS =
      ImmutableSet.<Relationship>builder()
          .add(SUCCESS)
          .add(INCOMPATIBLE)
          .add(FAILURE)
          .build();

  @Override
  protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    return PROPERTIES;
  }

  @Override
  public Set<Relationship> getRelationships() {
    return RELATIONSHIPS;
  }

  /**
   * Streams the FlowFile's Avro records into the target dataset. Routes to
   * "incompatible" when the file's schema cannot be read as the dataset
   * schema, to "failure" on I/O errors, and rolls back (with penalty) on
   * anything unexpected.
   */
  @Override
  public void onTrigger(ProcessContext context, final ProcessSession session)
      throws ProcessException {
    FlowFile flowFile = session.get();
    if (flowFile == null) {
      return;
    }

    final View<Record> target = load(context, flowFile);
    final Schema schema = target.getDataset().getDescriptor().getSchema();

    try {
      StopWatch timer = new StopWatch(true);
      session.read(flowFile, new InputStreamCallback() {
        @Override
        public void process(InputStream in) throws IOException {
          try (DataFileStream<Record> stream = new DataFileStream<>(
              in, AvroUtil.newDatumReader(schema, Record.class))) {
            // Fail fast before writing anything if the file schema cannot
            // be projected onto the dataset schema.
            IncompatibleSchemaException.check(
                SchemaValidationUtil.canRead(stream.getSchema(), schema),
                "Incompatible file schema %s, expected %s",
                stream.getSchema(), schema);

            long written = 0L;
            try (DatasetWriter<Record> writer = target.newWriter()) {
              for (Record record : stream) {
                writer.write(record);
                written += 1;
              }
            } finally {
              session.adjustCounter("Stored records", written,
                  true /* cannot roll back the write */);
            }
          }
        }
      });
      timer.stop();

      session.getProvenanceReporter().send(flowFile,
          target.getUri().toString(),
          timer.getDuration(TimeUnit.MILLISECONDS),
          true /* cannot roll back the write */ );

      session.transfer(flowFile, SUCCESS);

    } catch (ProcessException | DatasetIOException e) {
      getLogger().error("Failed to read FlowFile", e);
      session.transfer(flowFile, FAILURE);

    } catch (ValidationException e) {
      getLogger().error(e.getMessage());
      getLogger().debug("Incompatible schema error", e);
      session.transfer(flowFile, INCOMPATIBLE);

    } catch (Throwable t) {
      getLogger().error("Unknown Throwable", t);
      session.rollback(true); // penalize just in case
      context.yield();
    }
  }

  /**
   * Resolves the dataset URI (with FlowFile attribute expressions applied)
   * and loads it as a view of generic records.
   */
  private View<Record> load(ProcessContext context, FlowFile file) {
    String uri = context.getProperty(KITE_DATASET_URI)
        .evaluateAttributeExpressions(file)
        .getValue();
    return Datasets.load(uri, Record.class);
  }
}
package org.kitesdk.data.spi.filesystem;

import au.com.bytecode.opencsv.CSVReader;
import java.io.InputStream;
import com.google.common.collect.Lists;
import java.util.List;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.kitesdk.data.DatasetDescriptor;
import org.kitesdk.data.DatasetIOException;
import org.kitesdk.data.spi.AbstractDatasetReader;
import org.kitesdk.data.spi.DescriptorUtil;
import org.kitesdk.data.spi.EntityAccessor;
import org.kitesdk.data.spi.ReaderWriterState;
import com.google.common.base.Preconditions;
import org.apache.avro.Schema;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.kitesdk.data.spi.filesystem.CSVProperties;
import org.kitesdk.data.spi.filesystem.CSVUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.NoSuchElementException;

import static org.kitesdk.data.spi.filesystem.FileSystemProperties.REUSE_RECORDS;

/**
 * This is a temporary addition. The version in 0.18.0 throws a NPE when the
 * InputStream constructor is used.
 *
 * Reads CSV rows from an InputStream and converts each one into an entity of
 * the requested record schema/class via CSVRecordBuilder. Lives in the
 * org.kitesdk package so it can use Kite's package-private helpers
 * (CSVRecordBuilder, CSVUtil).
 */
public class CSVFileReaderFixed<E> extends AbstractDatasetReader<E> {

  private final CSVProperties props;
  private final Schema schema;
  // Always false in this fixed variant: each next() builds a new record.
  private final boolean reuseRecords;

  private final Class<E> recordClass;

  private CSVReader reader = null;
  private CSVRecordBuilder<E> builder;

  private InputStream incoming = null;

  // state
  private ReaderWriterState state = ReaderWriterState.NEW;
  // One-row lookahead: `next` holds the raw fields of the row that the next
  // call to next() will convert; `hasNext` reflects whether it is valid.
  private boolean hasNext = false;
  private String[] next = null;
  private E record = null;

  public CSVFileReaderFixed(InputStream incoming, CSVProperties props,
                            Schema schema, Class<E> type) {
    this.incoming = incoming;
    this.schema = schema;
    this.recordClass = type;
    this.state = ReaderWriterState.NEW;
    this.props = props;
    this.reuseRecords = false;

    Preconditions.checkArgument(Schema.Type.RECORD.equals(schema.getType()),
        "Schemas for CSV files must be records of primitive types");
  }

  /**
   * Opens the reader. The header is taken either from the first data row
   * (props.useHeader) or from the configured props.header string; it maps
   * CSV columns to schema fields in the record builder. May only be called
   * once.
   */
  @Override
  @SuppressWarnings("unchecked")
  public void initialize() {
    Preconditions.checkState(state.equals(ReaderWriterState.NEW),
        "A reader may not be opened more than once - current state:%s", state);

    this.reader = CSVUtil.newReader(incoming, props);

    List<String> header = null;
    if (props.useHeader) {
      // Consume the first row as the header.
      // NOTE(review): if the stream is empty, `next` stays null here and
      // Lists.newArrayList(next) would NPE — TODO confirm upstream behavior.
      this.hasNext = advance();
      header = Lists.newArrayList(next);
    } else if (props.header != null) {
      try {
        header = Lists.newArrayList(
            CSVUtil.newParser(props).parseLine(props.header));
      } catch (IOException e) {
        throw new DatasetIOException(
            "Failed to parse header from properties: " + props.header, e);
      }
    }

    this.builder = new CSVRecordBuilder<E>(schema, recordClass, header);

    // initialize by reading the first record
    this.hasNext = advance();

    this.state = ReaderWriterState.OPEN;
  }

  @Override
  public boolean hasNext() {
    Preconditions.checkState(state.equals(ReaderWriterState.OPEN),
        "Attempt to read from a file in state:%s", state);
    return hasNext;
  }

  /**
   * Converts the buffered row into a record, then advances the lookahead
   * (in the finally block, so advancing happens even if conversion throws).
   */
  @Override
  public E next() {
    Preconditions.checkState(state.equals(ReaderWriterState.OPEN),
        "Attempt to read from a file in state:%s", state);

    if (!hasNext) {
      throw new NoSuchElementException();
    }

    try {
      if (reuseRecords) {
        this.record = builder.makeRecord(next, record);
        return record;
      } else {
        return builder.makeRecord(next, null);
      }
    } finally {
      this.hasNext = advance();
    }
  }

  // Reads the next raw CSV row into `next`; returns false at end of stream.
  private boolean advance() {
    try {
      next = reader.readNext();
    } catch (IOException e) {
      throw new DatasetIOException("Could not read record", e);
    }
    return (next != null);
  }

  /** Closes the underlying CSV reader; safe to call when not open. */
  @Override
  public void close() {
    if (!state.equals(ReaderWriterState.OPEN)) {
      return;
    }

    try {
      reader.close();
    } catch (IOException e) {
      throw new DatasetIOException("Unable to close reader", e);
    }

    state = ReaderWriterState.CLOSED;
  }

  @Override
  public boolean isOpen() {
    return (this.state == ReaderWriterState.OPEN);
  }
}
+# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +org.apache.nifi.processors.kite.StoreInKiteDataset +org.apache.nifi.processors.kite.ConvertCSVToAvro +org.apache.nifi.processors.kite.ConvertJSONToAvro http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/82424173/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestCSVToAvroProcessor.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestCSVToAvroProcessor.java b/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestCSVToAvroProcessor.java new file mode 100644 index 0000000..dbe3b81 --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestCSVToAvroProcessor.java @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
package org.apache.nifi.processors.kite;

import java.io.IOException;
import java.nio.charset.Charset;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Assert;
import org.junit.Test;

import static org.apache.nifi.processors.kite.TestUtil.streamFor;

/** Unit tests for {@link ConvertCSVToAvro}. */
public class TestCSVToAvroProcessor {

  public static final Schema SCHEMA = SchemaBuilder.record("Test").fields()
      .requiredLong("id")
      .requiredString("color")
      .optionalDouble("price")
      .endRecord();

  // Three rows: two valid, one with a missing required id.
  public static final String CSV_CONTENT = ""
      + "1,green\n"
      + ",blue,\n" + // invalid, ID is missing
      "2,grey,12.95";

  @Test
  public void testBasicConversion() throws IOException {
    TestRunner runner = TestRunners.newTestRunner(ConvertCSVToAvro.class);
    runner.assertNotValid();
    runner.setProperty(ConvertCSVToAvro.SCHEMA, SCHEMA.toString());
    runner.assertValid();

    runner.enqueue(streamFor(CSV_CONTENT));
    runner.run();

    // Two good rows convert, the bad one is counted as an error.
    Assert.assertEquals("Should convert 2 rows",
        2, runner.getCounterValue("Converted records").longValue());
    Assert.assertEquals("Should reject 1 row",
        1, runner.getCounterValue("Conversion errors").longValue());

    runner.assertAllFlowFilesTransferred("success", 1);
  }

  @Test
  public void testAlternateCharset() throws IOException {
    TestRunner runner = TestRunners.newTestRunner(ConvertCSVToAvro.class);
    runner.setProperty(ConvertCSVToAvro.SCHEMA, SCHEMA.toString());
    runner.setProperty(ConvertCSVToAvro.CHARSET, "utf16");
    runner.assertValid();

    // Same content, but encoded as UTF-16 to match the charset property.
    runner.enqueue(streamFor(CSV_CONTENT, Charset.forName("UTF-16")));
    runner.run();

    Assert.assertEquals("Should convert 2 rows",
        2, runner.getCounterValue("Converted records").longValue());
    Assert.assertEquals("Should reject 1 row",
        1, runner.getCounterValue("Conversion errors").longValue());

    runner.assertAllFlowFilesTransferred("success", 1);
  }

  @Test
  public void testCSVProperties() throws IOException {
    TestRunner runner = TestRunners.newTestRunner(ConvertCSVToAvro.class);
    ConvertCSVToAvro processor = new ConvertCSVToAvro();
    ProcessContext context = runner.getProcessContext();

    // Defaults: comma-delimited, double-quoted, backslash-escaped UTF-8,
    // no header row, nothing skipped.
    processor.createCSVProperties(context);
    Assert.assertEquals("Charset should match",
        "utf8", processor.props.charset);
    Assert.assertEquals("Delimiter should match",
        ",", processor.props.delimiter);
    Assert.assertEquals("Quote should match",
        "\"", processor.props.quote);
    Assert.assertEquals("Escape should match",
        "\\", processor.props.escape);
    Assert.assertEquals("Header flag should match",
        false, processor.props.useHeader);
    Assert.assertEquals("Lines to skip should match",
        0, processor.props.linesToSkip);

    runner.setProperty(ConvertCSVToAvro.CHARSET, "utf16");
    runner.setProperty(ConvertCSVToAvro.DELIMITER, "|");
    runner.setProperty(ConvertCSVToAvro.QUOTE, "'");
    runner.setProperty(ConvertCSVToAvro.ESCAPE, "\u2603");
    runner.setProperty(ConvertCSVToAvro.HAS_HEADER, "true");
    runner.setProperty(ConvertCSVToAvro.LINES_TO_SKIP, "2");

    // Rebuilding the properties must pick up every overridden value.
    processor.createCSVProperties(context);
    Assert.assertEquals("Charset should match",
        "utf16", processor.props.charset);
    Assert.assertEquals("Delimiter should match",
        "|", processor.props.delimiter);
    Assert.assertEquals("Quote should match",
        "'", processor.props.quote);
    Assert.assertEquals("Escape should match",
        "\u2603", processor.props.escape);
    Assert.assertEquals("Header flag should match",
        true, processor.props.useHeader);
    Assert.assertEquals("Lines to skip should match",
        2, processor.props.linesToSkip);
  }
}
package org.apache.nifi.processors.kite;

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.kitesdk.data.spi.DefaultConfiguration;

/**
 * Verifies that the CONF_XML_FILES property loads Hadoop configuration
 * files into Kite's DefaultConfiguration, and that missing files are
 * rejected by validation.
 */
public class TestConfigurationProperty {

  @Rule
  public final TemporaryFolder temp = new TemporaryFolder();
  public File confLocation;

  /** Writes a config file containing a canary flag the tests look for. */
  @Before
  public void saveConfiguration() throws IOException {
    Configuration conf = new Configuration(false);
    conf.setBoolean("nifi.config.canary", true);

    confLocation = temp.newFile("nifi-conf.xml");
    // try-with-resources: the original leaked the stream if writeXml threw
    try (FileOutputStream out = new FileOutputStream(confLocation)) {
      conf.writeXml(out);
    }
  }

  @Test
  public void testConfigurationCanary() throws IOException {
    TestRunner runner = TestRunners.newTestRunner(StoreInKiteDataset.class);
    runner.setProperty(
        AbstractKiteProcessor.CONF_XML_FILES, confLocation.toString());

    // The canary must be absent until the processor loads the file.
    Assert.assertFalse("Should not contain canary value",
        DefaultConfiguration.get().getBoolean("nifi.config.canary", false));

    AbstractKiteProcessor processor = new StoreInKiteDataset();
    ProcessContext context = runner.getProcessContext();
    processor.setDefaultConfiguration(context);

    Assert.assertTrue("Should contain canary value",
        DefaultConfiguration.get().getBoolean("nifi.config.canary", false));
  }

  @Test
  public void testFilesMustExist() throws IOException {
    TestRunner runner = TestRunners.newTestRunner(StoreInKiteDataset.class);
    // temp.newFile() is immediately deleted... actually it exists but is
    // empty; validation should reject it as not a usable config file.
    runner.setProperty(
        AbstractKiteProcessor.CONF_XML_FILES, temp.newFile().toString());
    runner.assertNotValid();
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/82424173/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestGetSchema.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestGetSchema.java b/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestGetSchema.java new file mode 100644 index 0000000..18b356d --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestGetSchema.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
package org.apache.nifi.processors.kite;

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.charset.Charset;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.kitesdk.data.DatasetDescriptor;
import org.kitesdk.data.Datasets;
import org.kitesdk.data.spi.DefaultConfiguration;

import static org.apache.nifi.processors.kite.TestUtil.bytesFor;

/**
 * Verifies AbstractKiteProcessor.getSchema for each supported source:
 * a schema file on the file system, dataset/view URIs, and resource URIs.
 */
public class TestGetSchema {

  public static final Schema SCHEMA = SchemaBuilder.record("Test").fields()
      .requiredLong("id")
      .requiredString("color")
      .optionalDouble("price")
      .endRecord();

  @Rule
  public TemporaryFolder temp = new TemporaryFolder();

  @Test
  public void testSchemaFromFileSystem() throws IOException {
    File schemaFile = temp.newFile("schema.avsc");
    // try-with-resources: the original leaked the stream if write threw
    try (FileOutputStream out = new FileOutputStream(schemaFile)) {
      out.write(bytesFor(SCHEMA.toString(), Charset.forName("utf8")));
    }

    Schema schema = AbstractKiteProcessor.getSchema(
        schemaFile.toString(), DefaultConfiguration.get());

    Assert.assertEquals("Schema from file should match", SCHEMA, schema);
  }

  @Test
  public void testSchemaFromKiteURIs() throws IOException {
    String location = temp.newFolder("ns", "temp").toString();
    String datasetUri = "dataset:file:" + location;
    DatasetDescriptor descriptor = new DatasetDescriptor.Builder()
        .schema(SCHEMA)
        .build();

    Datasets.create(datasetUri, descriptor);

    // dataset: URI resolves to the descriptor's schema
    Schema schema = AbstractKiteProcessor.getSchema(
        datasetUri, DefaultConfiguration.get());
    Assert.assertEquals("Schema from dataset URI should match", SCHEMA, schema);

    // view: URI (with a constraint) resolves to the same schema
    schema = AbstractKiteProcessor.getSchema(
        "view:file:" + location + "?color=orange", DefaultConfiguration.get());
    Assert.assertEquals("Schema from view URI should match", SCHEMA, schema);
  }

  @Test
  public void testSchemaFromResourceURI() throws IOException {
    DatasetDescriptor descriptor = new DatasetDescriptor.Builder()
        .schemaUri("resource:schema/user.avsc") // in kite-data-core test-jar
        .build();
    Schema expected = descriptor.getSchema();

    Schema schema = AbstractKiteProcessor.getSchema(
        "resource:schema/user.avsc", DefaultConfiguration.get());

    Assert.assertEquals("Schema from resource URI should match",
        expected, schema);
  }
}
See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.nifi.processors.kite;

import java.io.IOException;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Assert;
import org.junit.Test;

import static org.apache.nifi.processors.kite.TestUtil.streamFor;

/**
 * Tests JSON-to-Avro conversion: rows matching the schema are converted and
 * counted, rows that do not match are counted as conversion errors.
 */
public class TestJSONToAvroProcessor {

  public static final Schema SCHEMA = SchemaBuilder.record("Test").fields()
      .requiredLong("id")
      .requiredString("color")
      .optionalDouble("price")
      .endRecord();

  public static final String JSON_CONTENT = ""
      + "{\"id\": 1,\"color\": \"green\"}"
      + "{\"id\": \"120V\", \"color\": \"blue\"}\n" + // invalid, ID is a string
      "{\"id\": 2, \"color\": \"grey\", \"price\": 12.95 }";

  @Test
  public void testBasicConversion() throws IOException {
    TestRunner runner = TestRunners.newTestRunner(ConvertJSONToAvro.class);

    // the processor is invalid until a schema has been configured
    runner.assertNotValid();
    runner.setProperty(ConvertJSONToAvro.SCHEMA, SCHEMA.toString());
    runner.assertValid();

    runner.enqueue(streamFor(JSON_CONTENT));
    runner.run();

    Assert.assertEquals("Should convert 2 rows",
        2, (long) runner.getCounterValue("Converted records"));
    Assert.assertEquals("Should reject 1 row",
        1, (long) runner.getCounterValue("Conversion errors"));

    runner.assertAllFlowFilesTransferred("success", 1);
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/82424173/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestKiteProcessorsCluster.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestKiteProcessorsCluster.java
b/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestKiteProcessorsCluster.java
new file mode 100644
index 0000000..a191e5d
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestKiteProcessorsCluster.java
@@ -0,0 +1,128 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.nifi.processors.kite;

import com.google.common.collect.Lists;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData.Record;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.kitesdk.data.Dataset;
import org.kitesdk.data.DatasetDescriptor;
import org.kitesdk.data.DatasetReader;
import org.kitesdk.data.Datasets;
import org.kitesdk.data.spi.DefaultConfiguration;
import org.kitesdk.minicluster.HdfsService;
import org.kitesdk.minicluster.HiveService;
import org.kitesdk.minicluster.MiniCluster;

import static org.apache.nifi.processors.kite.TestUtil.USER_SCHEMA;
import static org.apache.nifi.processors.kite.TestUtil.bytesFor;
import static org.apache.nifi.processors.kite.TestUtil.streamFor;
import static org.apache.nifi.processors.kite.TestUtil.user;

/**
 * Integration tests that run the Kite processors against an in-process
 * HDFS + Hive mini-cluster started once for the class.
 */
public class TestKiteProcessorsCluster {

  public static MiniCluster cluster = null;
  public static DatasetDescriptor descriptor = new DatasetDescriptor.Builder()
      .schema(USER_SCHEMA)
      .build();

  @BeforeClass
  public static void startCluster() throws IOException, InterruptedException {
    // random work dir so runs do not collide with leftovers from earlier runs
    long rand = ThreadLocalRandom.current().nextLong(1000000);
    cluster = new MiniCluster.Builder()
        .workDir("/tmp/minicluster-" + rand)
        .clean(true)
        .addService(HdfsService.class)
        .addService(HiveService.class)
        .bindIP("127.0.0.1")
        .hiveMetastorePort(9083)
        .build();
    cluster.start();
  }

  @AfterClass
  public static void stopCluster() throws IOException, InterruptedException {
    if (cluster != null) {
      cluster.stop();
      cluster = null;
    }
  }

  @Test
  public void testBasicStoreToHive() throws IOException {
    String datasetUri = "dataset:hive:ns/test";

    Dataset<Record> dataset = Datasets.create(datasetUri, descriptor, Record.class);

    TestRunner runner = TestRunners.newTestRunner(StoreInKiteDataset.class);
    runner.assertNotValid();

    runner.setProperty(StoreInKiteDataset.KITE_DATASET_URI, datasetUri);
    runner.assertValid();

    List<Record> users = Lists.newArrayList(
        user("a", "a...@example.com"),
        user("b", "b...@example.com"),
        user("c", "c...@example.com")
    );

    runner.enqueue(streamFor(users));
    runner.run();

    runner.assertAllFlowFilesTransferred("success", 1);

    // close the reader when done; the original leaked it
    List<Record> stored;
    try (DatasetReader<Record> reader = dataset.newReader()) {
      stored = Lists.newArrayList((Iterable<Record>) reader);
    }
    Assert.assertEquals("Records should match", users, stored);

    Datasets.delete(datasetUri);
  }

  @Test
  public void testSchemaFromDistributedFileSystem() throws IOException {
    Schema expected = SchemaBuilder.record("Test").fields()
        .requiredLong("id")
        .requiredString("color")
        .optionalDouble("price")
        .endRecord();

    Path schemaPath = new Path("hdfs:/tmp/schema.avsc");
    FileSystem fs = schemaPath.getFileSystem(DefaultConfiguration.get());
    // try-with-resources guarantees the HDFS stream is closed on failure
    try (OutputStream out = fs.create(schemaPath)) {
      out.write(bytesFor(expected.toString(), StandardCharsets.UTF_8));
    }

    Schema schema = AbstractKiteProcessor.getSchema(
        schemaPath.toString(), DefaultConfiguration.get());

    Assert.assertEquals("Schema from file should match", expected, schema);
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/82424173/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestKiteStorageProcessor.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestKiteStorageProcessor.java
b/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestKiteStorageProcessor.java new file mode 100644 index 0000000..714c3d2 --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestKiteStorageProcessor.java @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.nifi.processors.kite; + +import com.google.common.collect.Lists; +import java.io.IOException; +import java.util.List; +import org.apache.avro.Schema; +import org.apache.avro.SchemaBuilder; +import org.apache.avro.generic.GenericData.Record; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.kitesdk.data.Dataset; +import org.kitesdk.data.DatasetDescriptor; +import org.kitesdk.data.Datasets; + +import static org.apache.nifi.processors.kite.TestUtil.invalidStreamFor; +import static org.apache.nifi.processors.kite.TestUtil.streamFor; +import static org.apache.nifi.processors.kite.TestUtil.user; + +public class TestKiteStorageProcessor { + + @Rule + public TemporaryFolder temp = new TemporaryFolder(); + + private String datasetUri = null; + private Dataset<Record> dataset = null; + + @Before + public void createDataset() throws Exception { + DatasetDescriptor descriptor = new DatasetDescriptor.Builder() + .schema(TestUtil.USER_SCHEMA) + .build(); + this.datasetUri = "dataset:file:" + temp.newFolder("ns", "temp").toString(); + this.dataset = Datasets.create(datasetUri, descriptor, Record.class); + } + + @After + public void deleteDataset() throws Exception { + Datasets.delete(datasetUri); + } + + @Test + public void testBasicStore() throws IOException { + TestRunner runner = TestRunners.newTestRunner(StoreInKiteDataset.class); + runner.assertNotValid(); + + runner.setProperty(StoreInKiteDataset.KITE_DATASET_URI, datasetUri); + runner.assertValid(); + + List<Record> users = Lists.newArrayList( + user("a", "a...@example.com"), + user("b", "b...@example.com"), + user("c", "c...@example.com") + ); + + runner.enqueue(streamFor(users)); + runner.run(); + + runner.assertAllFlowFilesTransferred("success", 1); + runner.assertQueueEmpty(); + 
Assert.assertEquals("Should store 3 values", + 3, (long) runner.getCounterValue("Stored records")); + + List<Record> stored = Lists.newArrayList( + (Iterable<Record>) dataset.newReader()); + Assert.assertEquals("Records should match", users, stored); + } + + @Test + public void testViewURI() { + TestRunner runner = TestRunners.newTestRunner(StoreInKiteDataset.class); + runner.setProperty( + StoreInKiteDataset.KITE_DATASET_URI, "view:hive:ns/table?year=2015"); + runner.assertValid(); + } + + @Test + public void testInvalidURI() { + TestRunner runner = TestRunners.newTestRunner(StoreInKiteDataset.class); + runner.setProperty( + StoreInKiteDataset.KITE_DATASET_URI, "dataset:unknown"); + runner.assertNotValid(); + } + + @Test + public void testUnreadableContent() throws IOException { + TestRunner runner = TestRunners.newTestRunner(StoreInKiteDataset.class); + runner.setProperty(StoreInKiteDataset.KITE_DATASET_URI, datasetUri); + runner.assertValid(); + + runner.enqueue(invalidStreamFor(user("a", "a...@example.com"))); + runner.run(); + + runner.assertAllFlowFilesTransferred("failure", 1); + } + + @Test + public void testCorruptedBlocks() throws IOException { + TestRunner runner = TestRunners.newTestRunner(StoreInKiteDataset.class); + runner.setProperty(StoreInKiteDataset.KITE_DATASET_URI, datasetUri); + runner.assertValid(); + + List<Record> records = Lists.newArrayList(); + for (int i = 0; i < 10000; i += 1) { + String num = String.valueOf(i); + records.add(user(num, num + "@example.com")); + } + + runner.enqueue(invalidStreamFor(records)); + runner.run(); + + long stored = runner.getCounterValue("Stored records"); + Assert.assertTrue("Should store some readable values", + 0 < stored && stored < 10000); + + runner.assertAllFlowFilesTransferred("success", 1); + } + + @Test + public void testIncompatibleSchema() throws IOException { + Schema incompatible = SchemaBuilder.record("User").fields() + .requiredLong("id") + .requiredString("username") + 
.optionalString("email") // the dataset requires this field + .endRecord(); + + // this user has the email field and could be stored, but the schema is + // still incompatible so the entire stream is rejected + Record incompatibleUser = new Record(incompatible); + incompatibleUser.put("id", 1L); + incompatibleUser.put("username", "a"); + incompatibleUser.put("email", "a...@example.com"); + + TestRunner runner = TestRunners.newTestRunner(StoreInKiteDataset.class); + runner.setProperty(StoreInKiteDataset.KITE_DATASET_URI, datasetUri); + runner.assertValid(); + + runner.enqueue(streamFor(incompatibleUser)); + runner.run(); + + runner.assertAllFlowFilesTransferred("incompatible", 1); + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/82424173/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestUtil.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestUtil.java b/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestUtil.java new file mode 100644 index 0000000..2eb30af --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestUtil.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.nifi.processors.kite;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.Charset;
import java.nio.charset.CharsetEncoder;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.file.CodecFactory;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData.Record;

/**
 * Shared helpers for the Kite processor tests: a simple User schema, record
 * factories, and utilities that serialize records into Avro data-file
 * streams (including deliberately truncated ones).
 */
public class TestUtil {

  public static final Schema USER_SCHEMA = SchemaBuilder.record("User").fields()
      .requiredString("username")
      .requiredString("email")
      .endRecord();

  /** Builds a User record with the given username and email. */
  public static Record user(String username, String email) {
    Record user = new Record(USER_SCHEMA);
    user.put("username", username);
    user.put("email", email);
    return user;
  }

  public static InputStream streamFor(Record... records) throws IOException {
    return streamFor(Arrays.asList(records));
  }

  /** Returns an Avro data-file stream containing the given records. */
  public static InputStream streamFor(List<Record> records) throws IOException {
    return new ByteArrayInputStream(bytesFor(records));
  }

  public static InputStream invalidStreamFor(Record... records) throws IOException {
    return invalidStreamFor(Arrays.asList(records));
  }

  /**
   * Returns a corrupt Avro stream: the serialized content is truncated to
   * half its length so readers fail partway through.
   */
  public static InputStream invalidStreamFor(List<Record> records) throws IOException {
    // purposely truncate the content
    byte[] bytes = bytesFor(records);
    return new ByteArrayInputStream(bytes, 0, bytes.length / 2);
  }

  /**
   * Serializes records to snappy-compressed Avro data-file bytes using the
   * first record's schema as the writer schema.
   *
   * @throws IllegalArgumentException if records is empty (no schema source)
   */
  private static byte[] bytesFor(List<Record> records) throws IOException {
    if (records.isEmpty()) {
      throw new IllegalArgumentException("records must not be empty");
    }

    Schema schema = records.get(0).getSchema();
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    DataFileWriter<Record> writer = new DataFileWriter<>(
        AvroUtil.newDatumWriter(schema, Record.class));
    writer.setCodec(CodecFactory.snappyCodec());

    // create() returns this writer; try-with-resources closes it (close also
    // flushes, which the original relied on a bare flush() for, leaking the
    // writer)
    try (DataFileWriter<Record> opened = writer.create(schema, out)) {
      for (Record record : records) {
        opened.append(record);
      }
    }

    return out.toByteArray();
  }

  public static InputStream streamFor(String content) throws CharacterCodingException {
    return streamFor(content, StandardCharsets.UTF_8);
  }

  public static InputStream streamFor(String content, Charset charset) throws CharacterCodingException {
    return new ByteArrayInputStream(bytesFor(content, charset));
  }

  /**
   * Encodes content with a fresh strict encoder so malformed or unmappable
   * input raises CharacterCodingException rather than being silently
   * replaced (as String#getBytes would do).
   */
  public static byte[] bytesFor(String content, Charset charset) throws CharacterCodingException {
    CharBuffer chars = CharBuffer.wrap(content);
    CharsetEncoder encoder = charset.newEncoder();
    ByteBuffer buffer = encoder.encode(chars);
    byte[] bytes = new byte[buffer.remaining()];
    buffer.get(bytes);
    return bytes;
  }
}