This is an automated email from the ASF dual-hosted git repository.
joewitt pushed a commit to branch support/nifi-1.11.x in repository https://gitbox.apache.org/repos/asf/nifi.git
commit e4b905b1bc526aed88cd30cd8d934d0d24d5226a Author: Grant Henke <[email protected]> AuthorDate: Fri Feb 14 10:00:00 2020 -0600 NIFI-6867: Validate PutKudu operation type property This patch adds validation to the PutKudu operation type property. It also improves the description to include the valid values and adjusts the inputs to be case insensitive. Signed-off-by: Pierre Villard <[email protected]> This closes #4063. --- .../org/apache/nifi/processors/kudu/PutKudu.java | 38 +++++++++++++++++++--- .../org/apache/nifi/processors/kudu/ITPutKudu.java | 5 ++- 2 files changed, 38 insertions(+), 5 deletions(-) diff --git a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java index aedac37..3769932 100644 --- a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java +++ b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java @@ -38,6 +38,9 @@ import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyDescriptor.Builder; import org.apache.nifi.components.PropertyValue; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.components.Validator; import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.processor.ProcessContext; @@ -58,6 +61,7 @@ import java.io.IOException; import java.io.InputStream; import java.security.PrivilegedExceptionAction; import java.util.ArrayList; +import java.util.Arrays; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -124,13 +128,39 @@ public class PutKudu extends AbstractKuduProcessor { 
.addValidator(StandardValidators.BOOLEAN_VALIDATOR) .build(); + protected static final Validator OperationTypeValidator = new Validator() { + @Override + public ValidationResult validate(String subject, String value, ValidationContext context) { + if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(value)) { + return new ValidationResult.Builder().subject(subject).input(value) + .explanation("Expression Language Present").valid(true).build(); + } + + boolean valid; + try { + OperationType.valueOf(value.toUpperCase()); + valid = true; + } catch (IllegalArgumentException ex) { + valid = false; + } + + final String explanation = valid ? null : + "Value must be one of: " + + Arrays.stream(OperationType.values()).map(Enum::toString).collect(Collectors.joining(", ")); + return new ValidationResult.Builder().subject(subject).input(value).valid(valid) + .explanation(explanation).build(); + } + }; + protected static final PropertyDescriptor INSERT_OPERATION = new Builder() .name("Insert Operation") .displayName("Kudu Operation Type") - .description("Specify operationType for this processor. 
Insert-Ignore will ignore duplicated rows") + .description("Specify operationType for this processor.\n" + + "Valid values are: " + + Arrays.stream(OperationType.values()).map(Enum::toString).collect(Collectors.joining(", "))) .defaultValue(OperationType.INSERT.toString()) .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .addValidator(OperationTypeValidator) .build(); protected static final PropertyDescriptor FLUSH_MODE = new Builder() @@ -227,7 +257,7 @@ public class PutKudu extends AbstractKuduProcessor { public void onScheduled(final ProcessContext context) throws IOException, LoginException { batchSize = context.getProperty(BATCH_SIZE).evaluateAttributeExpressions().asInteger(); ffbatch = context.getProperty(FLOWFILE_BATCH_SIZE).evaluateAttributeExpressions().asInteger(); - flushMode = SessionConfiguration.FlushMode.valueOf(context.getProperty(FLUSH_MODE).getValue()); + flushMode = SessionConfiguration.FlushMode.valueOf(context.getProperty(FLUSH_MODE).getValue().toUpperCase()); createKuduClient(context); } @@ -271,7 +301,7 @@ public class PutKudu extends AbstractKuduProcessor { final RecordReader recordReader = recordReaderFactory.createRecordReader(flowFile, in, getLogger())) { final String tableName = getEvaluatedProperty(TABLE_NAME, context, flowFile); - final OperationType operationType = OperationType.valueOf(getEvaluatedProperty(INSERT_OPERATION, context, flowFile)); + final OperationType operationType = OperationType.valueOf(getEvaluatedProperty(INSERT_OPERATION, context, flowFile).toUpperCase()); final Boolean ignoreNull = Boolean.valueOf(getEvaluatedProperty(IGNORE_NULL, context, flowFile)); final Boolean lowercaseFields = Boolean.valueOf(getEvaluatedProperty(LOWERCASE_FIELD_NAMES, context, flowFile)); final Boolean handleSchemaDrift = Boolean.valueOf(getEvaluatedProperty(HANDLE_SCHEMA_DRIFT, context, flowFile)); diff --git 
a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/ITPutKudu.java b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/ITPutKudu.java index eee5e60..b3ad170 100644 --- a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/ITPutKudu.java +++ b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/ITPutKudu.java @@ -136,10 +136,13 @@ public class ITPutKudu { flowFileAttributes.put(CoreAttributes.FILENAME.key(), filename); // Use values to ensure multiple batches and multiple flow files per-trigger - testRunner.setProperty(PutKudu.INSERT_OPERATION, OperationType.UPSERT.toString()); testRunner.setProperty(PutKudu.BATCH_SIZE, "10"); testRunner.setProperty(PutKudu.FLOWFILE_BATCH_SIZE, "2"); + // Set the operation type. + flowFileAttributes.put("kudu.operation.type", "upsert"); + testRunner.setProperty(PutKudu.INSERT_OPERATION, "${kudu.operation.type}"); + // Don't ignore null values. flowFileAttributes.put("kudu.ignore.null", "false"); testRunner.setProperty(PutKudu.IGNORE_NULL, "${kudu.ignore.null}");
