dan-s1 commented on code in PR #7665: URL: https://github.com/apache/nifi/pull/7665#discussion_r1350266069
########## nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/yaml/TestYamlTreeRowRecordReader.java: ########## @@ -0,0 +1,1347 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.yaml; + +import org.apache.avro.Schema; +import org.apache.commons.io.FileUtils; +import org.apache.nifi.avro.AvroTypeUtil; +import org.apache.nifi.json.JsonSchemaInference; +import org.apache.nifi.json.JsonTreeRowRecordReader; +import org.apache.nifi.json.SchemaApplicationStrategy; +import org.apache.nifi.json.StartingFieldStrategy; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.schema.inference.InferSchemaAccessStrategy; +import org.apache.nifi.schema.inference.TimeValueInference; +import org.apache.nifi.serialization.MalformedRecordException; +import org.apache.nifi.serialization.SimpleRecordSchema; +import org.apache.nifi.serialization.record.DataType; +import org.apache.nifi.serialization.record.MapRecord; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordField; +import org.apache.nifi.serialization.record.RecordFieldType; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.serialization.record.type.ChoiceDataType; +import org.apache.nifi.util.EqualsWrapper; +import org.apache.nifi.util.MockComponentLog; +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.Test; + +import java.io.ByteArrayInputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.math.BigDecimal; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.function.BiPredicate; +import java.util.function.Function; +import java.util.stream.Collectors; + +import static org.junit.jupiter.api.Assertions.assertArrayEquals; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static 
org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertSame; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.mock; + +class TestYamlTreeRowRecordReader { + private final String dateFormat = RecordFieldType.DATE.getDefaultFormat(); + private final String timeFormat = RecordFieldType.TIME.getDefaultFormat(); + private final String timestampFormat = RecordFieldType.TIMESTAMP.getDefaultFormat(); + + private List<RecordField> getDefaultFields() { + return getFields(RecordFieldType.DOUBLE.getDataType()); + } + + private List<RecordField> getFields(final DataType balanceDataType) { + final List<RecordField> fields = new ArrayList<>(); + fields.add(new RecordField("id", RecordFieldType.INT.getDataType())); + fields.add(new RecordField("name", RecordFieldType.STRING.getDataType())); + fields.add(new RecordField("balance", balanceDataType)); + fields.add(new RecordField("address", RecordFieldType.STRING.getDataType())); + fields.add(new RecordField("city", RecordFieldType.STRING.getDataType())); + fields.add(new RecordField("state", RecordFieldType.STRING.getDataType())); + fields.add(new RecordField("zipCode", RecordFieldType.STRING.getDataType())); + fields.add(new RecordField("country", RecordFieldType.STRING.getDataType())); + return fields; + } + + private RecordSchema getAccountSchema() { + final List<RecordField> accountFields = new ArrayList<>(); + accountFields.add(new RecordField("id", RecordFieldType.INT.getDataType())); + accountFields.add(new RecordField("balance", RecordFieldType.DOUBLE.getDataType())); + + return new SimpleRecordSchema(accountFields); + } + + @Test + void testReadChoiceOfStringOrArrayOfRecords() throws IOException, MalformedRecordException { + final File schemaFile = new File("src/test/resources/json/choice-of-string-or-array-record.avsc"); + final File jsonFile = new 
File("src/test/resources/yaml/choice-of-string-or-array-record.yaml"); + + final Schema avroSchema = new Schema.Parser().parse(schemaFile); + final RecordSchema recordSchema = AvroTypeUtil.createSchema(avroSchema); + + try (final InputStream fis = new FileInputStream(jsonFile); + final YamlTreeRowRecordReader reader = new YamlTreeRowRecordReader(fis, new MockComponentLog("id", "id"), recordSchema, dateFormat, timeFormat, timestampFormat)) { + + final Record record = reader.nextRecord(); + final Object[] fieldsArray = record.getAsArray("fields"); + assertEquals(2, fieldsArray.length); + + final Object firstElement = fieldsArray[0]; + assertTrue(firstElement instanceof Record); + assertEquals("string", ((Record) firstElement).getAsString("type")); + + final Object secondElement = fieldsArray[1]; + assertTrue(secondElement instanceof Record); + final Object[] typeArray = ((Record) secondElement).getAsArray("type"); + assertEquals(1, typeArray.length); + + final Object firstType = typeArray[0]; + assertTrue(firstType instanceof Record); + final Record firstTypeRecord = (Record) firstType; + assertEquals("string", firstTypeRecord.getAsString("type")); + } + } + + @Test + void testChoiceOfRecordTypes() throws IOException, MalformedRecordException { + final Schema avroSchema = new Schema.Parser().parse(new File("src/test/resources/json/record-choice.avsc")); + final RecordSchema recordSchema = AvroTypeUtil.createSchema(avroSchema); + + try (final InputStream in = new FileInputStream("src/test/resources/yaml/elements-for-record-choice.yaml"); + final YamlTreeRowRecordReader reader = new YamlTreeRowRecordReader(in, mock(ComponentLog.class), recordSchema, dateFormat, timeFormat, timestampFormat)) { + + // evaluate first record + final Record firstRecord = reader.nextRecord(); + assertNotNull(firstRecord); + final RecordSchema firstOuterSchema = firstRecord.getSchema(); + assertEquals(Arrays.asList("id", "child"), firstOuterSchema.getFieldNames()); + assertEquals("1234", 
firstRecord.getValue("id")); + + // record should have a schema that indicates that the 'child' is a CHOICE of 2 different record types + assertSame(RecordFieldType.CHOICE, firstOuterSchema.getDataType("child").get().getFieldType()); + final List<DataType> firstSubTypes = ((ChoiceDataType) firstOuterSchema.getDataType("child").get()).getPossibleSubTypes(); + assertEquals(2, firstSubTypes.size()); + assertEquals(2L, firstSubTypes.stream().filter(type -> type.getFieldType() == RecordFieldType.RECORD).count()); + + // child record should have a schema with "id" as the only field + final Object childObject = firstRecord.getValue("child"); + assertTrue(childObject instanceof Record); + final Record firstChildRecord = (Record) childObject; + final RecordSchema firstChildSchema = firstChildRecord.getSchema(); + + assertEquals(Collections.singletonList("id"), firstChildSchema.getFieldNames()); + + // evaluate second record + final Record secondRecord = reader.nextRecord(); + assertNotNull(secondRecord); + + final RecordSchema secondOuterSchema = secondRecord.getSchema(); + assertEquals(Arrays.asList("id", "child"), secondOuterSchema.getFieldNames()); + assertEquals("1234", secondRecord.getValue("id")); + + // record should have a schema that indicates that the 'child' is a CHOICE of 2 different record types + assertSame(RecordFieldType.CHOICE, secondOuterSchema.getDataType("child").get().getFieldType()); + final List<DataType> secondSubTypes = ((ChoiceDataType) secondOuterSchema.getDataType("child").get()).getPossibleSubTypes(); + assertEquals(2, secondSubTypes.size()); + assertEquals(2L, secondSubTypes.stream().filter(type -> type.getFieldType() == RecordFieldType.RECORD).count()); + + // child record should have a schema with "name" as the only field + final Object secondChildObject = secondRecord.getValue("child"); + assertTrue(secondChildObject instanceof Record); + final Record secondChildRecord = (Record) secondChildObject; + final RecordSchema secondChildSchema = 
secondChildRecord.getSchema(); + + assertEquals(Collections.singletonList("name"), secondChildSchema.getFieldNames()); + + assertNull(reader.nextRecord()); + } + + } + + @Test + void testReadArray() throws IOException, MalformedRecordException { + final RecordSchema schema = new SimpleRecordSchema(getDefaultFields()); + + try (final InputStream in = new FileInputStream("src/test/resources/yaml/bank-account-array.yaml"); + final YamlTreeRowRecordReader reader = new YamlTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) { + + final List<String> fieldNames = schema.getFieldNames(); + final List<String> expectedFieldNames = Arrays.asList("id", "name", "balance", "address", "city", "state", "zipCode", "country"); + assertEquals(expectedFieldNames, fieldNames); + + final List<RecordFieldType> dataTypes = schema.getDataTypes().stream().map(DataType::getFieldType).collect(Collectors.toList()); + final List<RecordFieldType> expectedTypes = Arrays.asList(RecordFieldType.INT, RecordFieldType.STRING, + RecordFieldType.DOUBLE, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING); + assertEquals(expectedTypes, dataTypes); + + final Object[] firstRecordValues = reader.nextRecord().getValues(); + assertArrayEquals(new Object[] {1, "John Doe", 4750.89, "123 My Street", "My City", "MS", "11111", "USA"}, firstRecordValues); + + final Object[] secondRecordValues = reader.nextRecord().getValues(); + assertArrayEquals(new Object[] {2, "Jane Doe", 4820.09, "321 Your Street", "Your City", "NY", "33333", "USA"}, secondRecordValues); + + assertNull(reader.nextRecord()); + } + } + + @Test + @Disabled("Not sure there is such a thing as one Yaml doc per line") + void testReadOneLinePerJSON() throws IOException, MalformedRecordException { + final RecordSchema schema = new SimpleRecordSchema(getDefaultFields()); + + try (final InputStream in = new 
FileInputStream("src/test/resources/json/bank-account-oneline.json"); + final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) { + + final List<String> fieldNames = schema.getFieldNames(); + final List<String> expectedFieldNames = Arrays.asList("id", "name", "balance", "address", "city", "state", "zipCode", "country"); + assertEquals(expectedFieldNames, fieldNames); + + final List<RecordFieldType> dataTypes = schema.getDataTypes().stream().map(DataType::getFieldType).collect(Collectors.toList()); + final List<RecordFieldType> expectedTypes = Arrays.asList(RecordFieldType.INT, RecordFieldType.STRING, + RecordFieldType.DOUBLE, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING); + assertEquals(expectedTypes, dataTypes); + + final Object[] firstRecordValues = reader.nextRecord().getValues(); + assertArrayEquals(new Object[] {1, "John Doe", 4750.89, "123 My Street", "My City", "MS", "11111", "USA"}, firstRecordValues); + + final Object[] secondRecordValues = reader.nextRecord().getValues(); + assertArrayEquals(new Object[] {2, "Jane Doe", 4820.09, "321 Your Street", "Your City", "NY", "33333", "USA"}, secondRecordValues); + + assertNull(reader.nextRecord()); + } + } + + @Test + @Disabled("TODO Determine whether this is possible in Yaml") + void testReadMultilineJSON() throws IOException, MalformedRecordException { + final List<RecordField> fields = getFields(RecordFieldType.DECIMAL.getDecimalDataType(30, 10)); + final RecordSchema schema = new SimpleRecordSchema(fields); + + try (final InputStream in = new FileInputStream("src/test/resources/json/bank-account-multiline.json"); + final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) { + + final List<String> fieldNames = schema.getFieldNames(); + final List<String> 
expectedFieldNames = Arrays.asList("id", "name", "balance", "address", "city", "state", "zipCode", "country"); + assertEquals(expectedFieldNames, fieldNames); + + final List<RecordFieldType> dataTypes = schema.getDataTypes().stream().map(DataType::getFieldType).collect(Collectors.toList()); + final List<RecordFieldType> expectedTypes = Arrays.asList(RecordFieldType.INT, RecordFieldType.STRING, + RecordFieldType.DECIMAL, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING); + assertEquals(expectedTypes, dataTypes); + + final Object[] firstRecordValues = reader.nextRecord().getValues(); + assertArrayEquals(new Object[] {1, "John Doe", BigDecimal.valueOf(4750.89), "123 My Street", "My City", "MS", "11111", "USA"}, firstRecordValues); + + final Object[] secondRecordValues = reader.nextRecord().getValues(); + assertArrayEquals(new Object[] {2, "Jane Doe", BigDecimal.valueOf(4820.09), "321 Your Street", "Your City", "NY", "33333", "USA"}, secondRecordValues); + + assertNull(reader.nextRecord()); + } + } + + @Test + @Disabled("TODO Determine whether this is possible in Yaml as two arrays may end up as one array") + void testReadMultilineArrays() throws IOException, MalformedRecordException { + final RecordSchema schema = new SimpleRecordSchema(getDefaultFields()); + + try (final InputStream in = new FileInputStream("src/test/resources/json/bank-account-multiarray.json"); + final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) { + + final List<String> fieldNames = schema.getFieldNames(); + final List<String> expectedFieldNames = Arrays.asList("id", "name", "balance", "address", "city", "state", "zipCode", "country"); + assertEquals(expectedFieldNames, fieldNames); + + final List<RecordFieldType> dataTypes = schema.getDataTypes().stream().map(DataType::getFieldType).collect(Collectors.toList()); + final 
List<RecordFieldType> expectedTypes = Arrays.asList(RecordFieldType.INT, RecordFieldType.STRING, + RecordFieldType.DOUBLE, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING); + assertEquals(expectedTypes, dataTypes); + + final Object[] firstRecordValues = reader.nextRecord().getValues(); + assertArrayEquals(new Object[] {1, "John Doe", 4750.89, "123 My Street", "My City", "MS", "11111", "USA"}, firstRecordValues); + + final Object[] secondRecordValues = reader.nextRecord().getValues(); + assertArrayEquals(new Object[] {2, "Jane Doe", 4820.09, "321 Your Street", "Your City", "NY", "33333", "USA"}, secondRecordValues); + + final Object[] thirdRecordValues = reader.nextRecord().getValues(); + assertArrayEquals(new Object[] {3, "Maria Doe", 4750.89, "123 My Street", "My City", "ME", "11111", "USA"}, thirdRecordValues); + + final Object[] fourthRecordValues = reader.nextRecord().getValues(); + assertArrayEquals(new Object[] {4, "Xi Doe", 4820.09, "321 Your Street", "Your City", "NV", "33333", "USA"}, fourthRecordValues); + + assertNull(reader.nextRecord()); + } + } + + @Test + @Disabled("Not sure this makes sense in Yaml") + void testReadMixedJSON() throws IOException, MalformedRecordException { + final RecordSchema schema = new SimpleRecordSchema(getDefaultFields()); + + try (final InputStream in = new FileInputStream("src/test/resources/yaml/bank-account-mixed.yaml"); + final YamlTreeRowRecordReader reader = new YamlTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) { + + final List<String> fieldNames = schema.getFieldNames(); + final List<String> expectedFieldNames = Arrays.asList("id", "name", "balance", "address", "city", "state", "zipCode", "country"); + assertEquals(expectedFieldNames, fieldNames); + + final List<RecordFieldType> dataTypes = schema.getDataTypes().stream().map(DataType::getFieldType).collect(Collectors.toList()); + final 
List<RecordFieldType> expectedTypes = Arrays.asList(RecordFieldType.INT, RecordFieldType.STRING, + RecordFieldType.DOUBLE, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING); + assertEquals(expectedTypes, dataTypes); + + final Object[] firstRecordValues = reader.nextRecord().getValues(); + assertArrayEquals(new Object[] {1, "John Doe", 4750.89, "123 My Street", "My City", "MS", "11111", "USA"}, firstRecordValues); + + final Object[] secondRecordValues = reader.nextRecord().getValues(); + assertArrayEquals(new Object[] {2, "Jane Doe", 4820.09, "321 Your Street", "Your City", "NY", "33333", "USA"}, secondRecordValues); + + //final Object[] thirdRecordValues = reader.nextRecord().getValues(); + //assertArrayEquals(new Object[] {3, "Maria Doe", 4750.89, "123 My Street", "My City", "ME", "11111", "USA"}, thirdRecordValues); + + final Object[] fourthRecordValues = reader.nextRecord().getValues(); + assertArrayEquals(new Object[] {4, "Xi Doe", 4820.09, "321 Your Street", "Your City", "NV", "33333", "USA"}, fourthRecordValues); + + + assertNull(reader.nextRecord()); + } + } Review Comment: @exceptionfactory I removed all the disabled tests. It turns out that the YAML file referenced by one of the tests never actually existed, so there was no resource file to remove. I assume I changed the path in the test but never committed the actual YAML file. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org