Repository: nifi
Updated Branches:
  refs/heads/master 9a1ab4c50 -> c51512f5e
NIFI-5891 fix handling of null logical types in Hive3Streaming processor NIFI-5891: Fixed Checkstyle issues Signed-off-by: Matthew Burgess <mattyb...@apache.org> This closes #3216 Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/c51512f5 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/c51512f5 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/c51512f5 Branch: refs/heads/master Commit: c51512f5e33cbd413b1fda8700408aa95614680e Parents: 9a1ab4c Author: gkkorir <gkko...@safaricom.co.ke> Authored: Thu Dec 13 17:25:37 2018 +0300 Committer: Matthew Burgess <mattyb...@apache.org> Committed: Thu Dec 13 10:23:18 2018 -0500 ---------------------------------------------------------------------- .../apache/hive/streaming/NiFiRecordSerDe.java | 28 ++-- .../processors/hive/TestPutHive3Streaming.java | 140 +++++++++++++++++++ 2 files changed, 159 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/c51512f5/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/hive/streaming/NiFiRecordSerDe.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/hive/streaming/NiFiRecordSerDe.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/hive/streaming/NiFiRecordSerDe.java index 932772e..e628474 100644 --- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/hive/streaming/NiFiRecordSerDe.java +++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/hive/streaming/NiFiRecordSerDe.java @@ -223,27 +223,37 @@ public class NiFiRecordSerDe extends AbstractSerDe { break; case DATE: Date d = record.getAsDate(fieldName, field.getDataType().getFormat()); - 
org.apache.hadoop.hive.common.type.Date hiveDate = new org.apache.hadoop.hive.common.type.Date(); - hiveDate.setTimeInMillis(d.getTime()); - val = hiveDate; + if(d != null) { + org.apache.hadoop.hive.common.type.Date hiveDate = new org.apache.hadoop.hive.common.type.Date(); + hiveDate.setTimeInMillis(d.getTime()); + val = hiveDate; + } else { + val = null; + } break; // ORC doesn't currently handle TIMESTAMPLOCALTZ case TIMESTAMP: Timestamp ts = DataTypeUtils.toTimestamp(record.getValue(fieldName), () -> DataTypeUtils.getDateFormat(field.getDataType().getFormat()), fieldName); - // Convert to Hive's Timestamp type - org.apache.hadoop.hive.common.type.Timestamp hivetimestamp = new org.apache.hadoop.hive.common.type.Timestamp(); - hivetimestamp.setTimeInMillis(ts.getTime(), ts.getNanos()); - val = hivetimestamp; + if(ts != null) { + // Convert to Hive's Timestamp type + org.apache.hadoop.hive.common.type.Timestamp hivetimestamp = new org.apache.hadoop.hive.common.type.Timestamp(); + hivetimestamp.setTimeInMillis(ts.getTime(), ts.getNanos()); + val = hivetimestamp; + } else { + val = null; + } break; case DECIMAL: - val = HiveDecimal.create(record.getAsDouble(fieldName)); + Double value = record.getAsDouble(fieldName); + val = value == null ? null : HiveDecimal.create(value); break; default: throw new IllegalArgumentException("Field " + fieldName + " cannot be converted to type: " + primitiveCategory.name()); } break; case LIST: - val = Arrays.asList(record.getAsArray(fieldName)); + Object[] value = record.getAsArray(fieldName); + val = value == null ? 
null : Arrays.asList(value); break; case MAP: val = record.getValue(fieldName); http://git-wip-us.apache.org/repos/asf/nifi/blob/c51512f5/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHive3Streaming.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHive3Streaming.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHive3Streaming.java index 5fd759f..ee05416 100644 --- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHive3Streaming.java +++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHive3Streaming.java @@ -56,6 +56,7 @@ import org.apache.nifi.hadoop.SecurityUtil; import org.apache.nifi.kerberos.KerberosCredentialsService; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.serialization.MalformedRecordException; import org.apache.nifi.serialization.RecordReader; import org.apache.nifi.serialization.record.MapRecord; import org.apache.nifi.serialization.record.MockRecordParser; @@ -752,6 +753,145 @@ public class TestPutHive3Streaming { assertEquals("default.users", flowFile.getAttribute(ATTR_OUTPUT_TABLES)); } + //logical types + + @Test + public void testNullDateHandling() throws IOException, MalformedRecordException, InitializationException { + String schemaText = "{ \"name\":\"test\", \"type\":\"record\", \"fields\":[ { \"name\":\"dob\", \"type\": [ \"null\", { \"type\":\"int\", \"logicalType\":\"date\" } ] } ] }"; + schema = new Schema.Parser().parse(schemaText); + processor.setFields(Arrays.asList( + new FieldSchema("dob", serdeConstants.DATE_TYPE_NAME, "null dob") + )); + 
//setup runner + runner = TestRunners.newTestRunner(processor); + runner.setProperty(PutHive3Streaming.HIVE_CONFIGURATION_RESOURCES, TEST_CONF_PATH); + MockRecordParser readerFactory = new MockRecordParser(); + final RecordSchema recordSchema = AvroTypeUtil.createSchema(schema); + for (final RecordField recordField : recordSchema.getFields()) { + readerFactory.addSchemaField(recordField.getFieldName(), recordField.getDataType().getFieldType(), recordField.isNullable()); + } + + readerFactory.addRecord(new Object[] { null }); + + runner.addControllerService("mock-reader-factory", readerFactory); + runner.enableControllerService(readerFactory); + runner.setProperty(PutHive3Streaming.RECORD_READER, "mock-reader-factory"); + + runner.setProperty(PutHive3Streaming.METASTORE_URI, "thrift://localhost:9083"); + runner.setProperty(PutHive3Streaming.DB_NAME, "default"); + runner.setProperty(PutHive3Streaming.TABLE_NAME, "dobs"); + runner.enqueue(new byte[0]); + runner.run(); + + runner.assertTransferCount(PutHive3Streaming.REL_SUCCESS, 1); + final MockFlowFile flowFile = runner.getFlowFilesForRelationship(PutHive3Streaming.REL_SUCCESS).get(0); + assertEquals("1", flowFile.getAttribute(HIVE_STREAMING_RECORD_COUNT_ATTR)); + assertEquals("default.dobs", flowFile.getAttribute(ATTR_OUTPUT_TABLES)); + } + + @Test + public void testNullTimestampHandling() throws IOException, MalformedRecordException, InitializationException { + String schemaText = "{ \"name\":\"test\", \"type\":\"record\", \"fields\":[ { \"name\":\"dob\", \"type\": [ \"null\", { \"type\":\"long\", \"logicalType\":\"timestamp-millis\" } ] } ] }"; + schema = new Schema.Parser().parse(schemaText); + processor.setFields(Arrays.asList( + new FieldSchema("dob", serdeConstants.TIMESTAMP_TYPE_NAME, "null dob") + )); + //setup runner + runner = TestRunners.newTestRunner(processor); + runner.setProperty(PutHive3Streaming.HIVE_CONFIGURATION_RESOURCES, TEST_CONF_PATH); + MockRecordParser readerFactory = new MockRecordParser(); 
+ final RecordSchema recordSchema = AvroTypeUtil.createSchema(schema); + for (final RecordField recordField : recordSchema.getFields()) { + readerFactory.addSchemaField(recordField.getFieldName(), recordField.getDataType().getFieldType(), recordField.isNullable()); + } + + readerFactory.addRecord(new Object[] { null }); + + runner.addControllerService("mock-reader-factory", readerFactory); + runner.enableControllerService(readerFactory); + runner.setProperty(PutHive3Streaming.RECORD_READER, "mock-reader-factory"); + + runner.setProperty(PutHive3Streaming.METASTORE_URI, "thrift://localhost:9083"); + runner.setProperty(PutHive3Streaming.DB_NAME, "default"); + runner.setProperty(PutHive3Streaming.TABLE_NAME, "ts"); + runner.enqueue(new byte[0]); + runner.run(); + + runner.assertTransferCount(PutHive3Streaming.REL_SUCCESS, 1); + final MockFlowFile flowFile = runner.getFlowFilesForRelationship(PutHive3Streaming.REL_SUCCESS).get(0); + assertEquals("1", flowFile.getAttribute(HIVE_STREAMING_RECORD_COUNT_ATTR)); + assertEquals("default.ts", flowFile.getAttribute(ATTR_OUTPUT_TABLES)); + } + + @Test + public void testNullDecimalHandling() throws IOException, MalformedRecordException, InitializationException { + String schemaText = "{ \"name\":\"test\", \"type\":\"record\", \"fields\":[ { \"name\":\"amount\", \"type\": [ \"null\", { \"type\":\"bytes\", " + + "\"logicalType\":\"decimal\", \"precision\":18, \"scale\":2 } ] } ] }"; + schema = new Schema.Parser().parse(schemaText); + processor.setFields(Arrays.asList( + new FieldSchema("amount", serdeConstants.DECIMAL_TYPE_NAME, "null amount") + )); + //setup runner + runner = TestRunners.newTestRunner(processor); + runner.setProperty(PutHive3Streaming.HIVE_CONFIGURATION_RESOURCES, TEST_CONF_PATH); + MockRecordParser readerFactory = new MockRecordParser(); + final RecordSchema recordSchema = AvroTypeUtil.createSchema(schema); + for (final RecordField recordField : recordSchema.getFields()) { + 
readerFactory.addSchemaField(recordField.getFieldName(), recordField.getDataType().getFieldType(), recordField.isNullable()); + } + + readerFactory.addRecord(new Object[] { null }); + + runner.addControllerService("mock-reader-factory", readerFactory); + runner.enableControllerService(readerFactory); + runner.setProperty(PutHive3Streaming.RECORD_READER, "mock-reader-factory"); + + runner.setProperty(PutHive3Streaming.METASTORE_URI, "thrift://localhost:9083"); + runner.setProperty(PutHive3Streaming.DB_NAME, "default"); + runner.setProperty(PutHive3Streaming.TABLE_NAME, "transactions"); + runner.enqueue(new byte[0]); + runner.run(); + + runner.assertTransferCount(PutHive3Streaming.REL_SUCCESS, 1); + final MockFlowFile flowFile = runner.getFlowFilesForRelationship(PutHive3Streaming.REL_SUCCESS).get(0); + assertEquals("1", flowFile.getAttribute(HIVE_STREAMING_RECORD_COUNT_ATTR)); + assertEquals("default.transactions", flowFile.getAttribute(ATTR_OUTPUT_TABLES)); + } + + @Test + public void testNullArrayHandling() throws IOException, MalformedRecordException, InitializationException { + String schemaText = "{ \"name\":\"test\", \"type\":\"record\", \"fields\":[ { \"name\":\"groups\", \"type\": [ \"null\", { \"type\":\"array\", \"items\":\"string\" } ] } ] }"; + schema = new Schema.Parser().parse(schemaText); + processor.setFields(Arrays.asList( + new FieldSchema("groups", "array<string>", "null groups") + )); + //setup runner + runner = TestRunners.newTestRunner(processor); + runner.setProperty(PutHive3Streaming.HIVE_CONFIGURATION_RESOURCES, TEST_CONF_PATH); + MockRecordParser readerFactory = new MockRecordParser(); + final RecordSchema recordSchema = AvroTypeUtil.createSchema(schema); + for (final RecordField recordField : recordSchema.getFields()) { + readerFactory.addSchemaField(recordField.getFieldName(), recordField.getDataType().getFieldType(), recordField.isNullable()); + } + + readerFactory.addRecord(new Object[] { null }); + + 
runner.addControllerService("mock-reader-factory", readerFactory); + runner.enableControllerService(readerFactory); + runner.setProperty(PutHive3Streaming.RECORD_READER, "mock-reader-factory"); + + runner.setProperty(PutHive3Streaming.METASTORE_URI, "thrift://localhost:9083"); + runner.setProperty(PutHive3Streaming.DB_NAME, "default"); + runner.setProperty(PutHive3Streaming.TABLE_NAME, "groups"); + runner.enqueue(new byte[0]); + runner.run(); + + runner.assertTransferCount(PutHive3Streaming.REL_SUCCESS, 1); + final MockFlowFile flowFile = runner.getFlowFilesForRelationship(PutHive3Streaming.REL_SUCCESS).get(0); + assertEquals("1", flowFile.getAttribute(HIVE_STREAMING_RECORD_COUNT_ATTR)); + assertEquals("default.groups", flowFile.getAttribute(ATTR_OUTPUT_TABLES)); + } + @Test public void cleanup() { processor.cleanup();