krisztina-zsihovszki commented on code in PR #7019:
URL: https://github.com/apache/nifi/pull/7019#discussion_r1138356484


##########
nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/QuerySalesforceObject.java:
##########
@@ -385,144 +354,181 @@ private void processQuery(ProcessContext context, ProcessSession session) {
                     .collect(Collectors.joining(","));
         }
 
-        String querySObject = buildQuery(
-                sObject,
-                fields,
-                customWhereClause,
-                ageField,
-                initialAgeFilter,
-                ageFilterLower,
-                ageFilterUpper
-        );
+        String querySObject = new SalesforceQueryBuilder(incrementalContext)
+                .buildQuery(sObject, fields, customWhereClause);
+
+        AtomicBoolean isOriginalTransferred = new AtomicBoolean(false);
+        List<FlowFile> outgoingFlowFiles = new ArrayList<>();
+        Map<String, String> originalAttributes = Optional.ofNullable(originalFlowFile)
+                .map(FlowFile::getAttributes)
+                .orElseGet(HashMap::new);
+
+        long startNanos = System.nanoTime();
 
         do {
-            FlowFile flowFile = session.create();
-            Map<String, String> originalAttributes = flowFile.getAttributes();
-            Map<String, String> attributes = new HashMap<>();
+            FlowFile outgoingFlowFile = createOutgoingFlowFile(session, originalFlowFile);
+            outgoingFlowFiles.add(outgoingFlowFile);
+            Map<String, String> attributes = new HashMap<>(originalAttributes);
 
             AtomicInteger recordCountHolder = new AtomicInteger();
-            long startNanos = System.nanoTime();
-            flowFile = session.write(flowFile, out -> {
-                try (
-                        InputStream querySObjectResultInputStream = getResultInputStream(nextRecordsUrl.get(), querySObject);
-
-                        JsonTreeRowRecordReader jsonReader = new JsonTreeRowRecordReader(
-                                querySObjectResultInputStream,
-                                getLogger(),
-                                salesForceSchemaHolder.recordSchema,
-                                DATE_FORMAT,
-                                TIME_FORMAT,
-                                DATE_TIME_FORMAT,
-                                StartingFieldStrategy.NESTED_FIELD,
-                                STARTING_FIELD_NAME,
-                                SchemaApplicationStrategy.SELECTED_PART,
-                                CAPTURE_PREDICATE
-                        );
-
-                        RecordSetWriter writer = writerFactory.createWriter(
-                                getLogger(),
-                                writerFactory.getSchema(
-                                        originalAttributes,
-                                        salesForceSchemaHolder.recordSchema
-                                ),
-                                out,
-                                originalAttributes
-                        )
-                ) {
-                    writer.beginRecordSet();
-
-                    Record querySObjectRecord;
-                    while ((querySObjectRecord = jsonReader.nextRecord()) != null) {
-                        writer.write(querySObjectRecord);
-                    }
-
-                    WriteResult writeResult = writer.finishRecordSet();
+            try {
+                outgoingFlowFile = session.write(outgoingFlowFile, processRecordsCallback(context, nextRecordsUrl, writerFactory, state, incrementalContext,
+                        salesForceSchemaHolder, querySObject, originalAttributes, attributes, recordCountHolder));
+                int recordCount = recordCountHolder.get();
 
-                    Map<String, String> capturedFields = jsonReader.getCapturedFields();
+                if (createZeroRecordFlowFiles || recordCount != 0) {
+                    outgoingFlowFile = session.putAllAttributes(outgoingFlowFile, attributes);
 
-                    nextRecordsUrl.set(capturedFields.getOrDefault(NEXT_RECORDS_URL, null));
+                    session.adjustCounter("Records Processed", recordCount, false);
+                    getLogger().info("Successfully written {} records for {}", recordCount, outgoingFlowFile);
+                } else {
+                    outgoingFlowFiles.remove(outgoingFlowFile);
+                    session.remove(outgoingFlowFile);
+                }
+            } catch (Exception e) {
+                if (e.getCause() instanceof IOException) {
+                    throw new ProcessException("Couldn't get Salesforce 
records", e);
+                } else if (e.getCause() instanceof SchemaNotFoundException) {
+                    handleError(session, originalFlowFile, 
isOriginalTransferred, outgoingFlowFiles, e, "Couldn't create record writer");
+                } else if (e.getCause() instanceof MalformedRecordException) {
+                    handleError(session, originalFlowFile, 
isOriginalTransferred, outgoingFlowFiles, e, "Couldn't read records from 
input");
+                } else {
+                    handleError(session, originalFlowFile, 
isOriginalTransferred, outgoingFlowFiles, e, "Couldn't get Salesforce records");
+                }
+                break;
+            }
+        } while (nextRecordsUrl.get() != null);
 
-                    attributes.put("record.count", 
String.valueOf(writeResult.getRecordCount()));
-                    attributes.put(CoreAttributes.MIME_TYPE.key(), 
writer.getMimeType());
-                    attributes.putAll(writeResult.getAttributes());
+        transferFlowFiles(session, outgoingFlowFiles, originalFlowFile, 
isOriginalTransferred, startNanos, sObject);
+    }
 
-                    recordCountHolder.set(writeResult.getRecordCount());
+    private OutputStreamCallback processRecordsCallback(ProcessContext context, AtomicReference<String> nextRecordsUrl, RecordSetWriterFactory writerFactory,
+                                                        StateMap state, IncrementalContext incrementalContext, SalesforceSchemaHolder salesForceSchemaHolder,
+                                                        String querySObject, Map<String, String> originalAttributes, Map<String, String> attributes,
+                                                        AtomicInteger recordCountHolder) {
+        return out -> {
+            try {
+                handleRecordSet(out, nextRecordsUrl, querySObject, writerFactory, salesForceSchemaHolder, originalAttributes, attributes, recordCountHolder);
 
-                    if (ageFilterUpper != null) {
-                        Map<String, String> newState = new HashMap<>(state.toMap());
-                        newState.put(LAST_AGE_FILTER, ageFilterUpper);
-                        updateState(context, newState);
-                    }
-                } catch (SchemaNotFoundException e) {
-                    throw new ProcessException("Couldn't create record writer", e);
-                } catch (MalformedRecordException e) {
-                    throw new ProcessException("Couldn't read records from input", e);
+                if (incrementalContext.getAgeFilterUpper() != null) {
+                    Map<String, String> newState = new HashMap<>(state.toMap());
+                    newState.put(LAST_AGE_FILTER, incrementalContext.getAgeFilterUpper());
+                    updateState(context, newState);
                 }
-            });
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+        };
+    }
 
-            int recordCount = recordCountHolder.get();
+    private void handleRecordSet(OutputStream out, AtomicReference<String> nextRecordsUrl, String querySObject, RecordSetWriterFactory writerFactory,
+                                 SalesforceSchemaHolder salesForceSchemaHolder, Map<String, String> originalAttributes, Map<String, String> attributes,
+                                 AtomicInteger recordCountHolder) throws Exception {
+        try (
+                InputStream querySObjectResultInputStream = getResultInputStream(nextRecordsUrl.get(), querySObject);
+                JsonTreeRowRecordReader jsonReader = createJsonReader(querySObjectResultInputStream, salesForceSchemaHolder.getRecordSchema());
+                RecordSetWriter writer = createRecordSetWriter(writerFactory, originalAttributes, out, salesForceSchemaHolder.getRecordSchema())
+        ) {
+            writer.beginRecordSet();
+
+            Record querySObjectRecord;
+            while ((querySObjectRecord = jsonReader.nextRecord()) != null) {
+                writer.write(querySObjectRecord);
+            }
 
-            if (!createZeroRecordFlowFiles && recordCount == 0) {
-                session.remove(flowFile);
-            } else {
-                flowFile = session.putAllAttributes(flowFile, attributes);
-                session.transfer(flowFile, REL_SUCCESS);
+            WriteResult writeResult = writer.finishRecordSet();
 
-                long transferMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
-                session.getProvenanceReporter().receive(flowFile, salesforceRestService.getVersionedBaseUrl() + "/composite/tree/" + sObject,
-                        transferMillis);
+            Map<String, String> capturedFields = jsonReader.getCapturedFields();
+            nextRecordsUrl.set(capturedFields.getOrDefault(NEXT_RECORDS_URL, null));
 
-                session.adjustCounter("Records Processed", recordCount, false);
-                getLogger().info("Successfully written {} records for {}", recordCount, flowFile);
-            }
-        } while (nextRecordsUrl.get() != null);
+            attributes.put("record.count", String.valueOf(writeResult.getRecordCount()));
+            attributes.put(CoreAttributes.MIME_TYPE.key(), writer.getMimeType());
+            attributes.putAll(writeResult.getAttributes());
+            recordCountHolder.set(writeResult.getRecordCount());
+        }
+    }
+
+    private JsonTreeRowRecordReader createJsonReader(InputStream querySObjectResultInputStream, RecordSchema recordSchema) throws IOException, MalformedRecordException {
+        return new JsonTreeRowRecordReader(
+                querySObjectResultInputStream,
+                getLogger(),
+                recordSchema,
+                DATE_FORMAT,
+                TIME_FORMAT,
+                DATE_TIME_FORMAT,
+                StartingFieldStrategy.NESTED_FIELD,
+                STARTING_FIELD_NAME,
+                SchemaApplicationStrategy.SELECTED_PART,
+                CAPTURE_PREDICATE
+        );
+    }
+
+    private RecordSetWriter createRecordSetWriter(RecordSetWriterFactory writerFactory, Map<String, String> originalAttributes, OutputStream out,
+                                                  RecordSchema recordSchema) throws IOException, SchemaNotFoundException {
+        return writerFactory.createWriter(
+                getLogger(),
+                writerFactory.getSchema(
+                        originalAttributes,
+                        recordSchema
+                ),
+                out,
+                originalAttributes
+        );
     }
 
     private void processCustomQuery(ProcessContext context, ProcessSession session, FlowFile originalFlowFile) {
         String customQuery = context.getProperty(CUSTOM_SOQL_QUERY).evaluateAttributeExpressions(originalFlowFile).getValue();
         AtomicReference<String> nextRecordsUrl = new AtomicReference<>();
         AtomicReference<String> totalSize = new AtomicReference<>();
-        boolean isOriginalTransferred = false;
+        AtomicBoolean isOriginalTransferred = new AtomicBoolean(false);
         List<FlowFile> outgoingFlowFiles = new ArrayList<>();
+        long startNanos = System.nanoTime();
         do {
-            FlowFile outgoingFlowFile;
             try (InputStream response = getResultInputStream(nextRecordsUrl.get(), customQuery)) {
-                if (originalFlowFile != null) {
-                    outgoingFlowFile = session.create(originalFlowFile);
-                } else {
-                    outgoingFlowFile = session.create();
-                }
+                FlowFile outgoingFlowFile = createOutgoingFlowFile(session, originalFlowFile);
                 outgoingFlowFiles.add(outgoingFlowFile);
-                outgoingFlowFile = session.write(outgoingFlowFile, parseHttpResponse(response, nextRecordsUrl, totalSize));
-                int recordCount = nextRecordsUrl.get() != null ? 2000 : Integer.parseInt(totalSize.get()) % 2000;
+                outgoingFlowFile = session.write(outgoingFlowFile, parseCustomQueryResponse(response, nextRecordsUrl, totalSize));
+                int recordCount = nextRecordsUrl.get() != null ? MAX_RECORD_COUNT : Integer.parseInt(totalSize.get()) % MAX_RECORD_COUNT;
                 Map<String, String> attributes = new HashMap<>();
                 attributes.put(CoreAttributes.MIME_TYPE.key(), "application/json");
-                attributes.put(TOTAL_RECORD_COUNT, String.valueOf(recordCount));
+                attributes.put(TOTAL_RECORD_COUNT_ATTRIBUTE, String.valueOf(recordCount));
                 session.adjustCounter("Salesforce records processed", recordCount, false);
                 session.putAllAttributes(outgoingFlowFile, attributes);
             } catch (IOException e) {
                 throw new ProcessException("Couldn't get Salesforce records", e);
             } catch (Exception e) {
-                if (originalFlowFile != null) {
-                    session.transfer(originalFlowFile, REL_FAILURE);
-                    isOriginalTransferred = true;
-                }
-                getLogger().error("Couldn't get Salesforce records", e);
-                session.remove(outgoingFlowFiles);
-                outgoingFlowFiles.clear();
+                handleError(session, originalFlowFile, isOriginalTransferred, outgoingFlowFiles, e, "Couldn't get Salesforce records");
                 break;
             }
         } while (nextRecordsUrl.get() != null);
 
+        transferFlowFiles(session, outgoingFlowFiles, originalFlowFile, isOriginalTransferred, startNanos, "custom");
+    }
+
+    private void transferFlowFiles(ProcessSession session, List<FlowFile> outgoingFlowFiles, FlowFile originalFlowFile, AtomicBoolean isOriginalTransferred,
+                                   long startNanos, String urlDetail) {
         if (!outgoingFlowFiles.isEmpty()) {
             session.transfer(outgoingFlowFiles, REL_SUCCESS);
+            long transferMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
+
+            outgoingFlowFiles.forEach(ff ->
+                    session.getProvenanceReporter().receive(ff, salesforceRestService.getVersionedBaseUrl() + urlDetail, transferMillis)

Review Comment:
   The wrong URI is created for the custom query; a "/" is missing, which produces e.g.:
   
   "https://xxx.my.salesforce.com/services/data/v54.0custom"



##########
nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/PutSalesforceObject.java:
##########
@@ -146,69 +136,85 @@ public void onTrigger(ProcessContext context, ProcessSession session) throws Pro
 
         String objectType = flowFile.getAttribute(ATTR_OBJECT_TYPE);
         if (objectType == null) {
-            getLogger().error("Salesforce object type not found among the 
incoming FlowFile attributes");
-            flowFile = session.putAttribute(flowFile, ATTR_ERROR_MESSAGE, 
"Salesforce object type not found among FlowFile attributes");
-            session.transfer(session.penalize(flowFile), REL_FAILURE);
+            handleInvalidFlowFile(session, flowFile);
             return;
         }
 
+        try {
+            long startNanos = System.nanoTime();
+            processRecords(flowFile, objectType, context, session);
+            session.transfer(flowFile, REL_SUCCESS);
+            long transferMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
+            session.getProvenanceReporter().send(flowFile, salesforceRestClient.getVersionedBaseUrl() + "/put/" + objectType, transferMillis);
+        } catch (MalformedRecordException e) {
+            getLogger().error("Couldn't read records from input", e);
+            transferToFailure(session, flowFile, e);
+        } catch (SchemaNotFoundException e) {
+            getLogger().error("Couldn't create record writer", e);
+            transferToFailure(session, flowFile, e);
+        } catch (Exception e) {
+            getLogger().error("Failed to put records to Salesforce.", e);
+            transferToFailure(session, flowFile, e);
+        }
+    }
+
+    private void processRecords(FlowFile flowFile, String objectType, ProcessContext context, ProcessSession session) throws IOException, MalformedRecordException, SchemaNotFoundException {
         RecordReaderFactory readerFactory = context.getProperty(RECORD_READER_FACTORY).asControllerService(RecordReaderFactory.class);
+        int count = 0;
+        RecordExtender recordExtender;
 
-        RecordExtender extender;
-        long startNanos = System.nanoTime();
-        try {
         try (InputStream in = session.read(flowFile);
             RecordReader reader = readerFactory.createRecordReader(flowFile, in, getLogger());
             ByteArrayOutputStream out = new ByteArrayOutputStream();
-             WriteJsonResult writer = getWriter(extender = new RecordExtender(reader.getSchema()), out)) {
+             WriteJsonResult writer = getWriter(recordExtender = getExtender(reader), out)) {
 
-            int count = 0;
             Record record;
-
             while ((record = reader.nextRecord()) != null) {
                 count++;
                 if (!writer.isActiveRecordSet()) {
                     writer.beginRecordSet();
                 }
 
-                MapRecord extendedRecord = extender.getExtendedRecord(objectType, count, record);
+                MapRecord extendedRecord = recordExtender.getExtendedRecord(objectType, count, record);
                 writer.write(extendedRecord);
 
                 if (count == maxRecordCount) {
                     count = 0;
-                    processRecords(objectType, out, writer, extender);
+                    postRecordBatch(objectType, out, writer, recordExtender);
                     out.reset();
                 }
             }
             if (writer.isActiveRecordSet()) {
-                processRecords(objectType, out, writer, extender);
+                postRecordBatch(objectType, out, writer, recordExtender);
             }
-          }
-          session.transfer(flowFile, REL_SUCCESS);
-          long transferMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
-          session.getProvenanceReporter().send(flowFile, salesforceRestService.getVersionedBaseUrl()+ "/composite/tree/" + objectType, transferMillis);
-        } catch (MalformedRecordException e) {
-            getLogger().error("Couldn't read records from input", e);
-            transferToFailure(session, flowFile, e);
-        } catch (SchemaNotFoundException e) {
-            getLogger().error("Couldn't create record writer", e);
-            transferToFailure(session, flowFile, e);
-        } catch (Exception e) {
-            getLogger().error("Failed to put records to Salesforce.", e);
-            transferToFailure(session, flowFile, e);
         }
     }
 
+    private SalesforceConfiguration createSalesforceConfiguration(ProcessContext context) {
+        String salesforceVersion = context.getProperty(API_VERSION).getValue();
+        String instanceUrl = context.getProperty(SALESFORCE_INSTANCE_URL).getValue();
+        OAuth2AccessTokenProvider accessTokenProvider =
+                context.getProperty(TOKEN_PROVIDER).asControllerService(OAuth2AccessTokenProvider.class);
+        return SalesforceConfiguration.create(instanceUrl, salesforceVersion,
+                () -> accessTokenProvider.getAccessDetails().getAccessToken(), 0);
+    }
+
+    private void handleInvalidFlowFile(ProcessSession session, FlowFile flowFile) {

Review Comment:
   The handleInvalidFlowFile and transferToFailure methods are quite similar. You could change transferToFailure to take a message and reuse it in handleInvalidFlowFile (transferToFailure only needs the message taken from the exception parameter):
   
       private void transferToFailure(ProcessSession session, FlowFile flowFile, String message) {
           flowFile = session.putAttribute(flowFile, ATTR_ERROR_MESSAGE, message);
           session.transfer(session.penalize(flowFile), REL_FAILURE);
       }
   
       private void handleInvalidFlowFile(ProcessSession session, FlowFile flowFile) {
           getLogger().error("Salesforce object type not found among the incoming FlowFile attributes");
           transferToFailure(session, flowFile, "Salesforce object type not found among FlowFile attributes");
       }
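   
   For completeness, a hedged sketch (not part of the suggestion above) of how the existing catch blocks in onTrigger might then call the reworked helper, passing only the exception message:
   
       } catch (MalformedRecordException e) {
           getLogger().error("Couldn't read records from input", e);
           transferToFailure(session, flowFile, e.getMessage());
       }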
   
       
       
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
