Repository: nifi Updated Branches: refs/heads/master cc9f89b00 -> 986a2a484
NIFI-5838 - Improve the schema validation method in Kite processors review Add empty check This closes #3182. Signed-off-by: Koji Kawamura <ijokaruma...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/986a2a48 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/986a2a48 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/986a2a48 Branch: refs/heads/master Commit: 986a2a484285a342e20494107abe52ff98ad2880 Parents: cc9f89b Author: Pierre Villard <pierre.villard...@gmail.com> Authored: Thu Nov 22 18:50:11 2018 +0100 Committer: Koji Kawamura <ijokaruma...@apache.org> Committed: Wed Dec 5 09:53:52 2018 +0900 ---------------------------------------------------------------------- .../processors/kite/AbstractKiteProcessor.java | 25 ++++++++++++-------- .../processors/kite/TestCSVToAvroProcessor.java | 18 ++++++++++++++ 2 files changed, 33 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/986a2a48/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/AbstractKiteProcessor.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/AbstractKiteProcessor.java b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/AbstractKiteProcessor.java index 65dcd5f..345c1c2 100644 --- a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/AbstractKiteProcessor.java +++ b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/AbstractKiteProcessor.java @@ -39,6 +39,7 @@ import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.processor.AbstractProcessor; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processors.hadoop.HadoopValidators; +import org.apache.nifi.util.StringUtils; import org.kitesdk.data.DatasetNotFoundException; import org.kitesdk.data.Datasets; import org.kitesdk.data.SchemaNotFoundException; @@ -101,29 +102,30 @@ abstract class AbstractKiteProcessor extends AbstractProcessor { return parseSchema(uriOrLiteral); } + if(uri.getScheme() == null) { + throw new SchemaNotFoundException("If the schema is not a JSON string, a scheme must be specified in the URI " + + "(ex: dataset:, view:, resource:, file:, hdfs:, etc)."); + } + try { if ("dataset".equals(uri.getScheme()) || "view".equals(uri.getScheme())) { return Datasets.load(uri).getDataset().getDescriptor().getSchema(); } else if ("resource".equals(uri.getScheme())) { - try (InputStream in = Resources.getResource(uri.getSchemeSpecificPart()) - .openStream()) { + try (InputStream in = Resources.getResource(uri.getSchemeSpecificPart()).openStream()) { return parseSchema(uri, in); } } else { // try to open the file Path schemaPath = new Path(uri); - FileSystem fs = schemaPath.getFileSystem(conf); - try (InputStream in = fs.open(schemaPath)) { + try (FileSystem fs = schemaPath.getFileSystem(conf); InputStream in = fs.open(schemaPath)) { return parseSchema(uri, in); } } } catch (DatasetNotFoundException e) { - throw new SchemaNotFoundException( - "Cannot read schema of missing dataset: " + uri, e); + throw new SchemaNotFoundException("Cannot read schema of missing dataset: " + uri, e); } catch (IOException e) { - throw new SchemaNotFoundException( - "Failed while reading " + uri + ": " + e.getMessage(), e); + throw new SchemaNotFoundException("Failed while reading " + uri + ": " + e.getMessage(), e); } } @@ -131,8 +133,7 @@ abstract class AbstractKiteProcessor extends AbstractProcessor { try { return new Schema.Parser().parse(literal); } catch (RuntimeException e) { - throw new SchemaNotFoundException( - "Failed to parse schema: " + literal, e); + throw new SchemaNotFoundException("Failed to parse schema: " + literal, e); } } @@ -150,6 +151,10 @@ abstract class AbstractKiteProcessor extends AbstractProcessor { Configuration conf = getConfiguration(context.getProperty(CONF_XML_FILES).evaluateAttributeExpressions().getValue()); String error = null; + if(StringUtils.isBlank(uri)) { + return new ValidationResult.Builder().subject(subject).input(uri).explanation("Schema cannot be null.").valid(false).build(); + } + final boolean elPresent = context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(uri); if (!elPresent) { try { http://git-wip-us.apache.org/repos/asf/nifi/blob/986a2a48/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestCSVToAvroProcessor.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestCSVToAvroProcessor.java b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestCSVToAvroProcessor.java index 8bad01c..50f5599 100644 --- a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestCSVToAvroProcessor.java +++ b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestCSVToAvroProcessor.java @@ -19,6 +19,7 @@ package org.apache.nifi.processors.kite; import java.io.ByteArrayInputStream; +import java.io.File; import java.io.IOException; import java.io.InputStream; import java.nio.charset.Charset; @@ -67,6 +68,23 @@ public class TestCSVToAvroProcessor { public static final String FAILURE_SUMMARY = "" + "Field id: cannot make \"long\" value: '': Field id type:LONG pos:0 not set and has no default value"; + + /** + * Test for a schema that is not a JSON but does not throw exception when trying to parse as an URI + */ + @Test + public void testSchemeValidation() throws IOException { + TestRunner runner = TestRunners.newTestRunner(ConvertCSVToAvro.class); + runner.setProperty(ConvertCSVToAvro.SCHEMA, "column1;column2"); + runner.assertNotValid(); + runner.setProperty(ConvertCSVToAvro.SCHEMA, "src/test/resources/Shapes_header.csv.avro"); + runner.assertNotValid(); + runner.setProperty(ConvertCSVToAvro.SCHEMA, "file:" + new File("src/test/resources/Shapes_header.csv.avro").getAbsolutePath()); + runner.assertValid(); + runner.setProperty(ConvertCSVToAvro.SCHEMA, ""); + runner.assertNotValid(); + } + /** * Basic test for tab separated files, similar to #test */