tpalfy commented on code in PR #6444:
URL: https://github.com/apache/nifi/pull/6444#discussion_r980339020
##
nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-json-record-utils/src/main/java/org/apache/nifi/json/AbstractJsonRowRecordReader.java:
##
@@ -76,26 +81,60 @@ private AbstractJsonRowRecordReader(final ComponentLog
logger, final String date
LAZY_TIMESTAMP_FORMAT = () -> tsf;
}
-protected AbstractJsonRowRecordReader(final InputStream in, final
ComponentLog logger, final String dateFormat, final String timeFormat, final
String timestampFormat)
+protected AbstractJsonRowRecordReader(final InputStream in,
+ final ComponentLog logger,
+ final String dateFormat,
+ final String timeFormat,
+ final String timestampFormat)
throws IOException, MalformedRecordException {
-this(in, logger, dateFormat, timeFormat, timestampFormat, null, null);
+this(in, logger, dateFormat, timeFormat, timestampFormat, null, null,
null);
}
-protected AbstractJsonRowRecordReader(final InputStream in, final
ComponentLog logger, final String dateFormat, final String timeFormat, final
String timestampFormat,
- final StartingFieldStrategy
strategy, final String nestedFieldName) throws IOException,
MalformedRecordException {
+/**
+ * Constructor with initial logic for JSON to NiFi record parsing.
+ *
+ * @param in the input stream to parse
+ * @param logger ComponentLog
+ * @param dateFormat format for parsing date fields
+ * @param timeFormat format for parsing time fields
+ * @param timestampFormat format for parsing timestamp fields
+ * @param strategy whether to start processing from a
specific field
+ * @param nestedFieldName the name of the field to start the
processing from
+ * @param captureFieldPredicate predicate that takes a JSON fieldName and
fieldValue to capture top-level non-processed fields which can
+ * be accessed by calling {@link
#getCapturedFields()}
+ * @throws IOException in case of JSON stream processing
failure
+ * @throws MalformedRecordException in case of malformed JSON input
+ */
+protected AbstractJsonRowRecordReader(final InputStream in,
+ final ComponentLog logger,
+ final String dateFormat,
+ final String timeFormat,
+ final String timestampFormat,
+ final StartingFieldStrategy strategy,
+ final String nestedFieldName,
+ final BiPredicate
captureFieldPredicate)
+throws IOException, MalformedRecordException {
this(logger, dateFormat, timeFormat, timestampFormat);
this.strategy = strategy;
+this.captureFieldPredicate = captureFieldPredicate;
+capturedFields = new HashMap<>();
try {
jsonParser = jsonFactory.createParser(in);
jsonParser.setCodec(codec);
if (strategy == StartingFieldStrategy.NESTED_FIELD) {
-final SerializedString serializedStartingFieldName = new
SerializedString(nestedFieldName);
-while (!jsonParser.nextFieldName(serializedStartingFieldName)
&& jsonParser.hasCurrentToken());
+while (jsonParser.nextToken() != null) {
+if (nestedFieldName.equals(jsonParser.getCurrentName())) {
+break;
+}
Review Comment:
```suggestion
if (nestedFieldName.equals(jsonParser.getCurrentName())) {
logger.debug("Parsing starting at nested field [{}]", nestedFieldName);
break;
}
```
##
nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-json-record-utils/src/main/java/org/apache/nifi/json/AbstractJsonRowRecordReader.java:
##
@@ -76,26 +81,60 @@ private AbstractJsonRowRecordReader(final ComponentLog
logger, final String date
LAZY_TIMESTAMP_FORMAT = () -> tsf;
}
-protected AbstractJsonRowRecordReader(final InputStream in, final
ComponentLog logger, final String dateFormat, final String timeFormat, final
String timestampFormat)
+protected AbstractJsonRowRecordReader(final InputStream in,
+ final ComponentLog logger,
+ final String dateFormat,
+