Repository: nifi
Updated Branches:
  refs/heads/master 26f5c496d -> b0122c6a7


NIFI-2465 - InferAvroSchema EL support based on incoming FlowFiles

This closes #863.

Signed-off-by: Bryan Bende <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/b0122c6a
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/b0122c6a
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/b0122c6a

Branch: refs/heads/master
Commit: b0122c6a73846eba4dcbff402c19b2cf1ded7951
Parents: 26f5c49
Author: Simon Elliston Ball <[email protected]>
Authored: Mon Aug 15 17:54:50 2016 +0100
Committer: Bryan Bende <[email protected]>
Committed: Tue Aug 16 12:50:53 2016 -0400

----------------------------------------------------------------------
 .../nifi/processors/kite/InferAvroSchema.java   | 22 ++---
 .../processors/kite/TestInferAvroSchema.java    | 91 +++++++++++++++++---
 2 files changed, 91 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/b0122c6a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/InferAvroSchema.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/InferAvroSchema.java b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/InferAvroSchema.java
index 1923785..aad48ae 100644
--- a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/InferAvroSchema.java
+++ b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/InferAvroSchema.java
@@ -95,7 +95,7 @@ public class InferAvroSchema
                 .subject(subject)
                 .input(input)
                 .explanation("Only non-null single characters are supported")
-                .valid(input.length() == 1 && input.charAt(0) != 0)
+                .valid(input.length() == 1 && input.charAt(0) != 0 || context.isExpressionLanguagePresent(input))
                 .build();
         }
     };
@@ -175,6 +175,7 @@ public class InferAvroSchema
     public static final PropertyDescriptor DELIMITER = new PropertyDescriptor.Builder()
             .name("CSV delimiter")
             .description("Delimiter character for CSV records")
+            .expressionLanguageSupported(true)
             .addValidator(CHAR_VALIDATOR)
             .defaultValue(",")
             .build();
@@ -212,6 +213,7 @@ public class InferAvroSchema
             .description("Character encoding of CSV data.")
             .required(true)
             .defaultValue("UTF-8")
+            .expressionLanguageSupported(true)
             .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
             .build();
 
@@ -391,14 +393,14 @@ public class InferAvroSchema
         }
 
         //Prepares the CSVProperties for kite
-        final CSVProperties props = new CSVProperties.Builder()
-                .delimiter(context.getProperty(DELIMITER).getValue())
-                .escape(context.getProperty(ESCAPE_STRING).evaluateAttributeExpressions().getValue())
-                .quote(context.getProperty(QUOTE_STRING).evaluateAttributeExpressions().getValue())
+        CSVProperties props = new CSVProperties.Builder()
+                .charset(context.getProperty(CHARSET).evaluateAttributeExpressions(inputFlowFile).getValue())
+                .delimiter(context.getProperty(DELIMITER).evaluateAttributeExpressions(inputFlowFile).getValue())
+                .quote(context.getProperty(QUOTE_STRING).evaluateAttributeExpressions(inputFlowFile).getValue())
+                .escape(context.getProperty(ESCAPE_STRING).evaluateAttributeExpressions(inputFlowFile).getValue())
+                .linesToSkip(context.getProperty(HEADER_LINE_SKIP_COUNT).evaluateAttributeExpressions(inputFlowFile).asInteger())
                 .header(header.get())
                 .hasHeader(hasHeader.get())
-                .linesToSkip(context.getProperty(HEADER_LINE_SKIP_COUNT).evaluateAttributeExpressions().asInteger())
-                .charset(context.getProperty(CHARSET).getValue())
                 .build();
 
         final AtomicReference<String> avroSchema = new AtomicReference<>();
@@ -408,7 +410,7 @@ public class InferAvroSchema
             public void process(InputStream in) throws IOException {
                 avroSchema.set(CSVUtil
                         .inferSchema(
-                                context.getProperty(RECORD_NAME).evaluateAttributeExpressions().getValue(), in, props)
+                                context.getProperty(RECORD_NAME).evaluateAttributeExpressions(inputFlowFile).getValue(), in, props)
                         .toString(context.getProperty(PRETTY_AVRO_OUTPUT).asBoolean()));
             }
         });
