timeabarna commented on a change in pull request #5358: URL: https://github.com/apache/nifi/pull/5358#discussion_r740972013
########## File path: nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/hive/TestSelectHive3QL.java ########## @@ -690,6 +697,115 @@ public void testMaxRowsPerFlowFileWithMaxFragments() throws ClassNotFoundExcepti runner.clearTransferState(); } + @Test + public void testAvroRecordCreatedWithoutLogicalTypesByDefault() throws SQLException, IOException { + final Schema expectedSchema = SchemaBuilder.record("NiFi_SelectHiveQL_Record").namespace("any.data").fields() + .name("ID").type().unionOf().nullBuilder().endNull().and().intType().endUnion().noDefault() + .name("NAME").type().unionOf().nullBuilder().endNull().and().stringType().endUnion().noDefault() + .name("BIRTH_DATE").type().unionOf().nullBuilder().endNull().and().stringType().endUnion().noDefault() + .name("BIG_NUMBER").type().unionOf().nullBuilder().endNull().and().stringType().endUnion().noDefault() + .name("CREATED_ON").type().unionOf().nullBuilder().endNull().and().stringType().endUnion().noDefault() + .endRecord(); + final int expectedId = 1; + final String expectedName = "Joe Smith"; + final String expectedBirthDate = "1956-11-22"; + final String expectedBigNumber = "12345678.12"; + final String expectedCreatedOn = "1962-09-23 03:23:34.234"; + + // load test data to database + final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection(); + final Statement stmt = con.createStatement(); + final InputStream in; + final MockFlowFile mff; + + try { + stmt.execute("drop table TEST_QUERY_DB_TABLE"); + } catch (final SQLException sqle) { + // Ignore this error, probably a "table does not exist" since Derby doesn't yet support DROP IF EXISTS [DERBY-4842] + } + + stmt.execute("create table TEST_QUERY_DB_TABLE (id integer not null, name varchar(100), birth_date date, big_number decimal(10,2),created_on timestamp)"); + stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, birth_date, big_number, created_on) VALUES (1, 'Joe Smith', '1956-11-22', 12345678.12, '1962-09-23 03:23:34.234')"); + + runner.setIncomingConnection(false); + runner.setProperty(SelectHive3QL.HIVEQL_SELECT_QUERY, "SELECT * FROM TEST_QUERY_DB_TABLE"); + + runner.run(); + + runner.assertAllFlowFilesTransferred(SelectHive3QL.REL_SUCCESS, 1); + mff = runner.getFlowFilesForRelationship(SelectHive3QL.REL_SUCCESS).get(0); + in = new ByteArrayInputStream(mff.toByteArray()); + + final GenericRecord record = getFirstRecordFromStream(in); + + assertEquals(expectedSchema, record.getSchema()); + assertEquals(expectedId, record.get("ID")); + assertEquals(expectedName, record.get("NAME").toString()); + assertEquals(expectedBirthDate, record.get("BIRTH_DATE").toString()); + assertEquals(expectedBigNumber, record.get("BIG_NUMBER").toString()); + assertEquals(expectedCreatedOn, record.get("CREATED_ON").toString()); + + runner.clearTransferState(); + } + + @Test + public void testAvroRecordCreatedWithLogicalTypesWhenSet() throws SQLException, IOException { + final Schema expectedSchema = SchemaBuilder.record("NiFi_SelectHiveQL_Record").namespace("any.data").fields() + .name("ID").type().unionOf().nullBuilder().endNull().and().intType().endUnion().noDefault() + .name("NAME").type().unionOf().nullBuilder().endNull().and().stringType().endUnion().noDefault() + .name("BIRTH_DATE").type().unionOf().nullBuilder().endNull().and() + .type(LogicalTypes.date().addToSchema(Schema.create(Schema.Type.INT))).endUnion().noDefault() + .name("BIG_NUMBER").type().unionOf().nullBuilder().endNull().and() + .type(LogicalTypes.decimal(10, 2).addToSchema(Schema.create(Schema.Type.BYTES))).endUnion().noDefault() + .name("CREATED_ON").type().unionOf().nullBuilder().endNull().and() + .type(LogicalTypes.timestampMillis().addToSchema(Schema.create(Schema.Type.LONG))).endUnion().noDefault() + .endRecord(); + final int expectedId = 1; + final String expectedName = "Joe Smith"; + final int expectedBirthDate = (int) LocalDate.parse("1956-11-22").toEpochDay(); + final BigDecimal decimal = new BigDecimal("12345678.12").setScale(2, BigDecimal.ROUND_HALF_EVEN); + final ByteBuffer expectedBigNumber = ByteBuffer.wrap(decimal.unscaledValue().toByteArray()); + final Timestamp timestamp = Timestamp.valueOf("1962-09-23 03:23:34.234"); Review comment: Thanks @exceptionfactory for your help, I've modified the tests based on your recommendations. ########## File path: nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/util/hive/HiveJdbcCommon.java ########## @@ -149,7 +154,16 @@ public static long convertToAvroStream(final ResultSet rs, final OutputStream ou // org.apache.avro.AvroRuntimeException: Unknown datum type java.lang.Byte rec.put(i - 1, ((Byte) value).intValue()); - } else if (value instanceof BigDecimal || value instanceof BigInteger) { + } else if (value instanceof BigDecimal) { + if (useLogicalTypes) { + final int precision = meta.getPrecision(i) > 1 ? meta.getPrecision(i) : 10; Review comment: Thanks @exceptionfactory for your help, I've modified the code based on your recommendations. ########## File path: nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/util/hive/HiveJdbcCommon.java ########## @@ -298,14 +316,32 @@ public static Schema createSchema(final ResultSet rs, String recordName, boolean // Did not find direct suitable type, need to be clarified!!!! case DECIMAL: case NUMERIC: - builder.name(columnName).type().unionOf().nullBuilder().endNull().and().stringType().endUnion().noDefault(); + if (useLogicalTypes) { + final int precision = meta.getPrecision(i) > 1 ? meta.getPrecision(i) : 10; Review comment: Thanks @exceptionfactory If for some reason there would be an incorrect value (0 or less for precision, negative number for scale) for either precision or scale I set the precision for Hive default which is 10 for precision and 0 for scale. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org