krisztina-zsihovszki commented on code in PR #7019: URL: https://github.com/apache/nifi/pull/7019#discussion_r1138356484
########## nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/QuerySalesforceObject.java: ########## @@ -385,144 +354,181 @@ private void processQuery(ProcessContext context, ProcessSession session) { .collect(Collectors.joining(",")); } - String querySObject = buildQuery( - sObject, - fields, - customWhereClause, - ageField, - initialAgeFilter, - ageFilterLower, - ageFilterUpper - ); + String querySObject = new SalesforceQueryBuilder(incrementalContext) + .buildQuery(sObject, fields, customWhereClause); + + AtomicBoolean isOriginalTransferred = new AtomicBoolean(false); + List<FlowFile> outgoingFlowFiles = new ArrayList<>(); + Map<String, String> originalAttributes = Optional.ofNullable(originalFlowFile) + .map(FlowFile::getAttributes) + .orElseGet(HashMap::new); + + long startNanos = System.nanoTime(); do { - FlowFile flowFile = session.create(); - Map<String, String> originalAttributes = flowFile.getAttributes(); - Map<String, String> attributes = new HashMap<>(); + FlowFile outgoingFlowFile = createOutgoingFlowFile(session, originalFlowFile); + outgoingFlowFiles.add(outgoingFlowFile); + Map<String, String> attributes = new HashMap<>(originalAttributes); AtomicInteger recordCountHolder = new AtomicInteger(); - long startNanos = System.nanoTime(); - flowFile = session.write(flowFile, out -> { - try ( - InputStream querySObjectResultInputStream = getResultInputStream(nextRecordsUrl.get(), querySObject); - - JsonTreeRowRecordReader jsonReader = new JsonTreeRowRecordReader( - querySObjectResultInputStream, - getLogger(), - salesForceSchemaHolder.recordSchema, - DATE_FORMAT, - TIME_FORMAT, - DATE_TIME_FORMAT, - StartingFieldStrategy.NESTED_FIELD, - STARTING_FIELD_NAME, - SchemaApplicationStrategy.SELECTED_PART, - CAPTURE_PREDICATE - ); - - RecordSetWriter writer = writerFactory.createWriter( - getLogger(), - writerFactory.getSchema( - originalAttributes, - salesForceSchemaHolder.recordSchema - ), - 
out, - originalAttributes - ) - ) { - writer.beginRecordSet(); - - Record querySObjectRecord; - while ((querySObjectRecord = jsonReader.nextRecord()) != null) { - writer.write(querySObjectRecord); - } - - WriteResult writeResult = writer.finishRecordSet(); + try { + outgoingFlowFile = session.write(outgoingFlowFile, processRecordsCallback(context, nextRecordsUrl, writerFactory, state, incrementalContext, + salesForceSchemaHolder, querySObject, originalAttributes, attributes, recordCountHolder)); + int recordCount = recordCountHolder.get(); - Map<String, String> capturedFields = jsonReader.getCapturedFields(); + if (createZeroRecordFlowFiles || recordCount != 0) { + outgoingFlowFile = session.putAllAttributes(outgoingFlowFile, attributes); - nextRecordsUrl.set(capturedFields.getOrDefault(NEXT_RECORDS_URL, null)); + session.adjustCounter("Records Processed", recordCount, false); + getLogger().info("Successfully written {} records for {}", recordCount, outgoingFlowFile); + } else { + outgoingFlowFiles.remove(outgoingFlowFile); + session.remove(outgoingFlowFile); + } + } catch (Exception e) { + if (e.getCause() instanceof IOException) { + throw new ProcessException("Couldn't get Salesforce records", e); + } else if (e.getCause() instanceof SchemaNotFoundException) { + handleError(session, originalFlowFile, isOriginalTransferred, outgoingFlowFiles, e, "Couldn't create record writer"); + } else if (e.getCause() instanceof MalformedRecordException) { + handleError(session, originalFlowFile, isOriginalTransferred, outgoingFlowFiles, e, "Couldn't read records from input"); + } else { + handleError(session, originalFlowFile, isOriginalTransferred, outgoingFlowFiles, e, "Couldn't get Salesforce records"); + } + break; + } + } while (nextRecordsUrl.get() != null); - attributes.put("record.count", String.valueOf(writeResult.getRecordCount())); - attributes.put(CoreAttributes.MIME_TYPE.key(), writer.getMimeType()); - attributes.putAll(writeResult.getAttributes()); + 
transferFlowFiles(session, outgoingFlowFiles, originalFlowFile, isOriginalTransferred, startNanos, sObject); + } - recordCountHolder.set(writeResult.getRecordCount()); + private OutputStreamCallback processRecordsCallback(ProcessContext context, AtomicReference<String> nextRecordsUrl, RecordSetWriterFactory writerFactory, + StateMap state, IncrementalContext incrementalContext, SalesforceSchemaHolder salesForceSchemaHolder, + String querySObject, Map<String, String> originalAttributes, Map<String, String> attributes, + AtomicInteger recordCountHolder) { + return out -> { + try { + handleRecordSet(out, nextRecordsUrl, querySObject, writerFactory, salesForceSchemaHolder, originalAttributes, attributes, recordCountHolder); - if (ageFilterUpper != null) { - Map<String, String> newState = new HashMap<>(state.toMap()); - newState.put(LAST_AGE_FILTER, ageFilterUpper); - updateState(context, newState); - } - } catch (SchemaNotFoundException e) { - throw new ProcessException("Couldn't create record writer", e); - } catch (MalformedRecordException e) { - throw new ProcessException("Couldn't read records from input", e); + if (incrementalContext.getAgeFilterUpper() != null) { + Map<String, String> newState = new HashMap<>(state.toMap()); + newState.put(LAST_AGE_FILTER, incrementalContext.getAgeFilterUpper()); + updateState(context, newState); } - }); + } catch (Exception e) { + throw new RuntimeException(e); + } + }; + } - int recordCount = recordCountHolder.get(); + private void handleRecordSet(OutputStream out, AtomicReference<String> nextRecordsUrl, String querySObject, RecordSetWriterFactory writerFactory, + SalesforceSchemaHolder salesForceSchemaHolder, Map<String, String> originalAttributes, Map<String, String> attributes, + AtomicInteger recordCountHolder) throws Exception { + try ( + InputStream querySObjectResultInputStream = getResultInputStream(nextRecordsUrl.get(), querySObject); + JsonTreeRowRecordReader jsonReader = 
createJsonReader(querySObjectResultInputStream, salesForceSchemaHolder.getRecordSchema()); + RecordSetWriter writer = createRecordSetWriter(writerFactory, originalAttributes, out, salesForceSchemaHolder.getRecordSchema()) + ) { + writer.beginRecordSet(); + + Record querySObjectRecord; + while ((querySObjectRecord = jsonReader.nextRecord()) != null) { + writer.write(querySObjectRecord); + } - if (!createZeroRecordFlowFiles && recordCount == 0) { - session.remove(flowFile); - } else { - flowFile = session.putAllAttributes(flowFile, attributes); - session.transfer(flowFile, REL_SUCCESS); + WriteResult writeResult = writer.finishRecordSet(); - long transferMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos); - session.getProvenanceReporter().receive(flowFile, salesforceRestService.getVersionedBaseUrl() + "/composite/tree/" + sObject, - transferMillis); + Map<String, String> capturedFields = jsonReader.getCapturedFields(); + nextRecordsUrl.set(capturedFields.getOrDefault(NEXT_RECORDS_URL, null)); - session.adjustCounter("Records Processed", recordCount, false); - getLogger().info("Successfully written {} records for {}", recordCount, flowFile); - } - } while (nextRecordsUrl.get() != null); + attributes.put("record.count", String.valueOf(writeResult.getRecordCount())); + attributes.put(CoreAttributes.MIME_TYPE.key(), writer.getMimeType()); + attributes.putAll(writeResult.getAttributes()); + recordCountHolder.set(writeResult.getRecordCount()); + } + } + + private JsonTreeRowRecordReader createJsonReader(InputStream querySObjectResultInputStream, RecordSchema recordSchema) throws IOException, MalformedRecordException { + return new JsonTreeRowRecordReader( + querySObjectResultInputStream, + getLogger(), + recordSchema, + DATE_FORMAT, + TIME_FORMAT, + DATE_TIME_FORMAT, + StartingFieldStrategy.NESTED_FIELD, + STARTING_FIELD_NAME, + SchemaApplicationStrategy.SELECTED_PART, + CAPTURE_PREDICATE + ); + } + + private RecordSetWriter 
createRecordSetWriter(RecordSetWriterFactory writerFactory, Map<String, String> originalAttributes, OutputStream out, + RecordSchema recordSchema) throws IOException, SchemaNotFoundException { + return writerFactory.createWriter( + getLogger(), + writerFactory.getSchema( + originalAttributes, + recordSchema + ), + out, + originalAttributes + ); } private void processCustomQuery(ProcessContext context, ProcessSession session, FlowFile originalFlowFile) { String customQuery = context.getProperty(CUSTOM_SOQL_QUERY).evaluateAttributeExpressions(originalFlowFile).getValue(); AtomicReference<String> nextRecordsUrl = new AtomicReference<>(); AtomicReference<String> totalSize = new AtomicReference<>(); - boolean isOriginalTransferred = false; + AtomicBoolean isOriginalTransferred = new AtomicBoolean(false); List<FlowFile> outgoingFlowFiles = new ArrayList<>(); + long startNanos = System.nanoTime(); do { - FlowFile outgoingFlowFile; try (InputStream response = getResultInputStream(nextRecordsUrl.get(), customQuery)) { - if (originalFlowFile != null) { - outgoingFlowFile = session.create(originalFlowFile); - } else { - outgoingFlowFile = session.create(); - } + FlowFile outgoingFlowFile = createOutgoingFlowFile(session, originalFlowFile); outgoingFlowFiles.add(outgoingFlowFile); - outgoingFlowFile = session.write(outgoingFlowFile, parseHttpResponse(response, nextRecordsUrl, totalSize)); - int recordCount = nextRecordsUrl.get() != null ? 2000 : Integer.parseInt(totalSize.get()) % 2000; + outgoingFlowFile = session.write(outgoingFlowFile, parseCustomQueryResponse(response, nextRecordsUrl, totalSize)); + int recordCount = nextRecordsUrl.get() != null ? 
MAX_RECORD_COUNT : Integer.parseInt(totalSize.get()) % MAX_RECORD_COUNT; Map<String, String> attributes = new HashMap<>(); attributes.put(CoreAttributes.MIME_TYPE.key(), "application/json"); - attributes.put(TOTAL_RECORD_COUNT, String.valueOf(recordCount)); + attributes.put(TOTAL_RECORD_COUNT_ATTRIBUTE, String.valueOf(recordCount)); session.adjustCounter("Salesforce records processed", recordCount, false); session.putAllAttributes(outgoingFlowFile, attributes); } catch (IOException e) { throw new ProcessException("Couldn't get Salesforce records", e); } catch (Exception e) { - if (originalFlowFile != null) { - session.transfer(originalFlowFile, REL_FAILURE); - isOriginalTransferred = true; - } - getLogger().error("Couldn't get Salesforce records", e); - session.remove(outgoingFlowFiles); - outgoingFlowFiles.clear(); + handleError(session, originalFlowFile, isOriginalTransferred, outgoingFlowFiles, e, "Couldn't get Salesforce records"); break; } } while (nextRecordsUrl.get() != null); + transferFlowFiles(session, outgoingFlowFiles, originalFlowFile, isOriginalTransferred, startNanos, "custom"); + } + + private void transferFlowFiles(ProcessSession session, List<FlowFile> outgoingFlowFiles, FlowFile originalFlowFile, AtomicBoolean isOriginalTransferred, + long startNanos, String urlDetail) { if (!outgoingFlowFiles.isEmpty()) { session.transfer(outgoingFlowFiles, REL_SUCCESS); + long transferMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos); + + outgoingFlowFiles.forEach(ff -> + session.getProvenanceReporter().receive(ff, salesforceRestService.getVersionedBaseUrl() + urlDetail, transferMillis) Review Comment: Wrong URI is created for the custom query; a "/" is missing: "https://xxx.my.salesforce.com/services/data/v54.0custom" ########## nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/PutSalesforceObject.java: ########## @@ -146,69 +136,85 @@ public void onTrigger(ProcessContext 
context, ProcessSession session) throws Pro String objectType = flowFile.getAttribute(ATTR_OBJECT_TYPE); if (objectType == null) { - getLogger().error("Salesforce object type not found among the incoming FlowFile attributes"); - flowFile = session.putAttribute(flowFile, ATTR_ERROR_MESSAGE, "Salesforce object type not found among FlowFile attributes"); - session.transfer(session.penalize(flowFile), REL_FAILURE); + handleInvalidFlowFile(session, flowFile); return; } + try { + long startNanos = System.nanoTime(); + processRecords(flowFile, objectType, context, session); + session.transfer(flowFile, REL_SUCCESS); + long transferMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos); + session.getProvenanceReporter().send(flowFile, salesforceRestClient.getVersionedBaseUrl() + "/put/" + objectType, transferMillis); + } catch (MalformedRecordException e) { + getLogger().error("Couldn't read records from input", e); + transferToFailure(session, flowFile, e); + } catch (SchemaNotFoundException e) { + getLogger().error("Couldn't create record writer", e); + transferToFailure(session, flowFile, e); + } catch (Exception e) { + getLogger().error("Failed to put records to Salesforce.", e); + transferToFailure(session, flowFile, e); + } + } + + private void processRecords(FlowFile flowFile, String objectType, ProcessContext context, ProcessSession session) throws IOException, MalformedRecordException, SchemaNotFoundException { RecordReaderFactory readerFactory = context.getProperty(RECORD_READER_FACTORY).asControllerService(RecordReaderFactory.class); + int count = 0; + RecordExtender recordExtender; - RecordExtender extender; - long startNanos = System.nanoTime(); - try { try (InputStream in = session.read(flowFile); RecordReader reader = readerFactory.createRecordReader(flowFile, in, getLogger()); ByteArrayOutputStream out = new ByteArrayOutputStream(); - WriteJsonResult writer = getWriter(extender = new RecordExtender(reader.getSchema()), out)) { + 
WriteJsonResult writer = getWriter(recordExtender = getExtender(reader), out)) { - int count = 0; Record record; - while ((record = reader.nextRecord()) != null) { count++; if (!writer.isActiveRecordSet()) { writer.beginRecordSet(); } - MapRecord extendedRecord = extender.getExtendedRecord(objectType, count, record); + MapRecord extendedRecord = recordExtender.getExtendedRecord(objectType, count, record); writer.write(extendedRecord); if (count == maxRecordCount) { count = 0; - processRecords(objectType, out, writer, extender); + postRecordBatch(objectType, out, writer, recordExtender); out.reset(); } } if (writer.isActiveRecordSet()) { - processRecords(objectType, out, writer, extender); + postRecordBatch(objectType, out, writer, recordExtender); } - } - session.transfer(flowFile, REL_SUCCESS); - long transferMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos); - session.getProvenanceReporter().send(flowFile, salesforceRestService.getVersionedBaseUrl()+ "/composite/tree/" + objectType, transferMillis); - } catch (MalformedRecordException e) { - getLogger().error("Couldn't read records from input", e); - transferToFailure(session, flowFile, e); - } catch (SchemaNotFoundException e) { - getLogger().error("Couldn't create record writer", e); - transferToFailure(session, flowFile, e); - } catch (Exception e) { - getLogger().error("Failed to put records to Salesforce.", e); - transferToFailure(session, flowFile, e); } } + private SalesforceConfiguration createSalesforceConfiguration(ProcessContext context) { + String salesforceVersion = context.getProperty(API_VERSION).getValue(); + String instanceUrl = context.getProperty(SALESFORCE_INSTANCE_URL).getValue(); + OAuth2AccessTokenProvider accessTokenProvider = + context.getProperty(TOKEN_PROVIDER).asControllerService(OAuth2AccessTokenProvider.class); + return SalesforceConfiguration.create(instanceUrl, salesforceVersion, + () -> accessTokenProvider.getAccessDetails().getAccessToken(), 0); + } + + 
private void handleInvalidFlowFile(ProcessSession session, FlowFile flowFile) { Review Comment: The handleInvalidFlowFile and transferToFailure methods are quite similar. You can change transferToFailure a bit and reuse it in handleInvalidFlowFile. (The transferToFailure uses the message from the exception parameter only.) private void transferToFailure(ProcessSession session, FlowFile flowFile, String message) { flowFile = session.putAttribute(flowFile, ATTR_ERROR_MESSAGE, message); session.transfer(session.penalize(flowFile), REL_FAILURE); } private void handleInvalidFlowFile(ProcessSession session, FlowFile flowFile) { getLogger().error("Salesforce object type not found among the incoming FlowFile attributes"); transferToFailure(session, flowFile, "Salesforce object type not found among FlowFile attributes"); } -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org