This is an automated email from the ASF dual-hosted git repository.

pvillard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/master by this push:
     new 268ba1d  NIFI-7142: Automatically handle schema drift in the PutKudu 
processor
268ba1d is described below

commit 268ba1d23e34252ca9a6f61b7caacdc36a0be48d
Author: Grant Henke <[email protected]>
AuthorDate: Thu Feb 13 09:24:08 2020 -0600

    NIFI-7142: Automatically handle schema drift in the PutKudu processor
    
    Adds a boolean property to the PutKudu processor to optionally
    enable automatic schema drift handling.
    
    If set to true, when fields with names that are not in the target
    Kudu table are encountered, the Kudu table will be altered to
    include new columns for those fields.
    
    Signed-off-by: Pierre Villard <[email protected]>
    
    This closes #4053.
---
 .../processors/kudu/AbstractKuduProcessor.java     | 30 +++++++++
 .../org/apache/nifi/processors/kudu/PutKudu.java   | 74 ++++++++++++++++++++--
 .../org/apache/nifi/processors/kudu/ITPutKudu.java | 29 +++++++--
 3 files changed, 121 insertions(+), 12 deletions(-)

diff --git 
a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKuduProcessor.java
 
b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKuduProcessor.java
index 166d233..570b86d 100644
--- 
a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKuduProcessor.java
+++ 
b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKuduProcessor.java
@@ -45,6 +45,7 @@ import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.security.krb.KerberosAction;
 import org.apache.nifi.security.krb.KerberosKeytabUser;
 import org.apache.nifi.security.krb.KerberosUser;
+import org.apache.nifi.serialization.record.DataType;
 import org.apache.nifi.serialization.record.Record;
 
 import javax.security.auth.login.LoginException;
@@ -231,6 +232,35 @@ public abstract class AbstractKuduProcessor extends 
AbstractProcessor {
         }
     }
 
