Repository: nifi
Updated Branches:
  refs/heads/master 3d4ce3452 -> 100799941


NIFI-912 Initial version of ExtractAvroMetadata processor.
- Adding optional ability to extract record count
- Renaming record.count to item.count for clarity, and updating documentation
- Adding a test case with 0 records
- Removing validators from properties that use allowable values


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/10079994
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/10079994
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/10079994

Branch: refs/heads/master
Commit: 100799941510b085b71394a757d86b9845ab993d
Parents: 3d4ce34
Author: Bryan Bende <[email protected]>
Authored: Mon Aug 31 18:03:50 2015 -0400
Committer: Bryan Bende <[email protected]>
Committed: Wed Sep 16 09:59:50 2015 -0400

----------------------------------------------------------------------
 .../nifi-avro-processors/pom.xml                |   5 +
 .../processors/avro/ExtractAvroMetadata.java    | 219 ++++++++++++++++
 .../org.apache.nifi.processor.Processor         |   3 +-
 .../nifi/processors/avro/AvroTestUtil.java      |  40 +++
 .../processors/avro/TestConvertAvroToJSON.java  |  24 +-
 .../avro/TestExtractAvroMetadata.java           | 256 +++++++++++++++++++
 .../src/test/resources/array.avsc               |   1 +
 7 files changed, 528 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/10079994/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/pom.xml 
b/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/pom.xml
index 989f762..68ce57c 100644
--- a/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/pom.xml
@@ -40,6 +40,10 @@
                        <artifactId>jackson-databind</artifactId>
                </dependency>
                <dependency>
+                       <groupId>commons-codec</groupId>
+                       <artifactId>commons-codec</artifactId>
+               </dependency>
+               <dependency>
                        <groupId>org.apache.nifi</groupId>
                        <artifactId>nifi-mock</artifactId>
                        <scope>test</scope>
@@ -64,6 +68,7 @@
                                <configuration>
                                        <excludes combine.children="append">
                         <exclude>src/test/resources/user.avsc</exclude>
+                        <exclude>src/test/resources/array.avsc</exclude>
                                        </excludes>
                                </configuration>
                        </plugin>

http://git-wip-us.apache.org/repos/asf/nifi/blob/10079994/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/ExtractAvroMetadata.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/ExtractAvroMetadata.java
 
