tpalfy commented on code in PR #6018:
URL: https://github.com/apache/nifi/pull/6018#discussion_r882671956
##########
nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeRowRecordReader.java:
##########
@@ -44,30 +44,51 @@
import java.util.function.Supplier;
public class JsonTreeRowRecordReader extends AbstractJsonRowRecordReader {
- private final RecordSchema schema;
+ private final RecordSchema schema;
public JsonTreeRowRecordReader(final InputStream in, final ComponentLog
logger, final RecordSchema schema,
- final String dateFormat, final String timeFormat, final String
timestampFormat) throws IOException, MalformedRecordException {
- super(in, logger, dateFormat, timeFormat, timestampFormat);
- this.schema = schema;
+ final String dateFormat, final String
timeFormat, final String timestampFormat) throws IOException,
MalformedRecordException {
+ this(in, logger, schema, dateFormat, timeFormat, timestampFormat,
null, null, null);
}
public JsonTreeRowRecordReader(final InputStream in, final ComponentLog
logger, final RecordSchema schema,
Review Comment:
Sorry if I missed this previously but we shouldn't change the signature of
existing public constructors. We can create new ones as necessary instead.
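To illustrate the point, here is a minimal sketch of keeping an existing public constructor intact and adding an overload next to it. `ExampleReader` and its fields are hypothetical stand-ins, not the actual `JsonTreeRowRecordReader` code; only the delegation pattern matters.

```java
public class ExampleReader {
    private final String dateFormat;
    // Hypothetical extra option; null means "start from the root".
    private final String startingFieldName;

    // Existing public constructor: signature left untouched, so current callers keep compiling.
    public ExampleReader(final String dateFormat) {
        this(dateFormat, null);
    }

    // New overload that carries the additional parameter.
    public ExampleReader(final String dateFormat, final String startingFieldName) {
        this.dateFormat = dateFormat;
        this.startingFieldName = startingFieldName;
    }

    public String getDateFormat() {
        return dateFormat;
    }

    public String getStartingFieldName() {
        return startingFieldName;
    }
}
```

Callers of the old constructor get the default behaviour, and only code that needs the new option has to use the new overload.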
##########
nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeRowRecordReader.java:
##########
@@ -44,30 +44,51 @@
import java.util.function.Supplier;
public class JsonTreeRowRecordReader extends AbstractJsonRowRecordReader {
- private final RecordSchema schema;
+ private final RecordSchema schema;
public JsonTreeRowRecordReader(final InputStream in, final ComponentLog
logger, final RecordSchema schema,
- final String dateFormat, final String timeFormat, final String
timestampFormat) throws IOException, MalformedRecordException {
- super(in, logger, dateFormat, timeFormat, timestampFormat);
- this.schema = schema;
+ final String dateFormat, final String
timeFormat, final String timestampFormat) throws IOException,
MalformedRecordException {
+ this(in, logger, schema, dateFormat, timeFormat, timestampFormat,
null, null, null);
}
public JsonTreeRowRecordReader(final InputStream in, final ComponentLog
logger, final RecordSchema schema,
final String dateFormat, final String
timeFormat, final String timestampFormat,
- final StartingFieldStrategy strategy, final
String startingFieldName) throws IOException, MalformedRecordException {
- super(in, logger, dateFormat, timeFormat, timestampFormat, strategy,
startingFieldName);
- this.schema = schema;
+ final StartingFieldStrategy
startingFieldStrategy, final String startingFieldName,
+ final SchemaApplicationStrategy
schemaApplicationStrategy)
+ throws IOException, MalformedRecordException {
+
+ super(in, logger, dateFormat, timeFormat, timestampFormat,
startingFieldStrategy, startingFieldName);
+ if (startingFieldStrategy == StartingFieldStrategy.NESTED_FIELD &&
schemaApplicationStrategy == SchemaApplicationStrategy.WHOLE_JSON) {
+ this.schema = getSelectedSchema(schema, startingFieldName);
+ } else {
+ this.schema = schema;
+ }
+ }
+
+ private RecordSchema getSelectedSchema(final RecordSchema schema, final
String startingFieldName) {
Review Comment:
This implementation only finds nested schemas one level below the root; it
cannot find a schema nested any deeper than that.
Apart from adjusting the production code, I propose the following test as
well:
```
src/test/resources/json/single-element-deep-nested.json:
{
  "rootInt": 100,
  "rootString": "root_string",
  "nestedLevel1Record": {
    "nestedLevel1Int": 110,
    "nestedLevel1String": "root.level1:string",
    "nestedLevel2Record": {
      "nestedLevel2Int": 111,
      "nestedLevel2String": "root.level1.level2:string"
    }
  }
}
In org.apache.nifi.json.TestJsonTreeRowRecordReader:
@Test
void testStartFromDeepNestedObject() throws IOException, MalformedRecordException {
    String jsonPath = "src/test/resources/json/single-element-deep-nested.json";

    RecordSchema recordSchema = new SimpleRecordSchema(Arrays.asList(
        new RecordField("rootInt", RecordFieldType.INT.getDataType()),
        new RecordField("rootString", RecordFieldType.STRING.getDataType()),
        new RecordField("nestedLevel1Record", RecordFieldType.RECORD.getRecordDataType(
            new SimpleRecordSchema(Arrays.asList(
                new RecordField("nestedLevel1Int", RecordFieldType.INT.getDataType()),
                new RecordField("nestedLevel1String", RecordFieldType.STRING.getDataType()),
                new RecordField("nestedLevel2Record", RecordFieldType.RECORD.getRecordDataType(
                    new SimpleRecordSchema(Arrays.asList(
                        new RecordField("nestedLevel2Int", RecordFieldType.INT.getDataType()),
                        new RecordField("nestedLevel2String", RecordFieldType.STRING.getDataType())
                    ))
                ))
            ))
        ))
    ));

    SimpleRecordSchema expectedRecordSchema = new SimpleRecordSchema(Arrays.asList(
        new RecordField("nestedLevel2Int", RecordFieldType.INT.getDataType()),
        new RecordField("nestedLevel2String", RecordFieldType.STRING.getDataType())
    ));

    List<Object> expected = Collections.singletonList(
        new MapRecord(expectedRecordSchema, new HashMap<String, Object>() {{
            put("nestedLevel2Int", 111);
            put("nestedLevel2String", "root.level1.level2:string");
        }})
    );

    testReadRecords(jsonPath, recordSchema, expected, StartingFieldStrategy.NESTED_FIELD,
        "nestedLevel2Record", SchemaApplicationStrategy.WHOLE_JSON);
}
```
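For the production-code side of this comment, a depth-first search over the schema is one way to reach fields at any depth. The sketch below is only an illustration under loud assumptions: it models a schema as a plain `Map` (field name to either a type name or a nested map) instead of NiFi's `RecordSchema`, the class and method names are made up, and it ignores arrays of records, which the real implementation would have to unwrap.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

public class NestedSchemaLookup {

    // Hypothetical stand-in for a record schema: a field maps either to a type name (String)
    // or to a nested schema (Map). This is NOT the NiFi RecordSchema API.
    static Optional<Map<String, Object>> findNestedSchema(final Map<String, Object> schema, final String fieldName) {
        for (final Map.Entry<String, Object> entry : schema.entrySet()) {
            if (!(entry.getValue() instanceof Map)) {
                continue; // scalar field, nothing to descend into
            }
            @SuppressWarnings("unchecked")
            final Map<String, Object> child = (Map<String, Object>) entry.getValue();
            if (entry.getKey().equals(fieldName)) {
                return Optional.of(child);
            }
            // Not a match at this level: keep searching inside the nested record.
            final Optional<Map<String, Object>> deeper = findNestedSchema(child, fieldName);
            if (deeper.isPresent()) {
                return deeper;
            }
        }
        return Optional.empty();
    }

    public static void main(final String[] args) {
        final Map<String, Object> level2 = new LinkedHashMap<>();
        level2.put("nestedLevel2Int", "INT");
        level2.put("nestedLevel2String", "STRING");

        final Map<String, Object> level1 = new LinkedHashMap<>();
        level1.put("nestedLevel1Int", "INT");
        level1.put("nestedLevel2Record", level2);

        final Map<String, Object> root = new LinkedHashMap<>();
        root.put("rootInt", "INT");
        root.put("nestedLevel1Record", level1);

        System.out.println(findNestedSchema(root, "nestedLevel2Record").isPresent());
    }
}
```

The first match in field order wins, so if the same field name can appear at several depths the real code would need a rule for which one to pick.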
##########
nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonTreeRowRecordReader.java:
##########
@@ -1128,29 +1140,82 @@ void testStartFromNestedFieldThenStartObject() throws
IOException, MalformedReco
}})
);
- testReadRecords(jsonPath, expectedRecordSchema, expected,
StartingFieldStrategy.NESTED_FIELD, "accounts");
+ testReadRecords(jsonPath, expectedRecordSchema, expected,
StartingFieldStrategy.NESTED_FIELD,
+ "accounts", SchemaApplicationStrategy.SELECTED_PART);
+ }
+
+ @Test
+ void testStartFromNestedObjectWithWholeJsonSchemaScope() throws
IOException, MalformedRecordException {
+ String jsonPath = "src/test/resources/json/single-element-nested.json";
+
+ RecordSchema accountSchema = new SimpleRecordSchema(Arrays.asList(
+ new RecordField("id", RecordFieldType.INT.getDataType()),
+ new RecordField("balance",
RecordFieldType.DOUBLE.getDataType())
+ ));
+
+ RecordSchema recordSchema = new
SimpleRecordSchema(Collections.singletonList(
+ new RecordField("account",
RecordFieldType.RECORD.getRecordDataType(accountSchema))
+ ));
+
+ RecordSchema expectedRecordSchema = accountSchema;
+
+ List<Object> expected = Collections.singletonList(
+ new MapRecord(expectedRecordSchema, new HashMap<String,
Object>() {{
+ put("id", 42);
+ put("balance", 4750.89);
+ }})
+ );
+
+ testReadRecords(jsonPath, recordSchema, expected,
StartingFieldStrategy.NESTED_FIELD,
+ "account", SchemaApplicationStrategy.WHOLE_JSON);
+ }
+
+ @Test
+ void testStartFromNestedArrayWithWholeJsonSchemaScope() throws
IOException, MalformedRecordException {
+ String jsonPath =
"src/test/resources/json/single-element-nested-array.json";
+
+ RecordSchema accountSchema = new SimpleRecordSchema(Arrays.asList(
+ new RecordField("id", RecordFieldType.INT.getDataType()),
+ new RecordField("balance",
RecordFieldType.DOUBLE.getDataType())
+ ));
+
+ RecordSchema recordSchema = new
SimpleRecordSchema(Collections.singletonList(
+ new RecordField("accounts",
RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.RECORD.getRecordDataType(accountSchema)))
+ ));
+
+ RecordSchema expectedRecordSchema = accountSchema;
+
+ List<Object> expected = Arrays.asList(
+ new MapRecord(expectedRecordSchema, new HashMap<String,
Object>() {{
+ put("id", 42);
+ put("balance", 4750.89);
+ }}),
+ new MapRecord(expectedRecordSchema, new HashMap<String,
Object>() {{
+ put("id", 43);
+ put("balance", 48212.38);
+ }})
+ );
+
+ testReadRecords(jsonPath, recordSchema, expected,
StartingFieldStrategy.NESTED_FIELD,
+ "accounts", SchemaApplicationStrategy.WHOLE_JSON);
}
private void testReadRecords(String jsonPath, List<Object> expected)
throws IOException, MalformedRecordException {
- // GIVEN
final File jsonFile = new File(jsonPath);
-
try (
InputStream jsonStream = new
ByteArrayInputStream(FileUtils.readFileToByteArray(jsonFile))
) {
RecordSchema schema = inferSchema(jsonStream,
StartingFieldStrategy.ROOT_NODE, null);
-
- // WHEN
- // THEN
testReadRecords(jsonStream, schema, expected);
}
}
- private void testReadRecords(String jsonPath, List<Object> expected,
StartingFieldStrategy strategy, String startingFieldName) throws IOException,
MalformedRecordException {
+ private void testReadRecords(String jsonPath, List<Object> expected,
StartingFieldStrategy strategy,
Review Comment:
This method now accepts a `SchemaApplicationStrategy` but always tests the
production code with an inferred schema that is based on the
`startingFieldName`, i.e. a nested schema.
In other words, if `SchemaApplicationStrategy.WHOLE_JSON` is provided, the
test case does not correspond to any possible real-life scenario.
I would either remove the `schemaApplicationStrategy` parameter (restricting
this utility method to cases where the nested schema is used, i.e.
`SchemaApplicationStrategy` is `SELECTED_PART`) or update the `inferSchema`
logic. I'd vote for the former: in my opinion such logic shouldn't be used in
the test code in the first place.