[GitHub] [nifi] tpalfy commented on a diff in pull request #6444: NIFI-10513: Added capture non-record fields to JsonTreeRowRecordReader

2022-09-27 Thread GitBox


tpalfy commented on code in PR #6444:
URL: https://github.com/apache/nifi/pull/6444#discussion_r981142413


##
nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/QuerySalesforceObject.java:
##
@@ -330,76 +334,96 @@ public void onTrigger(final ProcessContext context, final 
ProcessSession session
 ageFilterUpper
 );
 
+AtomicReference nextRecordsUrl = new AtomicReference<>();
+
+do {
+
 FlowFile flowFile = session.create();
 
 Map originalAttributes = flowFile.getAttributes();
 Map attributes = new HashMap<>();
 
 AtomicInteger recordCountHolder = new AtomicInteger();
 
-flowFile = session.write(flowFile, out -> {
-try (
-InputStream querySObjectResultInputStream = 
salesforceRestService.query(querySObject);
-JsonTreeRowRecordReader jsonReader = new 
JsonTreeRowRecordReader(
-querySObjectResultInputStream,
-getLogger(),
-convertedSalesforceSchema.recordSchema,
-DATE_FORMAT,
-TIME_FORMAT,
-DATE_TIME_FORMAT,
-StartingFieldStrategy.NESTED_FIELD,
-STARTING_FIELD_NAME,
-SchemaApplicationStrategy.SELECTED_PART
-);
-
-RecordSetWriter writer = writerFactory.createWriter(
-getLogger(),
-writerFactory.getSchema(
-originalAttributes,
-convertedSalesforceSchema.recordSchema
-),
-out,
-originalAttributes
-)
-) {
-writer.beginRecordSet();
-
-Record querySObjectRecord;
-while ((querySObjectRecord = jsonReader.nextRecord()) != null) 
{
-writer.write(querySObjectRecord);
-}
-
-WriteResult writeResult = writer.finishRecordSet();
-
-attributes.put("record.count", 
String.valueOf(writeResult.getRecordCount()));
-attributes.put(CoreAttributes.MIME_TYPE.key(), 
writer.getMimeType());
-attributes.putAll(writeResult.getAttributes());
 
-recordCountHolder.set(writeResult.getRecordCount());
 
-if (ageFilterUpper != null) {
-Map newState = new 
HashMap<>(state.toMap());
-newState.put(LAST_AGE_FILTER, ageFilterUpper);
-updateState(context, newState);
+flowFile = session.write(flowFile, out -> {
+try (
+InputStream querySObjectResultInputStream = 
getResultInputStream(nextRecordsUrl, querySObject);
+
+JsonTreeRowRecordReader jsonReader = new 
JsonTreeRowRecordReader(
+querySObjectResultInputStream,
+getLogger(),
+convertedSalesforceSchema.recordSchema,
+DATE_FORMAT,
+TIME_FORMAT,
+DATE_TIME_FORMAT,
+StartingFieldStrategy.NESTED_FIELD,
+STARTING_FIELD_NAME,
+SchemaApplicationStrategy.SELECTED_PART,
+CAPTURE_PREDICATE
+);
+
+RecordSetWriter writer = writerFactory.createWriter(
+getLogger(),
+writerFactory.getSchema(
+originalAttributes,
+convertedSalesforceSchema.recordSchema
+),
+out,
+originalAttributes
+)
+) {
+writer.beginRecordSet();
+
+Record querySObjectRecord;
+while ((querySObjectRecord = jsonReader.nextRecord()) != 
null) {
+writer.write(querySObjectRecord);
+}
+
+WriteResult writeResult = writer.finishRecordSet();
+
+Map storedFields = 
jsonReader.getCapturedFields();
+
+nextRecordsUrl.set(storedFields.getOrDefault(CURSOR_URL, 
null));

Review Comment:
   ```suggestion
   Map capturedFields = 
jsonReader.getCapturedFields();
   
   
nextRecordsUrl.set(capturedFields.getOrDefault(CURSOR_URL, null));
   ```



##

[GitHub] [nifi] tpalfy commented on a diff in pull request #6444: NIFI-10513: Added capture non-record fields to JsonTreeRowRecordReader

2022-09-26 Thread GitBox


tpalfy commented on code in PR #6444:
URL: https://github.com/apache/nifi/pull/6444#discussion_r980339020


##
nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-json-record-utils/src/main/java/org/apache/nifi/json/AbstractJsonRowRecordReader.java:
##
@@ -76,26 +81,60 @@ private AbstractJsonRowRecordReader(final ComponentLog 
logger, final String date
 LAZY_TIMESTAMP_FORMAT = () -> tsf;
 }
 
-protected AbstractJsonRowRecordReader(final InputStream in, final 
ComponentLog logger, final String dateFormat, final String timeFormat, final 
String timestampFormat)
+protected AbstractJsonRowRecordReader(final InputStream in,
+  final ComponentLog logger,
+  final String dateFormat,
+  final String timeFormat,
+  final String timestampFormat)
 throws IOException, MalformedRecordException {
 
-this(in, logger, dateFormat, timeFormat, timestampFormat, null, null);
+this(in, logger, dateFormat, timeFormat, timestampFormat, null, null, 
null);
 }
 
-protected AbstractJsonRowRecordReader(final InputStream in, final 
ComponentLog logger, final String dateFormat, final String timeFormat, final 
String timestampFormat,
-  final StartingFieldStrategy 
strategy, final String nestedFieldName) throws IOException, 
MalformedRecordException {
+/**
+ * Constructor with initial logic for JSON to NiFi record parsing.
+ *
+ * @param in the input stream to parse
+ * @param logger ComponentLog
+ * @param dateFormat format for parsing date fields
+ * @param timeFormat format for parsing time fields
+ * @param timestampFormat format for parsing timestamp fields
+ * @param strategy   whether to start processing from a 
specific field
+ * @param nestedFieldName the name of the field to start the 
processing from
+ * @param captureFieldPredicate predicate that takes a JSON fieldName and 
fieldValue to capture top-level non-processed fields which can
+ *   be accessed by calling {@link 
#getCapturedFields()}
+ * @throws IOException  in case of JSON stream processing 
failure
+ * @throws MalformedRecordException in case of malformed JSON input
+ */
+protected AbstractJsonRowRecordReader(final InputStream in,
+  final ComponentLog logger,
+  final String dateFormat,
+  final String timeFormat,
+  final String timestampFormat,
+  final StartingFieldStrategy strategy,
+  final String nestedFieldName,
+  final BiPredicate 
captureFieldPredicate)
+throws IOException, MalformedRecordException {
 
 this(logger, dateFormat, timeFormat, timestampFormat);
 
 this.strategy = strategy;
+this.captureFieldPredicate = captureFieldPredicate;
+capturedFields = new HashMap<>();
 
 try {
 jsonParser = jsonFactory.createParser(in);
 jsonParser.setCodec(codec);
 
 if (strategy == StartingFieldStrategy.NESTED_FIELD) {
-final SerializedString serializedStartingFieldName = new 
SerializedString(nestedFieldName);
-while (!jsonParser.nextFieldName(serializedStartingFieldName) 
&& jsonParser.hasCurrentToken());
+while (jsonParser.nextToken() != null) {
+if (nestedFieldName.equals(jsonParser.getCurrentName())) {
+break;
+}

Review Comment:
   ```suggestion
   if (nestedFieldName.equals(jsonParser.getCurrentName())) 
{
   logger.debug("Parsing starting at nested field 
[{}]", nestedFieldName);
   break;
   }
   ```



##
nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-json-record-utils/src/main/java/org/apache/nifi/json/AbstractJsonRowRecordReader.java:
##
@@ -76,26 +81,60 @@ private AbstractJsonRowRecordReader(final ComponentLog 
logger, final String date
 LAZY_TIMESTAMP_FORMAT = () -> tsf;
 }
 
-protected AbstractJsonRowRecordReader(final InputStream in, final 
ComponentLog logger, final String dateFormat, final String timeFormat, final 
String timestampFormat)
+protected AbstractJsonRowRecordReader(final InputStream in,
+  final ComponentLog logger,
+  final String dateFormat,
+