b/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/ExtractAvroMetadata.java
new file mode 100644
index 0000000..48aad7d
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/ExtractAvroMetadata.java
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.avro;
+
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaNormalization;
+import org.apache.avro.file.DataFileStream;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.commons.codec.binary.Hex;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.InputStreamCallback;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import java.io.BufferedInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.security.NoSuchAlgorithmException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Set;
+
+@SideEffectFree
+@SupportsBatching
+@Tags({ "avro", "schema", "metadata" })
+@CapabilityDescription("Extracts metadata from the header of an Avro 
datafile.")
+@WritesAttributes({
+        @WritesAttribute(attribute = "schema.type", description = "The type of 
the schema (i.e. record, enum, etc.)."),
+        @WritesAttribute(attribute = "schema.name", description = "Contains 
the name when the type is a record, enum or fixed, " +
+                "otherwise contains the name of the primitive type."),
+        @WritesAttribute(attribute = "schema.fingerprint", description = "The 
result of the Fingerprint Algorithm as a Hex string."),
+        @WritesAttribute(attribute = "item.count", description = "The total 
number of items in the datafile, only written if Count Items " +
+                "is set to true.")
+})
+public class ExtractAvroMetadata extends AbstractProcessor {
+
+    static final AllowableValue CRC_64_AVRO = new 
AllowableValue("CRC-64-AVRO");
+    static final AllowableValue MD5 = new AllowableValue("MD5");
+    static final AllowableValue SHA_256 = new AllowableValue("SHA-256");
+
+    static final PropertyDescriptor FINGERPRINT_ALGORITHM = new 
PropertyDescriptor.Builder()
+            .name("Fingerprint Algorithm")
+            .description("The algorithm used to generate the schema 
fingerprint. Available choices are based on the Avro recommended practices for 
" +
+                    "fingerprint generation.")
+            .allowableValues(CRC_64_AVRO, MD5, SHA_256)
+            .defaultValue(CRC_64_AVRO.getValue())
+            .required(true)
+            .build();
+
+    static final PropertyDescriptor METADATA_KEYS = new 
PropertyDescriptor.Builder()
+            .name("Metadata Keys")
+            .description("A comma-separated list of keys indicating key/value 
pairs to extract from the Avro file header. The key 'avro.schema' can " +
+                    "be used to extract the full schema in JSON format, and 
'avro.codec' can be used to extract the codec name if one exists.")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(false)
+            .build();
+
+    static final PropertyDescriptor COUNT_ITEMS = new 
PropertyDescriptor.Builder()
+            .name("Count Items")
+            .description("If true the number of items in the datafile will be 
counted and stored in a FlowFile attribute 'item.count'. The counting is done " 
+
+                    "by reading blocks and getting the number of items for 
each block, thus avoiding de-serializing. The items being counted will be the 
top-level " +
+                    "items in the datafile. For example, with a schema of type 
record the items will be the records, and for a schema of type Array the items 
will " +
+                    "be the arrays (not the number of entries in each array).")
+            .allowableValues("true", "false")
+            .defaultValue("false")
+            .required(true)
+            .build();
+
+    static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("A FlowFile is routed to this relationship after 
metadata has been extracted.")
+            .build();
+    static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("A FlowFile is routed to this relationship if it 
cannot be parsed as Avro or metadata cannot be extracted for any reason")
+            .build();
+
+    static final String SCHEMA_TYPE_ATTR = "schema.type";
+    static final String SCHEMA_NAME_ATTR = "schema.name";
+    static final String SCHEMA_FINGERPRINT_ATTR = "schema.fingerprint";
+    static final String ITEM_COUNT_ATTR = "item.count";
+
+    private List<PropertyDescriptor> properties;
+    private Set<Relationship> relationships;
+
+    @Override
+    protected void init(ProcessorInitializationContext context) {
+        super.init(context);
+
+        final List<PropertyDescriptor> properties = new ArrayList<>();
+        properties.add(FINGERPRINT_ALGORITHM);
+        properties.add(METADATA_KEYS);
+        properties.add(COUNT_ITEMS);
+        this.properties = Collections.unmodifiableList(properties);
+
+        final Set<Relationship> relationships = new HashSet<>();
+        relationships.add(REL_SUCCESS);
+        relationships.add(REL_FAILURE);
+        this.relationships = Collections.unmodifiableSet(relationships);
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return properties;
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return relationships;
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession 
session) throws ProcessException {
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        final Map<String,String> avroMetadata = new HashMap<>();
+        final Set<String> requestedMetadataKeys = new HashSet<>();
+
+        final boolean countRecords = 
context.getProperty(COUNT_ITEMS).asBoolean();
+        final String fingerprintAlgorithm = 
context.getProperty(FINGERPRINT_ALGORITHM).getValue();
+        final String metadataKeysValue = 
context.getProperty(METADATA_KEYS).getValue();
+
+        if (!StringUtils.isEmpty(metadataKeysValue)) {
+            final String[] keys = metadataKeysValue.split("\\s*,\\s*");
+            for (final String key : keys) {
+                requestedMetadataKeys.add(key.trim());
+            }
+        }
+
+        try {
+            session.read(flowFile, new InputStreamCallback() {
+                @Override
+                public void process(InputStream rawIn) throws IOException {
+                    try (final InputStream in = new BufferedInputStream(rawIn);
+                         final DataFileStream<GenericRecord> reader = new 
DataFileStream<>(in, new GenericDatumReader<GenericRecord>())) {
+
+                        final Schema schema = reader.getSchema();
+                        if (schema == null) {
+                            throw new ProcessException("Avro schema was null");
+                        }
+
+                        for (String key : reader.getMetaKeys()) {
+                            if (requestedMetadataKeys.contains(key)) {
+                                avroMetadata.put(key, 
reader.getMetaString(key));
+                            }
+                        }
+
+                        try {
+                            final byte[] rawFingerprint = 
SchemaNormalization.parsingFingerprint(fingerprintAlgorithm, schema);
+                            avroMetadata.put(SCHEMA_FINGERPRINT_ATTR, 
Hex.encodeHexString(rawFingerprint));
+                            avroMetadata.put(SCHEMA_TYPE_ATTR, 
schema.getType().getName());
+                            avroMetadata.put(SCHEMA_NAME_ATTR, 
schema.getName());
+                        } catch (NoSuchAlgorithmException e) {
+                            // shouldn't happen since allowable values are 
valid algorithms
+                            throw new ProcessException(e);
+                        }
+
+                        if (countRecords) {
+                            long recordCount = reader.getBlockCount();
+                            try {
+                                while (reader.nextBlock() != null) {
+                                    recordCount += reader.getBlockCount();
+                                }
+                            } catch (NoSuchElementException e) {
+                                // happens at end of file
+                            }
+                            avroMetadata.put(ITEM_COUNT_ATTR, 
String.valueOf(recordCount));
+                        }
+                    }
+                }
+            });
+        } catch (final ProcessException pe) {
+            getLogger().error("Failed to extract Avro metadata for {} due to 
{}; transferring to failure", new Object[] {flowFile, pe});
+            session.transfer(flowFile, REL_FAILURE);
+            return;
+        }
+
+        flowFile = session.putAllAttributes(flowFile, avroMetadata);
+        session.transfer(flowFile, REL_SUCCESS);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/10079994/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
 
b/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index 67a6cd3..192ec00 100644
--- 
a/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ 
b/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -12,4 +12,5 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-org.apache.nifi.processors.avro.ConvertAvroToJSON
\ No newline at end of file
+org.apache.nifi.processors.avro.ConvertAvroToJSON
+org.apache.nifi.processors.avro.ExtractAvroMetadata
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/10079994/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/test/java/org/apache/nifi/processors/avro/AvroTestUtil.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/test/java/org/apache/nifi/processors/avro/AvroTestUtil.java
 
b/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/test/java/org/apache/nifi/processors/avro/AvroTestUtil.java
new file mode 100644
index 0000000..1315b18
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/test/java/org/apache/nifi/processors/avro/AvroTestUtil.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.avro;
+
+import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumWriter;
+import org.apache.nifi.stream.io.ByteArrayOutputStream;
+
+import java.io.IOException;
+
+/**
+ * Shared helpers for the Avro processor tests.
+ */
+public class AvroTestUtil {
+
+    /**
+     * Writes the given records into an in-memory Avro datafile.
+     *
+     * @param schema the schema the datafile is created with
+     * @param datumWriter the writer used to encode each record
+     * @param users the records to append, in order; may be empty
+     * @return the stream holding the complete datafile bytes
+     * @throws IOException if creating the datafile or appending a record fails
+     */
+    public static ByteArrayOutputStream serializeAvroRecord(final Schema schema, final DatumWriter<GenericRecord> datumWriter, final GenericRecord... users) throws IOException {
+        final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+        // Closing the writer finishes the datafile before the buffer is handed back.
+        try (DataFileWriter<GenericRecord> writer = new DataFileWriter<>(datumWriter)) {
+            writer.create(schema, buffer);
+            for (int i = 0; i < users.length; i++) {
+                writer.append(users[i]);
+            }
+        }
+        return buffer;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/10079994/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/test/java/org/apache/nifi/processors/avro/TestConvertAvroToJSON.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/test/java/org/apache/nifi/processors/avro/TestConvertAvroToJSON.java
 
b/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/test/java/org/apache/nifi/processors/avro/TestConvertAvroToJSON.java
index 2d84202..cfd26c1 100644
--- 
a/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/test/java/org/apache/nifi/processors/avro/TestConvertAvroToJSON.java
+++ 
b/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/test/java/org/apache/nifi/processors/avro/TestConvertAvroToJSON.java
@@ -16,22 +16,20 @@
  */
 package org.apache.nifi.processors.avro;
 
-import java.io.File;
-import java.io.IOException;
-
 import org.apache.avro.Schema;
-import org.apache.avro.file.DataFileWriter;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericDatumWriter;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.io.DatumWriter;
-import org.apache.nifi.processors.avro.ConvertAvroToJSON;
 import org.apache.nifi.stream.io.ByteArrayOutputStream;
 import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
 import org.junit.Test;
 
+import java.io.File;
+import java.io.IOException;
+
 public class TestConvertAvroToJSON {
 
     @Test
@@ -44,7 +42,7 @@ public class TestConvertAvroToJSON {
         user1.put("favorite_number", 256);
 
         final DatumWriter<GenericRecord> datumWriter = new 
GenericDatumWriter<>(schema);
-        final ByteArrayOutputStream out1 = serializeAvroRecord(schema, 
datumWriter, user1);
+        final ByteArrayOutputStream out1 = 
AvroTestUtil.serializeAvroRecord(schema, datumWriter, user1);
         runner.enqueue(out1.toByteArray());
 
         runner.run();
@@ -69,7 +67,7 @@ public class TestConvertAvroToJSON {
         user2.put("favorite_color", "red");
 
         final DatumWriter<GenericRecord> datumWriter = new 
GenericDatumWriter<>(schema);
-        final ByteArrayOutputStream out1 = serializeAvroRecord(schema, 
datumWriter, user1, user2);
+        final ByteArrayOutputStream out1 = 
AvroTestUtil.serializeAvroRecord(schema, datumWriter, user1, user2);
         runner.enqueue(out1.toByteArray());
 
         runner.run();
@@ -87,16 +85,4 @@ public class TestConvertAvroToJSON {
         runner.assertAllFlowFilesTransferred(ConvertAvroToJSON.REL_FAILURE, 1);
     }
 
-    private ByteArrayOutputStream serializeAvroRecord(final Schema schema, 
final DatumWriter<GenericRecord> datumWriter, final GenericRecord... users) 
throws IOException {
-        ByteArrayOutputStream out = new ByteArrayOutputStream();
-        DataFileWriter<GenericRecord> dataFileWriter = new 
DataFileWriter<GenericRecord>(datumWriter);
-        dataFileWriter.create(schema, out);
-        for (final GenericRecord user : users) {
-            dataFileWriter.append(user);
-        }
-
-        dataFileWriter.close();
-        return out;
-    }
-
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/10079994/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/test/java/org/apache/nifi/processors/avro/TestExtractAvroMetadata.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/test/java/org/apache/nifi/processors/avro/TestExtractAvroMetadata.java
 
b/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/test/java/org/apache/nifi/processors/avro/TestExtractAvroMetadata.java
new file mode 100644
index 0000000..474b34c
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/test/java/org/apache/nifi/processors/avro/TestExtractAvroMetadata.java
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.avro;
+
+import org.apache.avro.Schema;
+import org.apache.avro.file.CodecFactory;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumWriter;
+import org.apache.nifi.stream.io.ByteArrayOutputStream;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+
+public class TestExtractAvroMetadata {
+
+    static final String AVRO_SCHEMA_ATTR = "avro.schema";
+    static final String AVRO_CODEC_ATTR = "avro.codec";
+
+    /**
+     * With no properties set, the fingerprint, type and name attributes are written, while
+     * the full schema and the item count are not.
+     */
+    @Test
+    public void testDefaultExtraction() throws IOException {
+        final TestRunner testRunner = TestRunners.newTestRunner(new ExtractAvroMetadata());
+
+        final File schemaFile = new File("src/test/resources/user.avsc");
+        final Schema userSchema = new Schema.Parser().parse(schemaFile);
+        final byte[] avroBytes = getOutputStreamWithOneUser(userSchema).toByteArray();
+        testRunner.enqueue(avroBytes);
+        testRunner.run();
+
+        testRunner.assertAllFlowFilesTransferred(ExtractAvroMetadata.REL_SUCCESS, 1);
+        final MockFlowFile result = testRunner.getFlowFilesForRelationship(ExtractAvroMetadata.REL_SUCCESS).get(0);
+        result.assertAttributeEquals(ExtractAvroMetadata.SCHEMA_FINGERPRINT_ATTR, "b2d1d8d3de2833ce");
+        result.assertAttributeEquals(ExtractAvroMetadata.SCHEMA_TYPE_ATTR, Schema.Type.RECORD.getName());
+        result.assertAttributeEquals(ExtractAvroMetadata.SCHEMA_NAME_ATTR, "User");
+        result.assertAttributeNotExists(AVRO_SCHEMA_ATTR);
+        result.assertAttributeNotExists(ExtractAvroMetadata.ITEM_COUNT_ATTR);
+    }
+
+    /**
+     * With Count Items enabled, the item.count attribute equals the number of records written.
+     */
+    @Test
+    public void testExtractionWithItemCount() throws IOException {
+        final TestRunner testRunner = TestRunners.newTestRunner(new ExtractAvroMetadata());
+        testRunner.setProperty(ExtractAvroMetadata.COUNT_ITEMS, "true");
+
+        final File schemaFile = new File("src/test/resources/user.avsc");
+        final Schema userSchema = new Schema.Parser().parse(schemaFile);
+        // creates 2 blocks
+        final byte[] avroBytes = getOutputStreamWithMultipleUsers(userSchema, 6000).toByteArray();
+        testRunner.enqueue(avroBytes);
+        testRunner.run();
+
+        testRunner.assertAllFlowFilesTransferred(ExtractAvroMetadata.REL_SUCCESS, 1);
+        final MockFlowFile result = testRunner.getFlowFilesForRelationship(ExtractAvroMetadata.REL_SUCCESS).get(0);
+        result.assertAttributeEquals(ExtractAvroMetadata.ITEM_COUNT_ATTR, "6000");
+    }
+
+    /**
+     * A datafile that contains a header but no records still succeeds and reports an
+     * item count of zero alongside the schema attributes.
+     */
+    @Test
+    public void testExtractionWithZeroUsers() throws IOException {
+        final TestRunner testRunner = TestRunners.newTestRunner(new ExtractAvroMetadata());
+        testRunner.setProperty(ExtractAvroMetadata.COUNT_ITEMS, "true");
+
+        final File schemaFile = new File("src/test/resources/user.avsc");
+        final Schema userSchema = new Schema.Parser().parse(schemaFile);
+        final byte[] avroBytes = getOutputStreamWithMultipleUsers(userSchema, 0).toByteArray();
+        testRunner.enqueue(avroBytes);
+        testRunner.run();
+
+        testRunner.assertAllFlowFilesTransferred(ExtractAvroMetadata.REL_SUCCESS, 1);
+        final MockFlowFile result = testRunner.getFlowFilesForRelationship(ExtractAvroMetadata.REL_SUCCESS).get(0);
+        result.assertAttributeEquals(ExtractAvroMetadata.SCHEMA_FINGERPRINT_ATTR, "b2d1d8d3de2833ce");
+        result.assertAttributeEquals(ExtractAvroMetadata.SCHEMA_TYPE_ATTR, Schema.Type.RECORD.getName());
+        result.assertAttributeEquals(ExtractAvroMetadata.SCHEMA_NAME_ATTR, "User");
+        result.assertAttributeEquals(ExtractAvroMetadata.ITEM_COUNT_ATTR, "0");
+    }
+
+    /**
+     * Selecting the MD5 fingerprint algorithm yields the expected 128-bit hex fingerprint.
+     */
+    @Test
+    public void testExtractionWithMD5() throws IOException {
+        final TestRunner testRunner = TestRunners.newTestRunner(new ExtractAvroMetadata());
+        testRunner.setProperty(ExtractAvroMetadata.FINGERPRINT_ALGORITHM, ExtractAvroMetadata.MD5);
+
+        final File schemaFile = new File("src/test/resources/user.avsc");
+        final Schema userSchema = new Schema.Parser().parse(schemaFile);
+        final byte[] avroBytes = getOutputStreamWithOneUser(userSchema).toByteArray();
+        testRunner.enqueue(avroBytes);
+        testRunner.run();
+
+        testRunner.assertAllFlowFilesTransferred(ExtractAvroMetadata.REL_SUCCESS, 1);
+        final MockFlowFile result = testRunner.getFlowFilesForRelationship(ExtractAvroMetadata.REL_SUCCESS).get(0);
+        result.assertAttributeEquals(ExtractAvroMetadata.SCHEMA_FINGERPRINT_ATTR, "3c6a7bee8994be20314dd28c6a3af4f2");
+        result.assertAttributeEquals(ExtractAvroMetadata.SCHEMA_TYPE_ATTR, Schema.Type.RECORD.getName());
+        result.assertAttributeEquals(ExtractAvroMetadata.SCHEMA_NAME_ATTR, "User");
+        result.assertAttributeNotExists(AVRO_SCHEMA_ATTR);
+    }
+
+    /**
+     * Selecting the SHA-256 fingerprint algorithm yields the expected 256-bit hex fingerprint.
+     */
+    @Test
+    public void testExtractionWithSHA256() throws IOException {
+        final TestRunner testRunner = TestRunners.newTestRunner(new ExtractAvroMetadata());
+        testRunner.setProperty(ExtractAvroMetadata.FINGERPRINT_ALGORITHM, ExtractAvroMetadata.SHA_256);
+
+        final File schemaFile = new File("src/test/resources/user.avsc");
+        final Schema userSchema = new Schema.Parser().parse(schemaFile);
+        final byte[] avroBytes = getOutputStreamWithOneUser(userSchema).toByteArray();
+        testRunner.enqueue(avroBytes);
+        testRunner.run();
+
+        testRunner.assertAllFlowFilesTransferred(ExtractAvroMetadata.REL_SUCCESS, 1);
+
+        final MockFlowFile result = testRunner.getFlowFilesForRelationship(ExtractAvroMetadata.REL_SUCCESS).get(0);
+        result.assertAttributeEquals(ExtractAvroMetadata.SCHEMA_FINGERPRINT_ATTR, "683f8f51ecd208038f4f0d39820ee9dd0ef3e32a3bee9371de0a2016d501b113");
+        result.assertAttributeEquals(ExtractAvroMetadata.SCHEMA_TYPE_ATTR, Schema.Type.RECORD.getName());
+        result.assertAttributeEquals(ExtractAvroMetadata.SCHEMA_NAME_ATTR, "User");
+        result.assertAttributeNotExists(AVRO_SCHEMA_ATTR);
+    }
+
+    /**
+     * Requesting the 'avro.schema' header key copies the full schema JSON into an attribute
+     * of the same name, in addition to the standard schema attributes.
+     */
+    @Test
+    public void testExtractionWithMetadataKey() throws IOException {
+        final TestRunner testRunner = TestRunners.newTestRunner(new ExtractAvroMetadata());
+        // test dynamic attribute avro.schema
+        testRunner.setProperty(ExtractAvroMetadata.METADATA_KEYS, AVRO_SCHEMA_ATTR);
+
+        final File schemaFile = new File("src/test/resources/user.avsc");
+        final Schema userSchema = new Schema.Parser().parse(schemaFile);
+        final byte[] avroBytes = getOutputStreamWithOneUser(userSchema).toByteArray();
+        testRunner.enqueue(avroBytes);
+        testRunner.run();
+
+        testRunner.assertAllFlowFilesTransferred(ExtractAvroMetadata.REL_SUCCESS, 1);
+
+        final MockFlowFile result = testRunner.getFlowFilesForRelationship(ExtractAvroMetadata.REL_SUCCESS).get(0);
+        result.assertAttributeExists(ExtractAvroMetadata.SCHEMA_FINGERPRINT_ATTR);
+        result.assertAttributeEquals(ExtractAvroMetadata.SCHEMA_TYPE_ATTR, Schema.Type.RECORD.getName());
+        result.assertAttributeEquals(ExtractAvroMetadata.SCHEMA_NAME_ATTR, "User");
+        result.assertAttributeEquals(AVRO_SCHEMA_ATTR, userSchema.toString());
+    }
+
+    /**
+     * Whitespace around the comma-separated metadata keys is ignored, and keys that do not
+     * exist in the file header are silently skipped.
+     */
+    @Test
+    public void testExtractionWithMetadataKeysWhitespace() throws IOException {
+        final TestRunner testRunner = TestRunners.newTestRunner(new ExtractAvroMetadata());
+        // test dynamic attribute avro.schema
+        final String requestedKeys = "foo, bar,   " + AVRO_SCHEMA_ATTR;
+        testRunner.setProperty(ExtractAvroMetadata.METADATA_KEYS, requestedKeys);
+
+        final File schemaFile = new File("src/test/resources/user.avsc");
+        final Schema userSchema = new Schema.Parser().parse(schemaFile);
+        final byte[] avroBytes = getOutputStreamWithOneUser(userSchema).toByteArray();
+        testRunner.enqueue(avroBytes);
+        testRunner.run();
+
+        testRunner.assertAllFlowFilesTransferred(ExtractAvroMetadata.REL_SUCCESS, 1);
+
+        final MockFlowFile result = testRunner.getFlowFilesForRelationship(ExtractAvroMetadata.REL_SUCCESS).get(0);
+        result.assertAttributeExists(ExtractAvroMetadata.SCHEMA_FINGERPRINT_ATTR);
+        result.assertAttributeEquals(ExtractAvroMetadata.SCHEMA_TYPE_ATTR, Schema.Type.RECORD.getName());
+        result.assertAttributeEquals(ExtractAvroMetadata.SCHEMA_NAME_ATTR, "User");
+        result.assertAttributeEquals(AVRO_SCHEMA_ATTR, userSchema.toString());
+    }
+
+    /**
+     * A datafile whose top-level schema is an array (not a record) is still processed:
+     * the schema type and name reflect the array schema, and the item count is the number
+     * of arrays appended, not the number of elements inside them.
+     */
+    @Test
+    public void testExtractionWithNonRecordSchema() throws IOException {
+        final TestRunner runner = TestRunners.newTestRunner(new ExtractAvroMetadata());
+        runner.setProperty(ExtractAvroMetadata.COUNT_ITEMS, "true");
+
+        final Schema schema = new Schema.Parser().parse(new File("src/test/resources/array.avsc"));
+
+        final GenericData.Array<String> data = new GenericData.Array<>(schema, Arrays.asList("one", "two", "three"));
+        final DatumWriter<GenericData.Array<String>> datumWriter = new GenericDatumWriter<>(schema);
+
+        final ByteArrayOutputStream out = new ByteArrayOutputStream();
+        // try-with-resources closes the writer even if create/append throws, and guarantees
+        // the datafile is complete before its bytes are enqueued.
+        try (DataFileWriter<GenericData.Array<String>> dataFileWriter = new DataFileWriter<>(datumWriter)) {
+            dataFileWriter.create(schema, out);
+            dataFileWriter.append(data);
+            dataFileWriter.append(data);
+        }
+
+        runner.enqueue(out.toByteArray());
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(ExtractAvroMetadata.REL_SUCCESS, 1);
+
+        final MockFlowFile flowFile = runner.getFlowFilesForRelationship(ExtractAvroMetadata.REL_SUCCESS).get(0);
+        flowFile.assertAttributeExists(ExtractAvroMetadata.SCHEMA_FINGERPRINT_ATTR);
+        flowFile.assertAttributeEquals(ExtractAvroMetadata.SCHEMA_TYPE_ATTR, Schema.Type.ARRAY.getName());
+        flowFile.assertAttributeEquals(ExtractAvroMetadata.SCHEMA_NAME_ATTR, "array");
+        flowFile.assertAttributeEquals(ExtractAvroMetadata.ITEM_COUNT_ATTR, "2"); // number of arrays, not elements
+    }
+
+    /**
+     * Writes a data file with the deflate codec, requests the codec metadata
+     * key through the Metadata Keys property, and checks that the resulting
+     * attribute carries the codec name.
+     */
+    @Test
+    public void testExtractionWithCodec() throws IOException {
+        final TestRunner runner = TestRunners.newTestRunner(
+                new ExtractAvroMetadata());
+        // test dynamic attribute avro.codec
+        runner.setProperty(ExtractAvroMetadata.METADATA_KEYS, AVRO_CODEC_ATTR);
+
+        final Schema schema = new Schema.Parser().parse(
+                new File("src/test/resources/array.avsc"));
+
+        final GenericData.Array<String> data = new GenericData.Array<>(
+                schema, Arrays.asList("one", "two", "three"));
+        final DatumWriter<GenericData.Array<String>> datumWriter =
+                new GenericDatumWriter<>(schema);
+
+        final ByteArrayOutputStream out = new ByteArrayOutputStream();
+        // try-with-resources closes the writer even when create/append throws
+        try (final DataFileWriter<GenericData.Array<String>> dataFileWriter =
+                new DataFileWriter<>(datumWriter)) {
+            // the codec is configured before create(), as in the original
+            dataFileWriter.setCodec(CodecFactory.deflateCodec(1));
+            dataFileWriter.create(schema, out);
+            dataFileWriter.append(data);
+        }
+
+        runner.enqueue(out.toByteArray());
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(
+                ExtractAvroMetadata.REL_SUCCESS, 1);
+
+        final MockFlowFile flowFile = runner
+                .getFlowFilesForRelationship(ExtractAvroMetadata.REL_SUCCESS)
+                .get(0);
+        // assert under the same key that was requested above, instead of
+        // repeating it as a string literal
+        flowFile.assertAttributeEquals(AVRO_CODEC_ATTR, "deflate");
+    }
+
+    /**
+     * Enqueues the UTF-8 bytes of a plain string instead of an Avro data file
+     * and expects the single flow file to go to the failure relationship.
+     */
+    @Test
+    public void testExtractionWithBadInput() throws IOException {
+        final byte[] notAvro = "not avro".getBytes("UTF-8");
+
+        final TestRunner testRunner = TestRunners.newTestRunner(
+                new ExtractAvroMetadata());
+        testRunner.enqueue(notAvro);
+        testRunner.run();
+        testRunner.assertAllFlowFilesTransferred(
+                ExtractAvroMetadata.REL_FAILURE, 1);
+    }
+
+    /**
+     * Builds one record with name "Alyssa" and favorite_number 256 for the
+     * given schema and serializes it through AvroTestUtil.
+     *
+     * @param schema schema used for both the record and the datum writer
+     * @return the stream returned by AvroTestUtil.serializeAvroRecord
+     * @throws IOException propagated from serialization
+     */
+    private ByteArrayOutputStream getOutputStreamWithOneUser(Schema schema)
+            throws IOException {
+        final DatumWriter<GenericRecord> writer =
+                new GenericDatumWriter<>(schema);
+
+        final GenericRecord alyssa = new GenericData.Record(schema);
+        alyssa.put("name", "Alyssa");
+        alyssa.put("favorite_number", 256);
+
+        return AvroTestUtil.serializeAvroRecord(schema, writer, alyssa);
+    }
+
+    /**
+     * Builds numUsers records named "user0", "user1", ... whose
+     * favorite_number equals their index, and serializes them through
+     * AvroTestUtil.
+     *
+     * @param schema schema used for the records and the datum writer
+     * @param numUsers how many records to generate
+     * @return the stream returned by AvroTestUtil.serializeAvroRecord
+     * @throws IOException propagated from serialization
+     */
+    private ByteArrayOutputStream getOutputStreamWithMultipleUsers(
+            Schema schema, int numUsers) throws IOException {
+        final GenericRecord[] records = new GenericRecord[numUsers];
+        for (int idx = 0; idx < records.length; idx++) {
+            records[idx] = new GenericData.Record(schema);
+            records[idx].put("name", "user" + idx);
+            records[idx].put("favorite_number", idx);
+        }
+
+        final DatumWriter<GenericRecord> writer =
+                new GenericDatumWriter<>(schema);
+        return AvroTestUtil.serializeAvroRecord(schema, writer, records);
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/10079994/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/test/resources/array.avsc
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/test/resources/array.avsc b/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/test/resources/array.avsc
new file mode 100644
index 0000000..dace49a
--- /dev/null
+++ b/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/test/resources/array.avsc
@@ -0,0 +1 @@
+{"type": "array", "items": "string"}
\ No newline at end of file

Reply via email to