This is an automated email from the ASF dual-hosted git repository.
pvillard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/master by this push:
new 09c7406 NIFI-6867: Validate PutKudu operation type property
09c7406 is described below
commit 09c7406d1835dc9828e6aa798db934830fb1f548
Author: Grant Henke <[email protected]>
AuthorDate: Fri Feb 14 10:00:00 2020 -0600
NIFI-6867: Validate PutKudu operation type property
This patch adds validation to the PutKudu operation type property.
It also improves the description to include the valid values and
adjusts the inputs to be case insensitive.
Signed-off-by: Pierre Villard <[email protected]>
This closes #4063.
---
.../org/apache/nifi/processors/kudu/PutKudu.java | 38 +++++++++++++++++++---
.../org/apache/nifi/processors/kudu/ITPutKudu.java | 5 ++-
2 files changed, 38 insertions(+), 5 deletions(-)
diff --git a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java
index aedac37..3769932 100644
--- a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java
+++ b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java
@@ -38,6 +38,9 @@ import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyDescriptor.Builder;
import org.apache.nifi.components.PropertyValue;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.Validator;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
@@ -58,6 +61,7 @@ import java.io.IOException;
import java.io.InputStream;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -124,13 +128,39 @@ public class PutKudu extends AbstractKuduProcessor {
.addValidator(StandardValidators.BOOLEAN_VALIDATOR)
.build();
+ protected static final Validator OperationTypeValidator = new Validator() {
+ @Override
+ public ValidationResult validate(String subject, String value,
ValidationContext context) {
+ if (context.isExpressionLanguageSupported(subject) &&
context.isExpressionLanguagePresent(value)) {
+ return new
ValidationResult.Builder().subject(subject).input(value)
+ .explanation("Expression Language
Present").valid(true).build();
+ }
+
+ boolean valid;
+ try {
+ OperationType.valueOf(value.toUpperCase());
+ valid = true;
+ } catch (IllegalArgumentException ex) {
+ valid = false;
+ }
+
+ final String explanation = valid ? null :
+ "Value must be one of: " +
+
Arrays.stream(OperationType.values()).map(Enum::toString).collect(Collectors.joining(",
"));
+ return new
ValidationResult.Builder().subject(subject).input(value).valid(valid)
+ .explanation(explanation).build();
+ }
+ };
+
protected static final PropertyDescriptor INSERT_OPERATION = new Builder()
.name("Insert Operation")
.displayName("Kudu Operation Type")
- .description("Specify operationType for this processor. Insert-Ignore
will ignore duplicated rows")
+ .description("Specify operationType for this processor.\n" +
+ "Valid values are: " +
+
Arrays.stream(OperationType.values()).map(Enum::toString).collect(Collectors.joining(",
")))
.defaultValue(OperationType.INSERT.toString())
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .addValidator(OperationTypeValidator)
.build();
protected static final PropertyDescriptor FLUSH_MODE = new Builder()
@@ -227,7 +257,7 @@ public class PutKudu extends AbstractKuduProcessor {
public void onScheduled(final ProcessContext context) throws IOException,
LoginException {
batchSize =
context.getProperty(BATCH_SIZE).evaluateAttributeExpressions().asInteger();
ffbatch =
context.getProperty(FLOWFILE_BATCH_SIZE).evaluateAttributeExpressions().asInteger();
- flushMode =
SessionConfiguration.FlushMode.valueOf(context.getProperty(FLUSH_MODE).getValue());
+ flushMode =
SessionConfiguration.FlushMode.valueOf(context.getProperty(FLUSH_MODE).getValue().toUpperCase());
createKuduClient(context);
}
@@ -271,7 +301,7 @@ public class PutKudu extends AbstractKuduProcessor {
final RecordReader recordReader =
recordReaderFactory.createRecordReader(flowFile, in, getLogger())) {
final String tableName = getEvaluatedProperty(TABLE_NAME,
context, flowFile);
- final OperationType operationType =
OperationType.valueOf(getEvaluatedProperty(INSERT_OPERATION, context,
flowFile));
+ final OperationType operationType =
OperationType.valueOf(getEvaluatedProperty(INSERT_OPERATION, context,
flowFile).toUpperCase());
final Boolean ignoreNull =
Boolean.valueOf(getEvaluatedProperty(IGNORE_NULL, context, flowFile));
final Boolean lowercaseFields =
Boolean.valueOf(getEvaluatedProperty(LOWERCASE_FIELD_NAMES, context, flowFile));
final Boolean handleSchemaDrift =
Boolean.valueOf(getEvaluatedProperty(HANDLE_SCHEMA_DRIFT, context, flowFile));
diff --git a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/ITPutKudu.java b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/ITPutKudu.java
index eee5e60..b3ad170 100644
--- a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/ITPutKudu.java
+++ b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/ITPutKudu.java
@@ -136,10 +136,13 @@ public class ITPutKudu {
flowFileAttributes.put(CoreAttributes.FILENAME.key(), filename);
// Use values to ensure multiple batches and multiple flow files
per-trigger
- testRunner.setProperty(PutKudu.INSERT_OPERATION,
OperationType.UPSERT.toString());
testRunner.setProperty(PutKudu.BATCH_SIZE, "10");
testRunner.setProperty(PutKudu.FLOWFILE_BATCH_SIZE, "2");
+ // Set the operation type.
+ flowFileAttributes.put("kudu.operation.type", "upsert");
+ testRunner.setProperty(PutKudu.INSERT_OPERATION,
"${kudu.operation.type}");
+
// Don't ignore null values.
flowFileAttributes.put("kudu.ignore.null", "false");
testRunner.setProperty(PutKudu.IGNORE_NULL, "${kudu.ignore.null}");