+    /**
+     * Converts a NiFi DataType to its equivalent Kudu Type.
+     *
+     * The mapping is chosen from the field type of the given data type:
+     * BOOLEAN to BOOL, BYTE to INT8, SHORT to INT16, INT to INT32, LONG to INT64,
+     * FLOAT to FLOAT, DOUBLE to DOUBLE, TIMESTAMP to UNIXTIME_MICROS, and both
+     * CHAR and STRING to STRING.
+     *
+     * @param nifiType the NiFi data type to convert
+     * @return the Kudu type mapped to the field type of {@code nifiType}
+     * @throws IllegalArgumentException if the field type is not one of the cases listed above
+     */
+    protected Type toKuduType(DataType nifiType) {
+        switch (nifiType.getFieldType()) {
+            case BOOLEAN:
+                return Type.BOOL;
+            case BYTE:
+                return Type.INT8;
+            case SHORT:
+                return Type.INT16;
+            case INT:
+                return Type.INT32;
+            case LONG:
+                return Type.INT64;
+            case FLOAT:
+                return Type.FLOAT;
+            case DOUBLE:
+                return Type.DOUBLE;
+            case TIMESTAMP:
+                return Type.UNIXTIME_MICROS;
+            case CHAR:
+            case STRING:
+                return Type.STRING;
+            default:
+                throw new IllegalArgumentException(String.format("unsupported 
type %s", nifiType));
+        }
+    }
+
     private int getColumnIndex(Schema columns, String colName) {
         try {
             return columns.getColumnIndex(colName);
diff --git 
a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java
 
b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java
index 0bb5882..aedac37 100644
--- 
a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java
+++ 
b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java
@@ -17,7 +17,10 @@
 
 package org.apache.nifi.processors.kudu;
 
+import org.apache.kudu.Schema;
+import org.apache.kudu.client.AlterTableOptions;
 import org.apache.kudu.client.KuduClient;
+import org.apache.kudu.client.KuduException;
 import org.apache.kudu.client.KuduTable;
 import org.apache.kudu.client.KuduSession;
 import org.apache.kudu.client.Operation;
@@ -34,6 +37,7 @@ import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.annotation.lifecycle.OnScheduled;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.PropertyDescriptor.Builder;
+import org.apache.nifi.components.PropertyValue;
 import org.apache.nifi.expression.ExpressionLanguageScope;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.processor.ProcessContext;
@@ -46,6 +50,7 @@ import org.apache.nifi.security.krb.KerberosUser;
 import org.apache.nifi.serialization.RecordReader;
 import org.apache.nifi.serialization.RecordReaderFactory;
 import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
 import org.apache.nifi.serialization.record.RecordSet;
 
 import javax.security.auth.login.LoginException;
@@ -59,6 +64,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 @EventDriven
 @SupportsBatching
@@ -108,6 +114,16 @@ public class PutKudu extends AbstractKuduProcessor {
             .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
             .build();
 
+    protected static final PropertyDescriptor HANDLE_SCHEMA_DRIFT = new 
Builder()
+            .name("Handle Schema Drift")
+            .description("If set to true, when fields with names that are not 
in the target Kudu table " +
+                    "are encountered, the Kudu table will be altered to 
include new columns for those fields.")
+            .defaultValue("false")
+            .required(true)
+            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+            .build();
+
     protected static final PropertyDescriptor INSERT_OPERATION = new Builder()
         .name("Insert Operation")
         .displayName("Kudu Operation Type")
@@ -158,7 +174,6 @@ public class PutKudu extends AbstractKuduProcessor {
         .name("Ignore NULL")
         .description("Ignore NULL on Kudu Put Operation, Update only non-Null 
columns if set true")
         .defaultValue("false")
-        .allowableValues("true", "false")
         .required(true)
         .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
         
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
@@ -188,6 +203,7 @@ public class PutKudu extends AbstractKuduProcessor {
         properties.add(KERBEROS_CREDENTIALS_SERVICE);
         properties.add(SKIP_HEAD_LINE);
         properties.add(LOWERCASE_FIELD_NAMES);
+        properties.add(HANDLE_SCHEMA_DRIFT);
         properties.add(RECORD_READER);
         properties.add(INSERT_OPERATION);
         properties.add(FLUSH_MODE);
@@ -251,16 +267,52 @@ public class PutKudu extends AbstractKuduProcessor {
         OperationType prevOperationType = OperationType.INSERT;
         final List<RowError> pendingRowErrors = new ArrayList<>();
         for (FlowFile flowFile : flowFiles) {
-            final String tableName = 
context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
-            final OperationType operationType = 
OperationType.valueOf(context.getProperty(INSERT_OPERATION).evaluateAttributeExpressions(flowFile).getValue());
-            final Boolean ignoreNull = 
Boolean.valueOf(context.getProperty(IGNORE_NULL).evaluateAttributeExpressions(flowFile).getValue());
-            final Boolean lowercaseFields = 
Boolean.valueOf(context.getProperty(LOWERCASE_FIELD_NAMES).evaluateAttributeExpressions(flowFile).getValue());
-
             try (final InputStream in = session.read(flowFile);
                 final RecordReader recordReader = 
recordReaderFactory.createRecordReader(flowFile, in, getLogger())) {
+
+                final String tableName = getEvaluatedProperty(TABLE_NAME, 
context, flowFile);
+                final OperationType operationType = 
OperationType.valueOf(getEvaluatedProperty(INSERT_OPERATION, context, 
flowFile));
+                final Boolean ignoreNull = 
Boolean.valueOf(getEvaluatedProperty(IGNORE_NULL, context, flowFile));
+                final Boolean lowercaseFields = 
Boolean.valueOf(getEvaluatedProperty(LOWERCASE_FIELD_NAMES, context, flowFile));
+                final Boolean handleSchemaDrift = 
Boolean.valueOf(getEvaluatedProperty(HANDLE_SCHEMA_DRIFT, context, flowFile));
+
                 final RecordSet recordSet = recordReader.createRecordSet();
                 final List<String> fieldNames = 
recordReader.getSchema().getFieldNames();
-                final KuduTable kuduTable = kuduClient.openTable(tableName);
+                KuduTable kuduTable = kuduClient.openTable(tableName);
+
+                // If handleSchemaDrift is true, check for any missing columns 
and alter the Kudu table to add them.
+                if (handleSchemaDrift) {
+                    final Schema schema = kuduTable.getSchema();
+                    Stream<RecordField> fields = 
recordReader.getSchema().getFields().stream();
+                    List<RecordField> missing = fields.filter(field -> 
!schema.hasColumn(
+                            lowercaseFields ? 
field.getFieldName().toLowerCase() : field.getFieldName()))
+                            .collect(Collectors.toList());
+                    if (!missing.isEmpty()) {
+                        getLogger().info("adding {} columns to table '{}' to 
handle schema drift",
+                                new Object[]{missing.size(), tableName});
+                        // Add each column one at a time to avoid failing if 
some of the missing columns
+                        // were created by a concurrent thread or application 
attempting to handle schema drift.
+                        for (RecordField field : missing) {
+                            try {
+                                String columnName = lowercaseFields ? 
field.getFieldName().toLowerCase() : field.getFieldName();
+                                AlterTableOptions alter = new 
AlterTableOptions();
+                                alter.addNullableColumn(columnName, 
toKuduType(field.getDataType()));
+                                kuduClient.alterTable(tableName, alter);
+                            } catch (KuduException e) {
+                                // Ignore the exception if the column already 
exists due to concurrent
+                                // threads or applications attempting to 
handle schema drift.
+                                if (e.getStatus().isAlreadyPresent()) {
+                                    getLogger().info("column already exists in 
table '{}' while handling schema drift",
+                                            new Object[]{tableName});
+                                } else {
+                                    throw new ProcessException(e);
+                                }
+                            }
+                        }
+                        // Re-open the table to get the new schema.
+                        kuduTable = kuduClient.openTable(tableName);
+                    }
+                }
 
                 // In the case of INSERT_IGNORE the Kudu session is modified 
to ignore row errors.
                 // Because the session is shared across flow files, for 
batching efficiency, we
@@ -348,6 +400,14 @@ public class PutKudu extends AbstractKuduProcessor {
         session.adjustCounter("Records Inserted", totalCount, false);
     }
 
+    private String getEvaluatedProperty(PropertyDescriptor property, 
ProcessContext context, FlowFile flowFile) {
+        PropertyValue evaluatedProperty = 
context.getProperty(property).evaluateAttributeExpressions(flowFile);
+        if (property.isRequired() && evaluatedProperty == null) {
+            throw new ProcessException(String.format("Property `%s` is 
required but evaluated to null", property.getDisplayName()));
+        }
+        return evaluatedProperty.getValue();
+    }
+
     protected KuduSession createKuduSession(final KuduClient client) {
         final KuduSession kuduSession = client.newSession();
         kuduSession.setMutationBufferSpace(batchSize);
diff --git 
a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/ITPutKudu.java
 
b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/ITPutKudu.java
index 1de5c23..eee5e60 100644
--- 
a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/ITPutKudu.java
+++ 
b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/ITPutKudu.java
@@ -99,9 +99,8 @@ public class ITPutKudu {
         KuduClient client =  harness.getClient();
         List<ColumnSchema> columns = new ArrayList<>();
         columns.add(new ColumnSchema.ColumnSchemaBuilder("id", 
Type.INT32).key(true).build());
-        columns.add(new ColumnSchema.ColumnSchemaBuilder("stringVal", 
Type.STRING).build());
-        columns.add(new ColumnSchema.ColumnSchemaBuilder("num32Val", 
Type.INT32).build());
-        columns.add(new ColumnSchema.ColumnSchemaBuilder("doubleVal", 
Type.DOUBLE).build());
+        columns.add(new ColumnSchema.ColumnSchemaBuilder("stringval", 
Type.STRING).build());
+        columns.add(new ColumnSchema.ColumnSchemaBuilder("num32val", 
Type.INT32).build());
         Schema schema = new Schema(columns);
         CreateTableOptions opts = new CreateTableOptions()
             .addHashPartitions(Collections.singletonList("id"), 4);
@@ -113,10 +112,12 @@ public class ITPutKudu {
         readerFactory.addSchemaField("id", RecordFieldType.INT);
         readerFactory.addSchemaField("stringVal", RecordFieldType.STRING);
         readerFactory.addSchemaField("num32Val", RecordFieldType.INT);
+        // Add two extra columns to test handleSchemaDrift = true.
         readerFactory.addSchemaField("doubleVal", RecordFieldType.DOUBLE);
+        readerFactory.addSchemaField("floatVal", RecordFieldType.FLOAT);
 
         for (int i = 0; i < numOfRecord; i++) {
-            readerFactory.addRecord(i, "val_" + i, 1000 + i, 100.88 + i);
+            readerFactory.addRecord(i, "val_" + i, 1000 + i, 100.88 + i, 
100.88 + i);
         }
 
         testRunner.addControllerService("mock-reader-factory", readerFactory);
@@ -139,6 +140,18 @@ public class ITPutKudu {
         testRunner.setProperty(PutKudu.BATCH_SIZE, "10");
         testRunner.setProperty(PutKudu.FLOWFILE_BATCH_SIZE, "2");
 
+        // Don't ignore null values.
+        flowFileAttributes.put("kudu.ignore.null", "false");
+        testRunner.setProperty(PutKudu.IGNORE_NULL, "${kudu.ignore.null}");
+
+        // Enable lowercase handling.
+        flowFileAttributes.put("kudu.lowercase.field.names", "true");
+        testRunner.setProperty(PutKudu.LOWERCASE_FIELD_NAMES, 
"${kudu.lowercase.field.names}");
+
+        // Enable schema drift handling.
+        flowFileAttributes.put("kudu.handle.schema.drift", "true");
+        testRunner.setProperty(PutKudu.HANDLE_SCHEMA_DRIFT, 
"${kudu.handle.schema.drift}");
+
         // Increase the thread count to better simulate a production 
environment
         testRunner.setThreadCount(4);
 
@@ -163,9 +176,15 @@ public class ITPutKudu {
         final ProvenanceEventRecord provEvent = provEvents.get(0);
         Assert.assertEquals(ProvenanceEventType.SEND, 
provEvent.getEventType());
 
-        // Verify Kudu record count.
         KuduClient client = harness.getClient();
         KuduTable kuduTable = client.openTable(DEFAULT_TABLE_NAME);
+
+        // Verify the extra field was added.
+        Assert.assertEquals(5, kuduTable.getSchema().getColumnCount());
+        Assert.assertTrue(kuduTable.getSchema().hasColumn("doubleval"));
+        Assert.assertTrue(kuduTable.getSchema().hasColumn("floatval"));
+
+        // Verify Kudu record count.
         KuduScanner scanner = client.newScannerBuilder(kuduTable).build();
         int count = 0;
         for (RowResult unused : scanner) {

Reply via email to