This is an automated email from the ASF dual-hosted git repository.
pvillard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/master by this push:
new 268ba1d NIFI-7142: Automatically handle schema drift in the PutKudu processor
268ba1d is described below
commit 268ba1d23e34252ca9a6f61b7caacdc36a0be48d
Author: Grant Henke <[email protected]>
AuthorDate: Thu Feb 13 09:24:08 2020 -0600
NIFI-7142: Automatically handle schema drift in the PutKudu processor
Adds a boolean property, "Handle Schema Drift", to the PutKudu processor
to optionally enable automatic schema drift handling.
If set to true, when fields with names that are not in the target
Kudu table are encountered, the Kudu table will be altered to
include new nullable columns for those fields.
Signed-off-by: Pierre Villard <[email protected]>
This closes #4053.
---
.../processors/kudu/AbstractKuduProcessor.java | 30 +++++++++
.../org/apache/nifi/processors/kudu/PutKudu.java | 74 ++++++++++++++++++++--
.../org/apache/nifi/processors/kudu/ITPutKudu.java | 29 +++++++--
3 files changed, 121 insertions(+), 12 deletions(-)
diff --git
a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKuduProcessor.java
b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKuduProcessor.java
index 166d233..570b86d 100644
---
a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKuduProcessor.java
+++
b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKuduProcessor.java
@@ -45,6 +45,7 @@ import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.security.krb.KerberosAction;
import org.apache.nifi.security.krb.KerberosKeytabUser;
import org.apache.nifi.security.krb.KerberosUser;
+import org.apache.nifi.serialization.record.DataType;
import org.apache.nifi.serialization.record.Record;
import javax.security.auth.login.LoginException;
@@ -231,6 +232,35 @@ public abstract class AbstractKuduProcessor extends
AbstractProcessor {
}
}
+    /**
+     * Converts a NiFi DataType to its equivalent Kudu Type.
+     *
+     * @param nifiType the NiFi record field data type to convert
+     * @return the Kudu column type used to store values of {@code nifiType}
+     * @throws IllegalArgumentException if the field type of {@code nifiType} has no
+     *         mapping in this method
+     */
+    protected Type toKuduType(DataType nifiType) {
+        switch (nifiType.getFieldType()) {
+            case BOOLEAN:
+                return Type.BOOL;
+            case BYTE:
+                return Type.INT8;
+            case SHORT:
+                return Type.INT16;
+            case INT:
+                return Type.INT32;
+            case LONG:
+                return Type.INT64;
+            case FLOAT:
+                return Type.FLOAT;
+            case DOUBLE:
+                return Type.DOUBLE;
+            case TIMESTAMP:
+                return Type.UNIXTIME_MICROS;
+            case CHAR:
+            case STRING:
+                return Type.STRING;
+            default:
+                // Any field type not listed above has no Kudu mapping here.
+                throw new IllegalArgumentException(String.format("unsupported type %s", nifiType));
+        }
+    }
+
private int getColumnIndex(Schema columns, String colName) {
try {
return columns.getColumnIndex(colName);
diff --git
a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java
b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java
index 0bb5882..aedac37 100644
---
a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java
+++
b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java
@@ -17,7 +17,10 @@
package org.apache.nifi.processors.kudu;
+import org.apache.kudu.Schema;
+import org.apache.kudu.client.AlterTableOptions;
import org.apache.kudu.client.KuduClient;
+import org.apache.kudu.client.KuduException;
import org.apache.kudu.client.KuduTable;
import org.apache.kudu.client.KuduSession;
import org.apache.kudu.client.Operation;
@@ -34,6 +37,7 @@ import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyDescriptor.Builder;
+import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
@@ -46,6 +50,7 @@ import org.apache.nifi.security.krb.KerberosUser;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordSet;
import javax.security.auth.login.LoginException;
@@ -59,6 +64,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
+import java.util.stream.Stream;
@EventDriven
@SupportsBatching
@@ -108,6 +114,16 @@ public class PutKudu extends AbstractKuduProcessor {
.addValidator(StandardValidators.BOOLEAN_VALIDATOR)
.build();
+    /**
+     * Evaluated per flow file. When true, before writing records, any record field whose
+     * name (lowercased when Lowercase Field Names is enabled) is not a column of the target
+     * table is added to that table as a nullable column.
+     */
+    protected static final PropertyDescriptor HANDLE_SCHEMA_DRIFT = new Builder()
+        .name("Handle Schema Drift")
+        .description("If set to true, when fields with names that are not in the target Kudu table " +
+            "are encountered, the Kudu table will be altered to include new columns for those fields.")
+        .defaultValue("false")
+        .required(true)
+        .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+        .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+        .build();
+
protected static final PropertyDescriptor INSERT_OPERATION = new Builder()
.name("Insert Operation")
.displayName("Kudu Operation Type")
@@ -158,7 +174,6 @@ public class PutKudu extends AbstractKuduProcessor {
.name("Ignore NULL")
.description("Ignore NULL on Kudu Put Operation, Update only non-Null
columns if set true")
.defaultValue("false")
- .allowableValues("true", "false")
.required(true)
.addValidator(StandardValidators.BOOLEAN_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
@@ -188,6 +203,7 @@ public class PutKudu extends AbstractKuduProcessor {
properties.add(KERBEROS_CREDENTIALS_SERVICE);
properties.add(SKIP_HEAD_LINE);
properties.add(LOWERCASE_FIELD_NAMES);
+ properties.add(HANDLE_SCHEMA_DRIFT);
properties.add(RECORD_READER);
properties.add(INSERT_OPERATION);
properties.add(FLUSH_MODE);
@@ -251,16 +267,52 @@ public class PutKudu extends AbstractKuduProcessor {
OperationType prevOperationType = OperationType.INSERT;
final List<RowError> pendingRowErrors = new ArrayList<>();
for (FlowFile flowFile : flowFiles) {
- final String tableName =
context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
- final OperationType operationType =
OperationType.valueOf(context.getProperty(INSERT_OPERATION).evaluateAttributeExpressions(flowFile).getValue());
- final Boolean ignoreNull =
Boolean.valueOf(context.getProperty(IGNORE_NULL).evaluateAttributeExpressions(flowFile).getValue());
- final Boolean lowercaseFields =
Boolean.valueOf(context.getProperty(LOWERCASE_FIELD_NAMES).evaluateAttributeExpressions(flowFile).getValue());
-
try (final InputStream in = session.read(flowFile);
final RecordReader recordReader =
recordReaderFactory.createRecordReader(flowFile, in, getLogger())) {
+
+ final String tableName = getEvaluatedProperty(TABLE_NAME,
context, flowFile);
+ final OperationType operationType =
OperationType.valueOf(getEvaluatedProperty(INSERT_OPERATION, context,
flowFile));
+ final Boolean ignoreNull =
Boolean.valueOf(getEvaluatedProperty(IGNORE_NULL, context, flowFile));
+ final Boolean lowercaseFields =
Boolean.valueOf(getEvaluatedProperty(LOWERCASE_FIELD_NAMES, context, flowFile));
+ final Boolean handleSchemaDrift =
Boolean.valueOf(getEvaluatedProperty(HANDLE_SCHEMA_DRIFT, context, flowFile));
+
final RecordSet recordSet = recordReader.createRecordSet();
final List<String> fieldNames =
recordReader.getSchema().getFieldNames();
- final KuduTable kuduTable = kuduClient.openTable(tableName);
+ KuduTable kuduTable = kuduClient.openTable(tableName);
+
+ // If handleSchemaDrift is true, check for any missing columns
and alter the Kudu table to add them.
+ if (handleSchemaDrift) {
+ final Schema schema = kuduTable.getSchema();
+ Stream<RecordField> fields =
recordReader.getSchema().getFields().stream();
+ List<RecordField> missing = fields.filter(field ->
!schema.hasColumn(
+ lowercaseFields ?
field.getFieldName().toLowerCase() : field.getFieldName()))
+ .collect(Collectors.toList());
+ if (!missing.isEmpty()) {
+ getLogger().info("adding {} columns to table '{}' to
handle schema drift",
+ new Object[]{missing.size(), tableName});
+ // Add each column one at a time to avoid failing if
some of the missing columns
+ // we created by a concurrent thread or application
attempting to handle schema drift.
+ for (RecordField field : missing) {
+ try {
+ String columnName = lowercaseFields ?
field.getFieldName().toLowerCase() : field.getFieldName();
+ AlterTableOptions alter = new
AlterTableOptions();
+ alter.addNullableColumn(columnName,
toKuduType(field.getDataType()));
+ kuduClient.alterTable(tableName, alter);
+ } catch (KuduException e) {
+ // Ignore the exception if the column already
exists due to concurrent
+ // threads or applications attempting to
handle schema drift.
+ if (e.getStatus().isAlreadyPresent()) {
+ getLogger().info("column already exists in
table '{}' while handling schema drift",
+ new Object[]{tableName});
+ } else {
+ throw new ProcessException(e);
+ }
+ }
+ }
+ // Re-open the table to get the new schema.
+ kuduTable = kuduClient.openTable(tableName);
+ }
+ }
// In the case of INSERT_IGNORE the Kudu session is modified
to ignore row errors.
// Because the session is shared across flow files, for
batching efficiency, we
@@ -348,6 +400,14 @@ public class PutKudu extends AbstractKuduProcessor {
session.adjustCounter("Records Inserted", totalCount, false);
}
+    /**
+     * Evaluates a property's Expression Language against the given flow file's attributes.
+     *
+     * @param property the property to evaluate
+     * @param context the process context supplying the configured property value
+     * @param flowFile the flow file whose attributes are used during evaluation
+     * @return the evaluated value; can be null only when {@code property} is not required
+     * @throws ProcessException if {@code property} is required but evaluates to null
+     */
+    private String getEvaluatedProperty(PropertyDescriptor property, ProcessContext context, FlowFile flowFile) {
+        final PropertyValue evaluatedProperty = context.getProperty(property).evaluateAttributeExpressions(flowFile);
+        // Check the evaluated String, not the PropertyValue wrapper: the wrapper returned by
+        // evaluateAttributeExpressions is what gets dereferenced below, so a null result can
+        // only surface through getValue().
+        final String value = evaluatedProperty.getValue();
+        if (property.isRequired() && value == null) {
+            throw new ProcessException(String.format("Property `%s` is required but evaluated to null", property.getDisplayName()));
+        }
+        return value;
+    }
+ }
+
protected KuduSession createKuduSession(final KuduClient client) {
final KuduSession kuduSession = client.newSession();
kuduSession.setMutationBufferSpace(batchSize);
diff --git
a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/ITPutKudu.java
b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/ITPutKudu.java
index 1de5c23..eee5e60 100644
---
a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/ITPutKudu.java
+++
b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/ITPutKudu.java
@@ -99,9 +99,8 @@ public class ITPutKudu {
KuduClient client = harness.getClient();
List<ColumnSchema> columns = new ArrayList<>();
columns.add(new ColumnSchema.ColumnSchemaBuilder("id",
Type.INT32).key(true).build());
- columns.add(new ColumnSchema.ColumnSchemaBuilder("stringVal",
Type.STRING).build());
- columns.add(new ColumnSchema.ColumnSchemaBuilder("num32Val",
Type.INT32).build());
- columns.add(new ColumnSchema.ColumnSchemaBuilder("doubleVal",
Type.DOUBLE).build());
+ columns.add(new ColumnSchema.ColumnSchemaBuilder("stringval",
Type.STRING).build());
+ columns.add(new ColumnSchema.ColumnSchemaBuilder("num32val",
Type.INT32).build());
Schema schema = new Schema(columns);
CreateTableOptions opts = new CreateTableOptions()
.addHashPartitions(Collections.singletonList("id"), 4);
@@ -113,10 +112,12 @@ public class ITPutKudu {
readerFactory.addSchemaField("id", RecordFieldType.INT);
readerFactory.addSchemaField("stringVal", RecordFieldType.STRING);
readerFactory.addSchemaField("num32Val", RecordFieldType.INT);
+ // Add two extra columns to test handleSchemaDrift = true.
readerFactory.addSchemaField("doubleVal", RecordFieldType.DOUBLE);
+ readerFactory.addSchemaField("floatVal", RecordFieldType.FLOAT);
for (int i = 0; i < numOfRecord; i++) {
- readerFactory.addRecord(i, "val_" + i, 1000 + i, 100.88 + i);
+ readerFactory.addRecord(i, "val_" + i, 1000 + i, 100.88 + i,
100.88 + i);
}
testRunner.addControllerService("mock-reader-factory", readerFactory);
@@ -139,6 +140,18 @@ public class ITPutKudu {
testRunner.setProperty(PutKudu.BATCH_SIZE, "10");
testRunner.setProperty(PutKudu.FLOWFILE_BATCH_SIZE, "2");
+ // Don't ignore null values.
+ flowFileAttributes.put("kudu.ignore.null", "false");
+ testRunner.setProperty(PutKudu.IGNORE_NULL, "${kudu.ignore.null}");
+
+ // Enable lowercase handling.
+ flowFileAttributes.put("kudu.lowercase.field.names", "true");
+ testRunner.setProperty(PutKudu.LOWERCASE_FIELD_NAMES,
"${kudu.lowercase.field.names}");
+
+ // Enable schema drift handling.
+ flowFileAttributes.put("kudu.handle.schema.drift", "true");
+ testRunner.setProperty(PutKudu.HANDLE_SCHEMA_DRIFT,
"${kudu.handle.schema.drift}");
+
// Increase the thread count to better simulate a production
environment
testRunner.setThreadCount(4);
@@ -163,9 +176,15 @@ public class ITPutKudu {
final ProvenanceEventRecord provEvent = provEvents.get(0);
Assert.assertEquals(ProvenanceEventType.SEND,
provEvent.getEventType());
- // Verify Kudu record count.
KuduClient client = harness.getClient();
KuduTable kuduTable = client.openTable(DEFAULT_TABLE_NAME);
+
+ // Verify the extra field was added.
+ Assert.assertEquals(5, kuduTable.getSchema().getColumnCount());
+ Assert.assertTrue(kuduTable.getSchema().hasColumn("doubleval"));
+ Assert.assertTrue(kuduTable.getSchema().hasColumn("floatval"));
+
+ // Verify Kudu record count.
KuduScanner scanner = client.newScannerBuilder(kuduTable).build();
int count = 0;
for (RowResult unused : scanner) {