This is an automated email from the ASF dual-hosted git repository.

pvillard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/master by this push:
     new 4098404  NIFI-6551: Improve PutKudu timestamp handling
4098404 is described below

commit 4098404596ab9994cb9cb3ca57a58a7b2f593d70
Author: Grant Henke <[email protected]>
AuthorDate: Thu Feb 13 17:03:20 2020 -0600

    NIFI-6551: Improve PutKudu timestamp handling
    
    Uses `DataTypeUtils.toTimestamp` when writing to Kudu
    timestamp (`UNIXTIME_MICROS`) columns. This allows
    us to use the `row.addTimestamp` API and get much more
    intuitive and predictable timestamp ingest behavior.
    
    Signed-off-by: Pierre Villard <[email protected]>
    
    This closes #4054.
---
 .../processors/kudu/AbstractKuduProcessor.java     | 51 ++++++++++++----------
 .../org/apache/nifi/processors/kudu/ITPutKudu.java | 12 +++--
 .../apache/nifi/processors/kudu/TestPutKudu.java   |  5 ++-
 3 files changed, 40 insertions(+), 28 deletions(-)

diff --git a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKuduProcessor.java b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKuduProcessor.java
index 570b86d..9636cf6 100644
--- a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKuduProcessor.java
+++ b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKuduProcessor.java
@@ -47,9 +47,11 @@ import org.apache.nifi.security.krb.KerberosKeytabUser;
 import org.apache.nifi.security.krb.KerberosUser;
 import org.apache.nifi.serialization.record.DataType;
 import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.util.DataTypeUtils;
 
 import javax.security.auth.login.LoginException;
 import java.math.BigDecimal;
+import java.sql.Timestamp;
 import java.util.Arrays;
 import java.util.concurrent.TimeUnit;
 import java.util.List;