@@ -435,8 +437,8 @@ public class InferAvroSchema
             @Override
             public void process(InputStream in) throws IOException {
                 Schema as = JsonUtil.inferSchema(
-                        in, context.getProperty(RECORD_NAME).evaluateAttributeExpressions().getValue(),
-                        context.getProperty(NUM_RECORDS_TO_ANALYZE).evaluateAttributeExpressions().asInteger());
+                        in, context.getProperty(RECORD_NAME).evaluateAttributeExpressions(inputFlowFile).getValue(),
+                        context.getProperty(NUM_RECORDS_TO_ANALYZE).evaluateAttributeExpressions(inputFlowFile).asInteger());
                 avroSchema.set(as.toString(context.getProperty(PRETTY_AVRO_OUTPUT).asBoolean()));
 
             }

http://git-wip-us.apache.org/repos/asf/nifi/blob/b0122c6a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestInferAvroSchema.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestInferAvroSchema.java b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestInferAvroSchema.java
index 125a631..171a64a 100644
--- a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestInferAvroSchema.java
+++ b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestInferAvroSchema.java
@@ -49,7 +49,7 @@ public class TestInferAvroSchema {
     public void setup() {
         runner = TestRunners.newTestRunner(InferAvroSchema.class);
 
-        //Prepare the common setup.
+        // Prepare the common setup.
         runner.assertNotValid();
 
         runner.setProperty(InferAvroSchema.INPUT_CONTENT_TYPE, InferAvroSchema.USE_MIME_TYPE);
@@ -90,7 +90,7 @@ public class TestInferAvroSchema {
 
         runner.setProperty(InferAvroSchema.INPUT_CONTENT_TYPE, InferAvroSchema.USE_MIME_TYPE);
 
-        //Purposely set to True to test that none of the JSON file is read which would cause issues.
+        // Purposely set to True to test that none of the JSON file is read which would cause issues.
         runner.setProperty(InferAvroSchema.GET_CSV_HEADER_DEFINITION_FROM_INPUT, "true");
         runner.setProperty(InferAvroSchema.SCHEMA_DESTINATION, InferAvroSchema.DESTINATION_ATTRIBUTE);
 
@@ -106,12 +106,10 @@ public class TestInferAvroSchema {
 
         MockFlowFile data = runner.getFlowFilesForRelationship(InferAvroSchema.REL_SUCCESS).get(0);
         String avroSchema = data.getAttribute(InferAvroSchema.AVRO_SCHEMA_ATTRIBUTE_NAME);
-        String knownSchema = new String(unix2PlatformSpecificLineEndings(
-                    new File("src/test/resources/Shapes.json.avro")),
-                    StandardCharsets.UTF_8);
+        String knownSchema = new String(unix2PlatformSpecificLineEndings(new File("src/test/resources/Shapes.json.avro")), StandardCharsets.UTF_8);
         Assert.assertEquals(avroSchema, knownSchema);
 
-        //Since that avro schema is written to an attribute this should be teh same as the original
+        // Since that avro schema is written to an attribute this should be teh same as the original
         data.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "application/json");
     }
 
@@ -120,7 +118,7 @@ public class TestInferAvroSchema {
 
         runner.assertValid();
 
-        //Read in the header
+        // Read in the header
         StringWriter writer = new StringWriter();
         IOUtils.copy((Files.newInputStream(Paths.get("src/test/resources/ShapesHeader.csv"), StandardOpenOption.READ)), writer, "UTF-8");
         runner.setProperty(InferAvroSchema.CSV_HEADER_DEFINITION, writer.toString());
@@ -168,7 +166,7 @@ public class TestInferAvroSchema {
     }
 
     @Test
-    public void inferSchemaFromEmptyContent() throws Exception  {
+    public void inferSchemaFromEmptyContent() throws Exception {
         runner.assertValid();
 
         Map<String, String> attributes = new HashMap<>();
@@ -223,13 +221,81 @@ public class TestInferAvroSchema {
         flowFile.assertContentEquals(new File("src/test/resources/Shapes_Header_TabDelimited.csv").toPath());
         flowFile.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "text/csv");
     }
-    static byte [] unix2PlatformSpecificLineEndings(final File file) throws IOException {
-        try ( final BufferedInputStream in = new BufferedInputStream(new FileInputStream(file));
-                final ByteArrayOutputStream out = new ByteArrayOutputStream()) {
+
+
+    @Test
+    public void specifyCSVparametersInExpressionLanguage() throws Exception {
+        runner.setProperty(InferAvroSchema.DELIMITER, "${csv.delimiter}");
+        runner.setProperty(InferAvroSchema.ESCAPE_STRING, "${csv.escape}");
+        runner.setProperty(InferAvroSchema.QUOTE_STRING, "${csv.quote}");
+        runner.setProperty(InferAvroSchema.CHARSET, "${csv.charset}");
+        runner.setProperty(InferAvroSchema.GET_CSV_HEADER_DEFINITION_FROM_INPUT, "true");
+
+        runner.assertValid();
+
+        @SuppressWarnings("serial")
+        Map<String, String> attributes = new HashMap<String, String>() {
+            {
+                put("csv.delimiter",",");
+                put("csv.escape", "\\");
+                put("csv.quote", "\"");
+                put("csv.charset", "UTF-8");
+                put(CoreAttributes.MIME_TYPE.key(), "text/csv");
+            }
+        };
+
+        runner.enqueue(new File("src/test/resources/Shapes_Header.csv").toPath(), attributes);
+
+        runner.run();
+        runner.assertTransferCount(InferAvroSchema.REL_UNSUPPORTED_CONTENT, 0);
+        runner.assertTransferCount(InferAvroSchema.REL_FAILURE, 0);
+        runner.assertTransferCount(InferAvroSchema.REL_ORIGINAL, 1);
+        runner.assertTransferCount(InferAvroSchema.REL_SUCCESS, 1);
+
+        MockFlowFile flowFile = runner.getFlowFilesForRelationship(InferAvroSchema.REL_SUCCESS).get(0);
+        flowFile.assertContentEquals(unix2PlatformSpecificLineEndings(new File("src/test/resources/Shapes_header.csv.avro")));
+        flowFile.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "application/avro-binary");
+
+    }
+
+    @Test
+    public void specifyJsonParametersInExpressionLanguage() throws Exception {
+        runner.assertValid();
+        runner.setProperty(InferAvroSchema.INPUT_CONTENT_TYPE, InferAvroSchema.USE_MIME_TYPE);
+
+        // Purposely set to True to test that none of the JSON file is read which would cause issues.
+        runner.setProperty(InferAvroSchema.GET_CSV_HEADER_DEFINITION_FROM_INPUT, "true");
+        runner.setProperty(InferAvroSchema.SCHEMA_DESTINATION, InferAvroSchema.DESTINATION_ATTRIBUTE);
+        runner.setProperty(InferAvroSchema.RECORD_NAME, "${record.name}");
+        runner.setProperty(InferAvroSchema.NUM_RECORDS_TO_ANALYZE, "${records.analyze}");
+
+        Map<String, String> attributes = new HashMap<>();
+        attributes.put(CoreAttributes.MIME_TYPE.key(), "application/json");
+        attributes.put("record.name", "myrecord");
+        attributes.put("records.analyze", "2");
+        runner.enqueue(new File("src/test/resources/Shapes.json").toPath(), attributes);
+
+        runner.run();
+        runner.assertTransferCount(InferAvroSchema.REL_UNSUPPORTED_CONTENT, 0);
+        runner.assertTransferCount(InferAvroSchema.REL_FAILURE, 0);
+        runner.assertTransferCount(InferAvroSchema.REL_ORIGINAL, 1);
+        runner.assertTransferCount(InferAvroSchema.REL_SUCCESS, 1);
+
+        MockFlowFile data = runner.getFlowFilesForRelationship(InferAvroSchema.REL_SUCCESS).get(0);
+        String avroSchema = data.getAttribute(InferAvroSchema.AVRO_SCHEMA_ATTRIBUTE_NAME);
+        Assert.assertTrue(avroSchema.contains("\"name\" : \"myrecord\""));
+
+        // Since that avro schema is written to an attribute this should be teh same as the original
+        data.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "application/json");
+    }
+
+
+    static byte[] unix2PlatformSpecificLineEndings(final File file) throws IOException {
+        try (final BufferedInputStream in = new BufferedInputStream(new FileInputStream(file)); final ByteArrayOutputStream out = new ByteArrayOutputStream()) {
             byte eol[] = System.lineSeparator().getBytes(StandardCharsets.UTF_8);
             int justRead;
             while ((justRead = in.read()) != -1) {
-                if (justRead == '\n'){
+                if (justRead == '\n') {
                     out.write(eol);
                 } else {
                     out.write(justRead);
@@ -238,4 +304,5 @@ public class TestInferAvroSchema {
             return out.toByteArray();
         }
     }
+
 }

Reply via email to