@@ -182,47 +184,50 @@ public abstract class AbstractKuduProcessor extends AbstractProcessor {
                     if (schema.getColumnByIndex(colIdx).isKey()) {
                         throw new 
IllegalArgumentException(String.format("Can't set primary key column %s to null 
", colName));
                     } else if(!schema.getColumnByIndex(colIdx).isNullable()) {
-                        throw new 
IllegalArgumentException(String.format("Can't set primary key column %s to null 
", colName));
+                        throw new 
IllegalArgumentException(String.format("Can't set column %s to null ", 
colName));
                     }
 
                     if (!ignoreNull) {
                         row.setNull(colName);
-                        continue;
                     }
                 } else {
-                    switch 
(colType.getDataType(colSchema.getTypeAttributes())) {
+                    Object value = record.getValue(recordFieldName);
+                    switch (colType) {
                         case BOOL:
-                            row.addBoolean(colIdx, 
record.getAsBoolean(recordFieldName));
-                            break;
-                        case FLOAT:
-                            row.addFloat(colIdx, 
record.getAsFloat(recordFieldName));
-                            break;
-                        case DOUBLE:
-                            row.addDouble(colIdx, 
record.getAsDouble(recordFieldName));
-                            break;
-                        case BINARY:
-                            row.addBinary(colIdx, 
record.getAsString(recordFieldName).getBytes());
+                            row.addBoolean(colIdx, 
DataTypeUtils.toBoolean(value, recordFieldName));
                             break;
                         case INT8:
-                            row.addByte(colIdx, 
record.getAsInt(recordFieldName).byteValue());
+                            row.addByte(colIdx, DataTypeUtils.toByte(value, 
recordFieldName));
                             break;
                         case INT16:
-                            row.addShort(colIdx, 
record.getAsInt(recordFieldName).shortValue());
+                            row.addShort(colIdx,  DataTypeUtils.toShort(value, 
recordFieldName));
                             break;
                         case INT32:
-                            row.addInt(colIdx, 
record.getAsInt(recordFieldName));
+                            row.addInt(colIdx,  DataTypeUtils.toInteger(value, 
recordFieldName));
                             break;
                         case INT64:
+                            row.addLong(colIdx,  DataTypeUtils.toLong(value, 
recordFieldName));
+                            break;
                         case UNIXTIME_MICROS:
-                            row.addLong(colIdx, 
record.getAsLong(recordFieldName));
+                            DataType fieldType = 
record.getSchema().getDataType(recordFieldName).get();
+                            Timestamp timestamp = 
DataTypeUtils.toTimestamp(record.getValue(recordFieldName),
+                                    () -> 
DataTypeUtils.getDateFormat(fieldType.getFormat()), recordFieldName);
+                            row.addTimestamp(colIdx, timestamp);
                             break;
                         case STRING:
-                            row.addString(colIdx, 
record.getAsString(recordFieldName));
+                            row.addString(colIdx, 
DataTypeUtils.toString(value, recordFieldName));
+                            break;
+                        case BINARY:
+                            row.addBinary(colIdx, 
DataTypeUtils.toString(value, recordFieldName).getBytes());
+                            break;
+                        case FLOAT:
+                            row.addFloat(colIdx, DataTypeUtils.toFloat(value, 
recordFieldName));
+                            break;
+                        case DOUBLE:
+                            row.addDouble(colIdx, 
DataTypeUtils.toDouble(value, recordFieldName));
                             break;
-                        case DECIMAL32:
-                        case DECIMAL64:
-                        case DECIMAL128:
-                            row.addDecimal(colIdx, new 
BigDecimal(record.getAsString(recordFieldName)));
+                        case DECIMAL:
+                            row.addDecimal(colIdx, new 
BigDecimal(DataTypeUtils.toString(value, recordFieldName)));
                             break;
                         default:
                             throw new 
IllegalStateException(String.format("unknown column type %s", colType));
@@ -293,4 +298,4 @@ public abstract class AbstractKuduProcessor extends AbstractProcessor {
         return update;
     }
 
-}
\ No newline at end of file
+}
diff --git a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/ITPutKudu.java b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/ITPutKudu.java
index b3ad170..1be21cb 100644
--- a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/ITPutKudu.java
+++ b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/ITPutKudu.java
@@ -43,6 +43,7 @@ import org.junit.Rule;
 import org.junit.Test;
 
 import java.io.IOException;
+import java.sql.Timestamp;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -54,6 +55,8 @@ public class ITPutKudu {
 
     public static final String DEFAULT_TABLE_NAME = "Nifi-Kudu-Table";
 
+    public static final Timestamp NOW = new 
Timestamp(System.currentTimeMillis());
+
     // The KuduTestHarness automatically starts and stops a real Kudu cluster
     // when each test is run. Kudu persists its on-disk state in a temporary
     // directory under a location defined by the environment variable 
TEST_TMPDIR
@@ -101,6 +104,7 @@ public class ITPutKudu {
         columns.add(new ColumnSchema.ColumnSchemaBuilder("id", 
Type.INT32).key(true).build());
         columns.add(new ColumnSchema.ColumnSchemaBuilder("stringval", 
Type.STRING).build());
         columns.add(new ColumnSchema.ColumnSchemaBuilder("num32val", 
Type.INT32).build());
+        columns.add(new ColumnSchema.ColumnSchemaBuilder("timestampval", 
Type.UNIXTIME_MICROS).build());
         Schema schema = new Schema(columns);
         CreateTableOptions opts = new CreateTableOptions()
             .addHashPartitions(Collections.singletonList("id"), 4);
@@ -112,12 +116,13 @@ public class ITPutKudu {
         readerFactory.addSchemaField("id", RecordFieldType.INT);
         readerFactory.addSchemaField("stringVal", RecordFieldType.STRING);
         readerFactory.addSchemaField("num32Val", RecordFieldType.INT);
+        readerFactory.addSchemaField("timestampVal", 
RecordFieldType.TIMESTAMP);
         // Add two extra columns to test handleSchemaDrift = true.
         readerFactory.addSchemaField("doubleVal", RecordFieldType.DOUBLE);
         readerFactory.addSchemaField("floatVal", RecordFieldType.FLOAT);
 
         for (int i = 0; i < numOfRecord; i++) {
-            readerFactory.addRecord(i, "val_" + i, 1000 + i, 100.88 + i, 
100.88 + i);
+            readerFactory.addRecord(i, "val_" + i, 1000 + i, NOW, 100.88 + i, 
100.88 + i);
         }
 
         testRunner.addControllerService("mock-reader-factory", readerFactory);
@@ -183,14 +188,15 @@ public class ITPutKudu {
         KuduTable kuduTable = client.openTable(DEFAULT_TABLE_NAME);
 
         // Verify the extra field was added.
-        Assert.assertEquals(5, kuduTable.getSchema().getColumnCount());
+        Assert.assertEquals(6, kuduTable.getSchema().getColumnCount());
         Assert.assertTrue(kuduTable.getSchema().hasColumn("doubleval"));
         Assert.assertTrue(kuduTable.getSchema().hasColumn("floatval"));
 
         // Verify Kudu record count.
         KuduScanner scanner = client.newScannerBuilder(kuduTable).build();
         int count = 0;
-        for (RowResult unused : scanner) {
+        for (RowResult row : scanner) {
+            Assert.assertEquals(NOW, row.getTimestamp("timestampval"));
             count++;
         }
         Assert.assertEquals(recordCount, count);
diff --git a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/TestPutKudu.java b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/TestPutKudu.java
index 9be6952..b55695f 100644
--- a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/TestPutKudu.java
+++ b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/TestPutKudu.java
@@ -60,6 +60,7 @@ import org.mockito.stubbing.OngoingStubbing;
 
 import java.io.IOException;
 import java.io.InputStream;
+import java.sql.Timestamp;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -424,7 +425,7 @@ public class TestPutKudu {
             new RecordField(recordIdName, 
RecordFieldType.BIGINT.getDataType()),
             new RecordField("name", RecordFieldType.STRING.getDataType()),
             new RecordField("age", RecordFieldType.SHORT.getDataType()),
-            new RecordField("updated_at", 
RecordFieldType.BIGINT.getDataType()),
+            new RecordField("updated_at", 
RecordFieldType.TIMESTAMP.getDataType()),
             new RecordField("score", RecordFieldType.LONG.getDataType())));
 
         Map<String, Object> values = new HashMap<>();
@@ -432,7 +433,7 @@ public class TestPutKudu {
         values.put(recordIdName, id);
         values.put("name", name);
         values.put("age", age);
-        values.put("updated_at", System.currentTimeMillis() * 1000);
+        values.put("updated_at", new Timestamp(System.currentTimeMillis()));
         values.put("score", 10000L);
         processor.buildPartialRow(
             kuduSchema,

Reply via email to