This is an automated email from the ASF dual-hosted git repository.
tpalfy pushed a commit to branch support/nifi-1.x
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/support/nifi-1.x by this push:
new d52e097dec NIFI-11224: Refactor and FF attribute support in WHERE in
QuerySalesforceObject.
d52e097dec is described below
commit d52e097decfb2662a34733ba11bddbf5e440d65e
Author: Pierre Villard <[email protected]>
AuthorDate: Mon Feb 27 13:47:06 2023 +0100
NIFI-11224: Refactor and FF attribute support in WHERE in
QuerySalesforceObject.
This closes #7019.
Co-authored-by: Lehel Boér <[email protected]>
Signed-off-by: Tamas Palfy <[email protected]>
---
.../processors/salesforce/PutSalesforceObject.java | 102 ++---
.../salesforce/QuerySalesforceObject.java | 450 +++++++++++----------
.../salesforce/rest/SalesforceConfiguration.java | 54 +++
.../salesforce/rest/SalesforceRestClient.java | 112 +++++
.../salesforce/schema/SalesforceSchemaHolder.java | 45 +++
.../schema/SalesforceToRecordSchemaConverter.java | 148 +++++++
.../util/CommonSalesforceProperties.java | 6 +-
.../salesforce/util/IncrementalContext.java | 67 +++
.../salesforce/util/SalesforceQueryBuilder.java | 66 +++
.../salesforce/util/SalesforceRestService.java | 134 ------
.../util/SalesforceToRecordSchemaConverter.java | 124 ------
.../validator/SalesforceAgeValidator.java | 44 ++
.../salesforce/PutSalesforceObjectIT.java | 10 +-
.../salesforce/QuerySalesforceObjectIT.java | 6 +-
.../salesforce/util/SalesforceConfigAware.java | 4 +-
.../salesforce/util/SalesforceRestServiceIT.java | 14 +-
.../SalesforceToRecordSchemaConverterTest.java | 6 +-
17 files changed, 841 insertions(+), 551 deletions(-)
diff --git
a/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/PutSalesforceObject.java
b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/PutSalesforceObject.java
index 83605800f9..210ceafedd 100644
---
a/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/PutSalesforceObject.java
+++
b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/PutSalesforceObject.java
@@ -35,8 +35,9 @@ import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processors.salesforce.rest.SalesforceConfiguration;
+import org.apache.nifi.processors.salesforce.rest.SalesforceRestClient;
import org.apache.nifi.processors.salesforce.util.RecordExtender;
-import org.apache.nifi.processors.salesforce.util.SalesforceRestService;
import org.apache.nifi.schema.access.NopSchemaAccessWriter;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.MalformedRecordException;
@@ -56,9 +57,9 @@ import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
-import static
org.apache.nifi.processors.salesforce.util.CommonSalesforceProperties.API_URL;
import static
org.apache.nifi.processors.salesforce.util.CommonSalesforceProperties.API_VERSION;
import static
org.apache.nifi.processors.salesforce.util.CommonSalesforceProperties.READ_TIMEOUT;
+import static
org.apache.nifi.processors.salesforce.util.CommonSalesforceProperties.SALESFORCE_INSTANCE_URL;
import static
org.apache.nifi.processors.salesforce.util.CommonSalesforceProperties.TOKEN_PROVIDER;
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@@ -94,7 +95,7 @@ public class PutSalesforceObject extends AbstractProcessor {
.build();
private static final List<PropertyDescriptor> PROPERTIES =
Collections.unmodifiableList(Arrays.asList(
- API_URL,
+ SALESFORCE_INSTANCE_URL,
API_VERSION,
READ_TIMEOUT,
TOKEN_PROVIDER,
@@ -106,7 +107,7 @@ public class PutSalesforceObject extends AbstractProcessor {
REL_FAILURE
)));
- private volatile SalesforceRestService salesforceRestService;
+ private volatile SalesforceRestClient salesforceRestClient;
private volatile int maxRecordCount;
@Override
@@ -122,19 +123,8 @@ public class PutSalesforceObject extends AbstractProcessor
{
@OnScheduled
public void onScheduled(final ProcessContext context) {
maxRecordCount = getMaxRecordCount();
-
- String salesforceVersion = context.getProperty(API_VERSION).getValue();
- String baseUrl = context.getProperty(API_URL).getValue();
- OAuth2AccessTokenProvider accessTokenProvider =
-
context.getProperty(TOKEN_PROVIDER).asControllerService(OAuth2AccessTokenProvider.class);
-
- salesforceRestService = new SalesforceRestService(
- salesforceVersion,
- baseUrl,
- () -> accessTokenProvider.getAccessDetails().getAccessToken(),
-
context.getProperty(READ_TIMEOUT).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS)
- .intValue()
- );
+ SalesforceConfiguration configuration =
createSalesforceConfiguration(context);
+ salesforceRestClient = new SalesforceRestClient(configuration);
}
@Override
@@ -146,69 +136,85 @@ public class PutSalesforceObject extends
AbstractProcessor {
String objectType = flowFile.getAttribute(ATTR_OBJECT_TYPE);
if (objectType == null) {
- getLogger().error("Salesforce object type not found among the
incoming FlowFile attributes");
- flowFile = session.putAttribute(flowFile, ATTR_ERROR_MESSAGE,
"Salesforce object type not found among FlowFile attributes");
- session.transfer(session.penalize(flowFile), REL_FAILURE);
+ handleInvalidFlowFile(session, flowFile);
return;
}
+ try {
+ long startNanos = System.nanoTime();
+ processRecords(flowFile, objectType, context, session);
+ session.transfer(flowFile, REL_SUCCESS);
+ long transferMillis =
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
+ session.getProvenanceReporter().send(flowFile,
salesforceRestClient.getVersionedBaseUrl() + "/put/" + objectType,
transferMillis);
+ } catch (MalformedRecordException e) {
+ getLogger().error("Couldn't read records from input", e);
+ transferToFailure(session, flowFile, e.getMessage());
+ } catch (SchemaNotFoundException e) {
+ getLogger().error("Couldn't create record writer", e);
+ transferToFailure(session, flowFile, e.getMessage());
+ } catch (Exception e) {
+ getLogger().error("Failed to put records to Salesforce.", e);
+ transferToFailure(session, flowFile, e.getMessage());
+ }
+ }
+
+ private void processRecords(FlowFile flowFile, String objectType,
ProcessContext context, ProcessSession session) throws IOException,
MalformedRecordException, SchemaNotFoundException {
RecordReaderFactory readerFactory =
context.getProperty(RECORD_READER_FACTORY).asControllerService(RecordReaderFactory.class);
+ int count = 0;
+ RecordExtender recordExtender;
- RecordExtender extender;
- long startNanos = System.nanoTime();
- try {
try (InputStream in = session.read(flowFile);
RecordReader reader = readerFactory.createRecordReader(flowFile,
in, getLogger());
ByteArrayOutputStream out = new ByteArrayOutputStream();
- WriteJsonResult writer = getWriter(extender = new
RecordExtender(reader.getSchema()), out)) {
+ WriteJsonResult writer = getWriter(recordExtender =
getExtender(reader), out)) {
- int count = 0;
Record record;
-
while ((record = reader.nextRecord()) != null) {
count++;
if (!writer.isActiveRecordSet()) {
writer.beginRecordSet();
}
- MapRecord extendedRecord =
extender.getExtendedRecord(objectType, count, record);
+ MapRecord extendedRecord =
recordExtender.getExtendedRecord(objectType, count, record);
writer.write(extendedRecord);
if (count == maxRecordCount) {
count = 0;
- processRecords(objectType, out, writer, extender);
+ postRecordBatch(objectType, out, writer, recordExtender);
out.reset();
}
}
if (writer.isActiveRecordSet()) {
- processRecords(objectType, out, writer, extender);
+ postRecordBatch(objectType, out, writer, recordExtender);
}
- }
- session.transfer(flowFile, REL_SUCCESS);
- long transferMillis =
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
- session.getProvenanceReporter().send(flowFile,
salesforceRestService.getVersionedBaseUrl()+ "/composite/tree/" + objectType,
transferMillis);
- } catch (MalformedRecordException e) {
- getLogger().error("Couldn't read records from input", e);
- transferToFailure(session, flowFile, e);
- } catch (SchemaNotFoundException e) {
- getLogger().error("Couldn't create record writer", e);
- transferToFailure(session, flowFile, e);
- } catch (Exception e) {
- getLogger().error("Failed to put records to Salesforce.", e);
- transferToFailure(session, flowFile, e);
}
}
- private void transferToFailure(ProcessSession session, FlowFile flowFile,
Exception e) {
- flowFile = session.putAttribute(flowFile, ATTR_ERROR_MESSAGE,
e.getMessage());
+ private SalesforceConfiguration
createSalesforceConfiguration(ProcessContext context) {
+ String salesforceVersion = context.getProperty(API_VERSION).getValue();
+ String instanceUrl =
context.getProperty(SALESFORCE_INSTANCE_URL).getValue();
+ OAuth2AccessTokenProvider accessTokenProvider =
+
context.getProperty(TOKEN_PROVIDER).asControllerService(OAuth2AccessTokenProvider.class);
+ return SalesforceConfiguration.create(instanceUrl, salesforceVersion,
+ () -> accessTokenProvider.getAccessDetails().getAccessToken(),
0);
+ }
+
+ private void handleInvalidFlowFile(ProcessSession session, FlowFile
flowFile) {
+ String errorMessage = "Salesforce object type not found among the
incoming FlowFile attributes";
+ getLogger().error(errorMessage);
+ transferToFailure(session, flowFile, errorMessage);
+ }
+
+ private void transferToFailure(ProcessSession session, FlowFile flowFile,
String message) {
+ flowFile = session.putAttribute(flowFile, ATTR_ERROR_MESSAGE, message);
session.transfer(session.penalize(flowFile), REL_FAILURE);
}
- private void processRecords(String objectType, ByteArrayOutputStream out,
WriteJsonResult writer, RecordExtender extender) throws IOException {
+ private void postRecordBatch(String objectType, ByteArrayOutputStream out,
WriteJsonResult writer, RecordExtender extender) throws IOException {
writer.finishRecordSet();
writer.flush();
ObjectNode wrappedJson = extender.getWrappedRecordsJson(out);
- salesforceRestService.postRecord(objectType,
wrappedJson.toPrettyString());
+ salesforceRestClient.postRecord(objectType,
wrappedJson.toPrettyString());
}
private WriteJsonResult getWriter(RecordExtender extender,
ByteArrayOutputStream out) throws IOException {
@@ -217,6 +223,10 @@ public class PutSalesforceObject extends AbstractProcessor
{
true, NullSuppression.NEVER_SUPPRESS,
OutputGrouping.OUTPUT_ARRAY, null, null, null);
}
+ private RecordExtender getExtender(RecordReader reader) throws
MalformedRecordException {
+ return new RecordExtender(reader.getSchema());
+ }
+
int getMaxRecordCount() {
return MAX_RECORD_COUNT;
}
diff --git
a/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/QuerySalesforceObject.java
b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/QuerySalesforceObject.java
index 77a82f0b03..5387340511 100644
---
a/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/QuerySalesforceObject.java
+++
b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/QuerySalesforceObject.java
@@ -16,14 +16,14 @@
*/
package org.apache.nifi.processors.salesforce;
-import org.apache.camel.component.salesforce.api.dto.SObjectDescription;
-import org.apache.camel.component.salesforce.api.dto.SObjectField;
import com.fasterxml.jackson.core.JsonEncoding;
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonToken;
import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.camel.component.salesforce.api.dto.SObjectDescription;
+import org.apache.camel.component.salesforce.api.dto.SObjectField;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.Stateful;
@@ -55,8 +55,13 @@ import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
-import org.apache.nifi.processors.salesforce.util.SalesforceRestService;
-import
org.apache.nifi.processors.salesforce.util.SalesforceToRecordSchemaConverter;
+import org.apache.nifi.processors.salesforce.rest.SalesforceConfiguration;
+import org.apache.nifi.processors.salesforce.rest.SalesforceRestClient;
+import org.apache.nifi.processors.salesforce.schema.SalesforceSchemaHolder;
+import
org.apache.nifi.processors.salesforce.schema.SalesforceToRecordSchemaConverter;
+import org.apache.nifi.processors.salesforce.util.IncrementalContext;
+import org.apache.nifi.processors.salesforce.util.SalesforceQueryBuilder;
+import org.apache.nifi.processors.salesforce.validator.SalesforceAgeValidator;
import org.apache.nifi.scheduling.SchedulingStrategy;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.MalformedRecordException;
@@ -72,10 +77,8 @@ import org.apache.nifi.util.StringUtils;
import java.io.IOException;
import java.io.InputStream;
+import java.io.OutputStream;
import java.io.UncheckedIOException;
-import java.time.OffsetDateTime;
-import java.time.format.DateTimeFormatter;
-import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -84,16 +87,18 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiPredicate;
import java.util.stream.Collectors;
-import static
org.apache.nifi.processors.salesforce.util.CommonSalesforceProperties.API_URL;
import static
org.apache.nifi.processors.salesforce.util.CommonSalesforceProperties.API_VERSION;
import static
org.apache.nifi.processors.salesforce.util.CommonSalesforceProperties.READ_TIMEOUT;
+import static
org.apache.nifi.processors.salesforce.util.CommonSalesforceProperties.SALESFORCE_INSTANCE_URL;
import static
org.apache.nifi.processors.salesforce.util.CommonSalesforceProperties.TOKEN_PROVIDER;
@TriggerSerially
@@ -102,18 +107,19 @@ import static
org.apache.nifi.processors.salesforce.util.CommonSalesforcePropert
@CapabilityDescription("Retrieves records from a Salesforce sObject. Users can
add arbitrary filter conditions by setting the 'Custom WHERE Condition'
property."
+ " The processor can also run a custom query, although record
processing is not supported in that case."
+ " Supports incremental retrieval: users can define a field in the
'Age Field' property that will be used to determine when the record was
created."
- + " When this property is set the processor will retrieve new records.
It's also possible to define an initial cutoff value for the age, filtering out
all older records"
+ + " When this property is set the processor will retrieve new records.
Incremental loading and record-based processing are only supported in
property-based queries."
+ + " It's also possible to define an initial cutoff value for the age,
filtering out all older records"
+ " even for the first run. In case of 'Property Based Query' this
processor should run on the Primary Node only."
+ " FlowFile attribute 'record.count' indicates how many records were
retrieved and written to the output."
- + " By using 'Custom Query', the processor can accept an optional
input flowfile and reference the flowfile attributes in the query."
- + " However, incremental loading and record-based processing are not
supported in this scenario.")
+ + " The processor can accept an optional input FlowFile and reference
the FlowFile attributes in the query.")
@Stateful(scopes = Scope.CLUSTER, description = "When 'Age Field' is set,
after performing a query the time of execution is stored. Subsequent queries
will be augmented"
+ " with an additional condition so that only records that are newer
than the stored execution time (adjusted with the optional value of 'Age
Delay') will be retrieved."
+ " State is stored across the cluster so that this Processor can be
run on Primary Node only and if a new Primary Node is selected,"
+ " the new node can pick up where the previous node left off, without
duplicating the data.")
@WritesAttributes({
@WritesAttribute(attribute = "mime.type", description = "Sets the
mime.type attribute to the MIME Type specified by the Record Writer."),
- @WritesAttribute(attribute = "record.count", description = "Sets the
number of records in the FlowFile.")
+ @WritesAttribute(attribute = "record.count", description = "Sets the
number of records in the FlowFile."),
+ @WritesAttribute(attribute = "total.record.count", description = "Sets
the total number of records in the FlowFile.")
})
@DefaultSchedule(strategy = SchedulingStrategy.TIMER_DRIVEN, period = "1 min")
@SeeAlso(PutSalesforceObject.class)
@@ -181,7 +187,7 @@ public class QuerySalesforceObject extends
AbstractProcessor {
.dependsOn(QUERY_TYPE, PROPERTY_BASED_QUERY)
.build();
- static final PropertyDescriptor AGE_FIELD = new
PropertyDescriptor.Builder()
+ public static final PropertyDescriptor AGE_FIELD = new
PropertyDescriptor.Builder()
.name("age-field")
.displayName("Age Field")
.description("The name of a TIMESTAMP field that will be used to
filter records using a bounded time window."
@@ -193,7 +199,7 @@ public class QuerySalesforceObject extends
AbstractProcessor {
.dependsOn(QUERY_TYPE, PROPERTY_BASED_QUERY)
.build();
- static final PropertyDescriptor AGE_DELAY = new
PropertyDescriptor.Builder()
+ public static final PropertyDescriptor AGE_DELAY = new
PropertyDescriptor.Builder()
.name("age-delay")
.displayName("Age Delay")
.description("The ending timestamp of the time window will be
adjusted earlier by the amount configured in this property." +
@@ -205,7 +211,7 @@ public class QuerySalesforceObject extends
AbstractProcessor {
.dependsOn(QUERY_TYPE, PROPERTY_BASED_QUERY)
.build();
- static final PropertyDescriptor INITIAL_AGE_FILTER = new
PropertyDescriptor.Builder()
+ public static final PropertyDescriptor INITIAL_AGE_FILTER = new
PropertyDescriptor.Builder()
.name("initial-age-filter")
.displayName("Initial Age Start Time")
.description("This property specifies the start time that the
processor applies when running the first query.")
@@ -243,7 +249,7 @@ public class QuerySalesforceObject extends
AbstractProcessor {
.autoTerminateDefault(true)
.build();
- private static final String LAST_AGE_FILTER = "last_age_filter";
+ public static final String LAST_AGE_FILTER = "last_age_filter";
private static final String STARTING_FIELD_NAME = "records";
private static final String DATE_FORMAT = "yyyy-MM-dd";
private static final String TIME_FORMAT = "HH:mm:ss.SSSX";
@@ -254,13 +260,20 @@ public class QuerySalesforceObject extends
AbstractProcessor {
private static final BiPredicate<String, String> CAPTURE_PREDICATE =
(fieldName, fieldValue) -> NEXT_RECORDS_URL.equals(fieldName);
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private static final JsonFactory JSON_FACTORY = OBJECT_MAPPER.getFactory();
- private static final String TOTAL_RECORD_COUNT = "total.record.count";
+ private static final String TOTAL_RECORD_COUNT_ATTRIBUTE =
"total.record.count";
+ private static final int MAX_RECORD_COUNT = 2000;
private volatile SalesforceToRecordSchemaConverter
salesForceToRecordSchemaConverter;
- private volatile SalesforceRestService salesforceRestService;
+ private volatile SalesforceRestClient salesforceRestService;
+ private volatile boolean resetState = false;
@OnScheduled
public void onScheduled(ProcessContext context) {
+ if (resetState) {
+ clearState(context);
+ resetState = false;
+ }
+
salesForceToRecordSchemaConverter = new
SalesforceToRecordSchemaConverter(
DATE_FORMAT,
DATE_TIME_FORMAT,
@@ -268,19 +281,21 @@ public class QuerySalesforceObject extends
AbstractProcessor {
);
String salesforceVersion = context.getProperty(API_VERSION).getValue();
- String baseUrl = context.getProperty(API_URL).getValue();
+ String instanceUrl =
context.getProperty(SALESFORCE_INSTANCE_URL).getValue();
OAuth2AccessTokenProvider accessTokenProvider =
context.getProperty(TOKEN_PROVIDER).asControllerService(OAuth2AccessTokenProvider.class);
- salesforceRestService = new SalesforceRestService(
+ SalesforceConfiguration salesforceConfiguration =
SalesforceConfiguration.create(
+ instanceUrl,
salesforceVersion,
- baseUrl,
() -> accessTokenProvider.getAccessDetails().getAccessToken(),
context.getProperty(READ_TIMEOUT).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS).intValue()
);
+
+ salesforceRestService = new
SalesforceRestClient(salesforceConfiguration);
}
private static final List<PropertyDescriptor> PROPERTIES =
Collections.unmodifiableList(Arrays.asList(
- API_URL,
+ SALESFORCE_INSTANCE_URL,
API_VERSION,
QUERY_TYPE,
CUSTOM_SOQL_QUERY,
@@ -313,69 +328,46 @@ public class QuerySalesforceObject extends
AbstractProcessor {
@Override
protected Collection<ValidationResult> customValidate(ValidationContext
validationContext) {
List<ValidationResult> results = new
ArrayList<>(super.customValidate(validationContext));
- if (validationContext.getProperty(INITIAL_AGE_FILTER).isSet() &&
!validationContext.getProperty(AGE_FIELD).isSet()) {
- results.add(
- new ValidationResult.Builder()
- .subject(INITIAL_AGE_FILTER.getDisplayName())
- .valid(false)
- .explanation("it requires " +
AGE_FIELD.getDisplayName() + " also to be set.")
- .build()
- );
+ return SalesforceAgeValidator.validate(validationContext, results);
+ }
+
+ @Override
+ public void onPropertyModified(PropertyDescriptor descriptor, String
oldValue, String newValue) {
+ if ((oldValue != null && !oldValue.equals(newValue))
+ && (descriptor.equals(SALESFORCE_INSTANCE_URL)
+ || descriptor.equals(QUERY_TYPE)
+ || descriptor.equals(SOBJECT_NAME)
+ || descriptor.equals(AGE_FIELD)
+ || descriptor.equals(INITIAL_AGE_FILTER)
+ || descriptor.equals(CUSTOM_WHERE_CONDITION))
+ ) {
+ getLogger().debug("A property that require resetting state was
modified - {} oldValue {} newValue {}",
+ descriptor.getDisplayName(), oldValue, newValue);
+ resetState = true;
}
- return results;
}
@Override
public void onTrigger(ProcessContext context, ProcessSession session)
throws ProcessException {
boolean isCustomQuery =
CUSTOM_QUERY.getValue().equals(context.getProperty(QUERY_TYPE).getValue());
-
-
+ FlowFile flowFile = session.get();
if (isCustomQuery) {
- FlowFile flowFile = session.get();
- if (flowFile == null && context.hasIncomingConnection()) {
- context.yield();
- return;
- }
processCustomQuery(context, session, flowFile);
- return;
+ } else {
+ processQuery(context, session, flowFile);
}
- processQuery(context, session);
}
- private void processQuery(ProcessContext context, ProcessSession session) {
+ private void processQuery(ProcessContext context, ProcessSession session,
FlowFile originalFlowFile) {
AtomicReference<String> nextRecordsUrl = new AtomicReference<>();
String sObject = context.getProperty(SOBJECT_NAME).getValue();
String fields = context.getProperty(FIELD_NAMES).getValue();
- String customWhereClause =
context.getProperty(CUSTOM_WHERE_CONDITION).getValue();
+ String customWhereClause =
context.getProperty(CUSTOM_WHERE_CONDITION).evaluateAttributeExpressions(originalFlowFile).getValue();
RecordSetWriterFactory writerFactory =
context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
boolean createZeroRecordFlowFiles =
context.getProperty(CREATE_ZERO_RECORD_FILES).asBoolean();
- String ageField = context.getProperty(AGE_FIELD).getValue();
- String initialAgeFilter =
context.getProperty(INITIAL_AGE_FILTER).getValue();
- Long ageDelayMs =
context.getProperty(AGE_DELAY).asTimePeriod(TimeUnit.MILLISECONDS);
-
- String ageFilterLower;
- StateMap state;
- try {
- state = context.getStateManager().getState(Scope.CLUSTER);
- ageFilterLower = state.get(LAST_AGE_FILTER);
- } catch (IOException e) {
- throw new ProcessException("Last Age Filter state retrieval
failed", e);
- }
-
- String ageFilterUpper;
- if (ageField == null) {
- ageFilterUpper = null;
- } else {
- OffsetDateTime ageFilterUpperTime;
- if (ageDelayMs == null) {
- ageFilterUpperTime = OffsetDateTime.now();
- } else {
- ageFilterUpperTime = OffsetDateTime.now().minus(ageDelayMs,
ChronoUnit.MILLIS);
- }
- ageFilterUpper =
ageFilterUpperTime.format(DateTimeFormatter.ISO_OFFSET_DATE_TIME);
- }
-
+ StateMap state = getState(session);
+ IncrementalContext incrementalContext = new
IncrementalContext(context, state);
SalesforceSchemaHolder salesForceSchemaHolder =
getConvertedSalesforceSchema(sObject, fields);
if (StringUtils.isBlank(fields)) {
@@ -385,144 +377,181 @@ public class QuerySalesforceObject extends
AbstractProcessor {
.collect(Collectors.joining(","));
}
- String querySObject = buildQuery(
- sObject,
- fields,
- customWhereClause,
- ageField,
- initialAgeFilter,
- ageFilterLower,
- ageFilterUpper
- );
+ String querySObject = new SalesforceQueryBuilder(incrementalContext)
+ .buildQuery(sObject, fields, customWhereClause);
+
+ AtomicBoolean isOriginalTransferred = new AtomicBoolean(false);
+ List<FlowFile> outgoingFlowFiles = new ArrayList<>();
+ Map<String, String> originalAttributes =
Optional.ofNullable(originalFlowFile)
+ .map(FlowFile::getAttributes)
+ .orElseGet(HashMap::new);
+
+ long startNanos = System.nanoTime();
do {
- FlowFile flowFile = session.create();
- Map<String, String> originalAttributes = flowFile.getAttributes();
- Map<String, String> attributes = new HashMap<>();
+ FlowFile outgoingFlowFile = createOutgoingFlowFile(session,
originalFlowFile);
+ outgoingFlowFiles.add(outgoingFlowFile);
+ Map<String, String> attributes = new HashMap<>(originalAttributes);
AtomicInteger recordCountHolder = new AtomicInteger();
- long startNanos = System.nanoTime();
- flowFile = session.write(flowFile, out -> {
- try (
- InputStream querySObjectResultInputStream =
getResultInputStream(nextRecordsUrl.get(), querySObject);
-
- JsonTreeRowRecordReader jsonReader = new
JsonTreeRowRecordReader(
- querySObjectResultInputStream,
- getLogger(),
- salesForceSchemaHolder.recordSchema,
- DATE_FORMAT,
- TIME_FORMAT,
- DATE_TIME_FORMAT,
- StartingFieldStrategy.NESTED_FIELD,
- STARTING_FIELD_NAME,
- SchemaApplicationStrategy.SELECTED_PART,
- CAPTURE_PREDICATE
- );
-
- RecordSetWriter writer = writerFactory.createWriter(
- getLogger(),
- writerFactory.getSchema(
- originalAttributes,
- salesForceSchemaHolder.recordSchema
- ),
- out,
- originalAttributes
- )
- ) {
- writer.beginRecordSet();
-
- Record querySObjectRecord;
- while ((querySObjectRecord = jsonReader.nextRecord()) !=
null) {
- writer.write(querySObjectRecord);
- }
-
- WriteResult writeResult = writer.finishRecordSet();
+ try {
+ outgoingFlowFile = session.write(outgoingFlowFile,
processRecordsCallback(session, nextRecordsUrl, writerFactory, state,
incrementalContext,
+ salesForceSchemaHolder, querySObject,
originalAttributes, attributes, recordCountHolder));
+ int recordCount = recordCountHolder.get();
- Map<String, String> capturedFields =
jsonReader.getCapturedFields();
+ if (createZeroRecordFlowFiles || recordCount != 0) {
+ outgoingFlowFile =
session.putAllAttributes(outgoingFlowFile, attributes);
-
nextRecordsUrl.set(capturedFields.getOrDefault(NEXT_RECORDS_URL, null));
+ session.adjustCounter("Records Processed", recordCount,
false);
+ getLogger().info("Successfully written {} records for {}",
recordCount, outgoingFlowFile);
+ } else {
+ outgoingFlowFiles.remove(outgoingFlowFile);
+ session.remove(outgoingFlowFile);
+ }
+ } catch (Exception e) {
+ if (e.getCause() instanceof IOException) {
+ throw new ProcessException("Couldn't get Salesforce
records", e);
+ } else if (e.getCause() instanceof SchemaNotFoundException) {
+ handleError(session, originalFlowFile,
isOriginalTransferred, outgoingFlowFiles, e, "Couldn't create record writer");
+ } else if (e.getCause() instanceof MalformedRecordException) {
+ handleError(session, originalFlowFile,
isOriginalTransferred, outgoingFlowFiles, e, "Couldn't read records from
input");
+ } else {
+ handleError(session, originalFlowFile,
isOriginalTransferred, outgoingFlowFiles, e, "Couldn't get Salesforce records");
+ }
+ break;
+ }
+ } while (nextRecordsUrl.get() != null);
- attributes.put("record.count",
String.valueOf(writeResult.getRecordCount()));
- attributes.put(CoreAttributes.MIME_TYPE.key(),
writer.getMimeType());
- attributes.putAll(writeResult.getAttributes());
+ transferFlowFiles(session, outgoingFlowFiles, originalFlowFile,
isOriginalTransferred, startNanos, sObject);
+ }
- recordCountHolder.set(writeResult.getRecordCount());
+ private OutputStreamCallback processRecordsCallback(ProcessSession
session, AtomicReference<String> nextRecordsUrl, RecordSetWriterFactory
writerFactory,
+ StateMap state,
IncrementalContext incrementalContext, SalesforceSchemaHolder
salesForceSchemaHolder,
+ String querySObject,
Map<String, String> originalAttributes, Map<String, String> attributes,
+ AtomicInteger
recordCountHolder) {
+ return out -> {
+ try {
+ handleRecordSet(out, nextRecordsUrl, querySObject,
writerFactory, salesForceSchemaHolder, originalAttributes, attributes,
recordCountHolder);
- if (ageFilterUpper != null) {
- Map<String, String> newState = new
HashMap<>(state.toMap());
- newState.put(LAST_AGE_FILTER, ageFilterUpper);
- updateState(context, newState);
- }
- } catch (SchemaNotFoundException e) {
- throw new ProcessException("Couldn't create record
writer", e);
- } catch (MalformedRecordException e) {
- throw new ProcessException("Couldn't read records from
input", e);
+ if (incrementalContext.getAgeFilterUpper() != null) {
+ Map<String, String> newState = new
HashMap<>(state.toMap());
+ newState.put(LAST_AGE_FILTER,
incrementalContext.getAgeFilterUpper());
+ updateState(session, newState);
}
- });
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ };
+ }
- int recordCount = recordCountHolder.get();
+ private void handleRecordSet(OutputStream out, AtomicReference<String>
nextRecordsUrl, String querySObject, RecordSetWriterFactory writerFactory,
+ SalesforceSchemaHolder
salesForceSchemaHolder, Map<String, String> originalAttributes, Map<String,
String> attributes,
+ AtomicInteger recordCountHolder) throws
Exception {
+ try (
+ InputStream querySObjectResultInputStream =
getResultInputStream(nextRecordsUrl.get(), querySObject);
+ JsonTreeRowRecordReader jsonReader =
createJsonReader(querySObjectResultInputStream,
salesForceSchemaHolder.getRecordSchema());
+ RecordSetWriter writer = createRecordSetWriter(writerFactory,
originalAttributes, out, salesForceSchemaHolder.getRecordSchema())
+ ) {
+ writer.beginRecordSet();
+
+ Record querySObjectRecord;
+ while ((querySObjectRecord = jsonReader.nextRecord()) != null) {
+ writer.write(querySObjectRecord);
+ }
- if (!createZeroRecordFlowFiles && recordCount == 0) {
- session.remove(flowFile);
- } else {
- flowFile = session.putAllAttributes(flowFile, attributes);
- session.transfer(flowFile, REL_SUCCESS);
+ WriteResult writeResult = writer.finishRecordSet();
- long transferMillis =
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
- session.getProvenanceReporter().receive(flowFile,
salesforceRestService.getVersionedBaseUrl() + "/composite/tree/" + sObject,
- transferMillis);
+ Map<String, String> capturedFields =
jsonReader.getCapturedFields();
+ nextRecordsUrl.set(capturedFields.getOrDefault(NEXT_RECORDS_URL,
null));
- session.adjustCounter("Records Processed", recordCount, false);
- getLogger().info("Successfully written {} records for {}",
recordCount, flowFile);
- }
- } while (nextRecordsUrl.get() != null);
+ attributes.put("record.count",
String.valueOf(writeResult.getRecordCount()));
+ attributes.put(CoreAttributes.MIME_TYPE.key(),
writer.getMimeType());
+ attributes.putAll(writeResult.getAttributes());
+ recordCountHolder.set(writeResult.getRecordCount());
+ }
+ }
+
+ private JsonTreeRowRecordReader createJsonReader(InputStream
querySObjectResultInputStream, RecordSchema recordSchema) throws IOException,
MalformedRecordException {
+ return new JsonTreeRowRecordReader(
+ querySObjectResultInputStream,
+ getLogger(),
+ recordSchema,
+ DATE_FORMAT,
+ TIME_FORMAT,
+ DATE_TIME_FORMAT,
+ StartingFieldStrategy.NESTED_FIELD,
+ STARTING_FIELD_NAME,
+ SchemaApplicationStrategy.SELECTED_PART,
+ CAPTURE_PREDICATE
+ );
+ }
+
+ private RecordSetWriter createRecordSetWriter(RecordSetWriterFactory
writerFactory, Map<String, String> originalAttributes, OutputStream out,
+ RecordSchema recordSchema)
throws IOException, SchemaNotFoundException {
+ return writerFactory.createWriter(
+ getLogger(),
+ writerFactory.getSchema(
+ originalAttributes,
+ recordSchema
+ ),
+ out,
+ originalAttributes
+ );
}
private void processCustomQuery(ProcessContext context, ProcessSession
session, FlowFile originalFlowFile) {
String customQuery =
context.getProperty(CUSTOM_SOQL_QUERY).evaluateAttributeExpressions(originalFlowFile).getValue();
AtomicReference<String> nextRecordsUrl = new AtomicReference<>();
AtomicReference<String> totalSize = new AtomicReference<>();
- boolean isOriginalTransferred = false;
+ AtomicBoolean isOriginalTransferred = new AtomicBoolean(false);
List<FlowFile> outgoingFlowFiles = new ArrayList<>();
+ long startNanos = System.nanoTime();
do {
- FlowFile outgoingFlowFile;
try (InputStream response =
getResultInputStream(nextRecordsUrl.get(), customQuery)) {
- if (originalFlowFile != null) {
- outgoingFlowFile = session.create(originalFlowFile);
- } else {
- outgoingFlowFile = session.create();
- }
+ FlowFile outgoingFlowFile = createOutgoingFlowFile(session,
originalFlowFile);
outgoingFlowFiles.add(outgoingFlowFile);
- outgoingFlowFile = session.write(outgoingFlowFile,
parseHttpResponse(response, nextRecordsUrl, totalSize));
- int recordCount = nextRecordsUrl.get() != null ? 2000 :
Integer.parseInt(totalSize.get()) % 2000;
+ outgoingFlowFile = session.write(outgoingFlowFile,
parseCustomQueryResponse(response, nextRecordsUrl, totalSize));
+ int recordCount = nextRecordsUrl.get() != null ?
MAX_RECORD_COUNT : Integer.parseInt(totalSize.get()) % MAX_RECORD_COUNT;
Map<String, String> attributes = new HashMap<>();
attributes.put(CoreAttributes.MIME_TYPE.key(),
"application/json");
- attributes.put(TOTAL_RECORD_COUNT,
String.valueOf(recordCount));
+ attributes.put(TOTAL_RECORD_COUNT_ATTRIBUTE,
String.valueOf(recordCount));
session.adjustCounter("Salesforce records processed",
recordCount, false);
- session.putAllAttributes(outgoingFlowFile, attributes);
+ outgoingFlowFile = session.putAllAttributes(outgoingFlowFile,
attributes);
} catch (IOException e) {
throw new ProcessException("Couldn't get Salesforce records",
e);
} catch (Exception e) {
- if (originalFlowFile != null) {
- session.transfer(originalFlowFile, REL_FAILURE);
- isOriginalTransferred = true;
- }
- getLogger().error("Couldn't get Salesforce records", e);
- session.remove(outgoingFlowFiles);
- outgoingFlowFiles.clear();
+ handleError(session, originalFlowFile, isOriginalTransferred,
outgoingFlowFiles, e, "Couldn't get Salesforce records");
break;
}
} while (nextRecordsUrl.get() != null);
+ transferFlowFiles(session, outgoingFlowFiles, originalFlowFile,
isOriginalTransferred, startNanos, "custom");
+ }
+
+ private void transferFlowFiles(ProcessSession session, List<FlowFile>
outgoingFlowFiles, FlowFile originalFlowFile, AtomicBoolean
isOriginalTransferred,
+ long startNanos, String urlDetail) {
if (!outgoingFlowFiles.isEmpty()) {
session.transfer(outgoingFlowFiles, REL_SUCCESS);
+ long transferMillis =
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
+
+ outgoingFlowFiles.forEach(ff ->
+ session.getProvenanceReporter().receive(ff,
salesforceRestService.getVersionedBaseUrl() + "/" + urlDetail, transferMillis)
+ );
}
- if (originalFlowFile != null && !isOriginalTransferred) {
+ if (originalFlowFile != null && !isOriginalTransferred.get()) {
session.transfer(originalFlowFile, REL_ORIGINAL);
}
}
- private OutputStreamCallback parseHttpResponse(InputStream in,
AtomicReference<String> nextRecordsUrl, AtomicReference<String> totalSize) {
+ private FlowFile createOutgoingFlowFile(ProcessSession session, FlowFile
originalFlowFile) {
+ if (originalFlowFile != null) {
+ return session.create(originalFlowFile);
+ } else {
+ return session.create();
+ }
+ }
+
+ private OutputStreamCallback parseCustomQueryResponse(InputStream in,
AtomicReference<String> nextRecordsUrl, AtomicReference<String> totalSize) {
nextRecordsUrl.set(null);
return out -> {
try (JsonParser jsonParser = JSON_FACTORY.createParser(in);
@@ -560,14 +589,44 @@ public class QuerySalesforceObject extends
AbstractProcessor {
}
}
- private void updateState(ProcessContext context, Map<String, String>
newState) {
+ private void handleError(ProcessSession session, FlowFile
originalFlowFile, AtomicBoolean isOriginalTransferred, List<FlowFile>
outgoingFlowFiles,
+ Exception e, String errorMessage) {
+ if (originalFlowFile != null) {
+ session.transfer(originalFlowFile, REL_FAILURE);
+ isOriginalTransferred.set(true);
+ }
+ getLogger().error(errorMessage, e);
+ session.remove(outgoingFlowFiles);
+ outgoingFlowFiles.clear();
+ }
+
+ private StateMap getState(ProcessSession session) {
+ StateMap state;
try {
- context.getStateManager().setState(newState, Scope.CLUSTER);
+ state = session.getState(Scope.CLUSTER);
+ } catch (IOException e) {
+ throw new ProcessException("State retrieval failed", e);
+ }
+ return state;
+ }
+
+ private void updateState(ProcessSession session, Map<String, String>
newState) {
+ try {
+ session.setState(newState, Scope.CLUSTER);
} catch (IOException e) {
throw new ProcessException("Last Age Filter state update failed",
e);
}
}
+ private void clearState(ProcessContext context) {
+ try {
+ getLogger().debug("Clearing state based on property
modifications");
+ context.getStateManager().clear(Scope.CLUSTER);
+ } catch (final IOException e) {
+ getLogger().warn("Failed to clear state", e);
+ }
+ }
+
protected SalesforceSchemaHolder convertSchema(InputStream
describeSObjectResult, String fieldsOfInterest) {
try {
SObjectDescription salesforceObject =
salesForceToRecordSchemaConverter.getSalesforceObject(describeSObjectResult);
@@ -575,9 +634,7 @@ public class QuerySalesforceObject extends
AbstractProcessor {
RecordSchema querySObjectResultSchema = new
SimpleRecordSchema(Collections.singletonList(
new RecordField(STARTING_FIELD_NAME,
RecordFieldType.ARRAY.getArrayDataType(
- RecordFieldType.RECORD.getRecordDataType(
- recordSchema
- )
+
RecordFieldType.RECORD.getRecordDataType(recordSchema)
))
));
@@ -586,57 +643,4 @@ public class QuerySalesforceObject extends
AbstractProcessor {
throw new ProcessException("SObject to Record schema conversion
failed", e);
}
}
-
- protected String buildQuery(
- String sObject,
- String fields,
- String customWhereClause,
- String ageField,
- String initialAgeFilter,
- String ageFilterLower,
- String ageFilterUpper
- ) {
- StringBuilder queryBuilder = new StringBuilder("SELECT ")
- .append(fields)
- .append(" FROM ")
- .append(sObject);
-
- List<String> whereItems = new ArrayList<>();
- if (customWhereClause != null) {
- whereItems.add("( " + customWhereClause + " )");
- }
-
- if (ageField != null) {
- if (ageFilterLower != null) {
- whereItems.add(ageField + " >= " + ageFilterLower);
- } else if (initialAgeFilter != null) {
- whereItems.add(ageField + " >= " + initialAgeFilter);
- }
-
- whereItems.add(ageField + " < " + ageFilterUpper);
- }
-
- if (!whereItems.isEmpty()) {
- String finalWhereClause = String.join(" AND ", whereItems);
- queryBuilder.append(" WHERE ").append(finalWhereClause);
- }
-
- return queryBuilder.toString();
- }
-
- static class SalesforceSchemaHolder {
- RecordSchema querySObjectResultSchema;
- RecordSchema recordSchema;
- SObjectDescription salesforceObject;
-
- public SalesforceSchemaHolder(RecordSchema querySObjectResultSchema,
RecordSchema recordSchema, SObjectDescription salesforceObject) {
- this.querySObjectResultSchema = querySObjectResultSchema;
- this.recordSchema = recordSchema;
- this.salesforceObject = salesforceObject;
- }
-
- public SObjectDescription getSalesforceObject() {
- return salesforceObject;
- }
- }
}
diff --git
a/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/rest/SalesforceConfiguration.java
b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/rest/SalesforceConfiguration.java
new file mode 100644
index 0000000000..d974efe93a
--- /dev/null
+++
b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/rest/SalesforceConfiguration.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.salesforce.rest;
+
+import java.util.function.Supplier;
+
+public final class SalesforceConfiguration {
+
+ private final String instanceUrl;
+ private final String version;
+ private final Supplier<String> accessTokenProvider;
+ private final int responseTimeoutMillis;
+
+ private SalesforceConfiguration(String instanceUrl, String version,
Supplier<String> accessTokenProvider, int responseTimeoutMillis) {
+ this.instanceUrl = instanceUrl;
+ this.version = version;
+ this.accessTokenProvider = accessTokenProvider;
+ this.responseTimeoutMillis = responseTimeoutMillis;
+ }
+
+ public static SalesforceConfiguration create(String instanceUrl, String
version, Supplier<String> accessTokenProvider, int responseTimeoutMillis) {
+ return new SalesforceConfiguration(instanceUrl, version,
accessTokenProvider, responseTimeoutMillis);
+ }
+
+ public String getInstanceUrl() {
+ return instanceUrl;
+ }
+
+ public String getVersion() {
+ return version;
+ }
+
+ public Supplier<String> getAccessTokenProvider() {
+ return accessTokenProvider;
+ }
+
+ public int getResponseTimeoutMillis() {
+ return responseTimeoutMillis;
+ }
+}
diff --git
a/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/rest/SalesforceRestClient.java
b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/rest/SalesforceRestClient.java
new file mode 100644
index 0000000000..26cb6c420a
--- /dev/null
+++
b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/rest/SalesforceRestClient.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.salesforce.rest;
+
+import okhttp3.HttpUrl;
+import okhttp3.MediaType;
+import okhttp3.OkHttpClient;
+import okhttp3.Request;
+import okhttp3.RequestBody;
+import okhttp3.Response;
+import org.apache.nifi.processor.exception.ProcessException;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.UncheckedIOException;
+import java.util.concurrent.TimeUnit;
+
+public class SalesforceRestClient {
+
+ private static final MediaType JSON_MEDIA_TYPE =
MediaType.parse("application/json; charset=utf-8");
+
+ private final SalesforceConfiguration configuration;
+ private final OkHttpClient httpClient;
+
+ public SalesforceRestClient(SalesforceConfiguration configuration) {
+ this.configuration = configuration;
+ httpClient = new OkHttpClient.Builder()
+ .readTimeout(configuration.getResponseTimeoutMillis(),
TimeUnit.MILLISECONDS)
+ .build();
+ }
+
+ public InputStream describeSObject(String sObject) {
+ String url = getUrl("/sobjects/" + sObject + "/describe?maxRecords=1");
+ Request request = buildGetRequest(url);
+ return executeRequest(request);
+ }
+
+ public InputStream query(String query) {
+ HttpUrl httpUrl = HttpUrl.get(getUrl("/query")).newBuilder()
+ .addQueryParameter("q", query)
+ .build();
+ Request request = buildGetRequest(httpUrl.toString());
+ return executeRequest(request);
+ }
+
+ public InputStream getNextRecords(String nextRecordsUrl) {
+ HttpUrl httpUrl = HttpUrl.get(configuration.getInstanceUrl() +
nextRecordsUrl).newBuilder().build();
+ Request request = buildGetRequest(httpUrl.toString());
+ return executeRequest(request);
+ }
+
+ public void postRecord(String sObjectApiName, String body) {
+ HttpUrl httpUrl = HttpUrl.get(getUrl("/composite/tree/" +
sObjectApiName)).newBuilder().build();
+ RequestBody requestBody = RequestBody.create(body, JSON_MEDIA_TYPE);
+ Request request = buildPostRequest(httpUrl.toString(), requestBody);
+ executeRequest(request);
+ }
+
+ private InputStream executeRequest(Request request) {
+ Response response = null;
+ try {
+ response = httpClient.newCall(request).execute();
+ if (!response.isSuccessful()) {
+ throw new ProcessException(String.format("Invalid response
[%s]: %s", response.code(), response.body() == null ? null :
response.body().string()));
+ }
+ return response.body().byteStream();
+ } catch (IOException e) {
+ if (response != null) {
+ response.close();
+ }
+ throw new UncheckedIOException(String.format("Salesforce HTTP
request failed [%s]", request.url()), e);
+ }
+ }
+
+ private String getUrl(String path) {
+ return getVersionedBaseUrl() + path;
+ }
+
+ public String getVersionedBaseUrl() {
+ return configuration.getInstanceUrl() + "/services/data/v" +
configuration.getVersion();
+ }
+
+ private Request buildGetRequest(String url) {
+ return new Request.Builder()
+ .addHeader("Authorization", "Bearer " +
configuration.getAccessTokenProvider().get())
+ .url(url)
+ .get()
+ .build();
+ }
+
+ private Request buildPostRequest(String url, RequestBody requestBody) {
+ return new Request.Builder()
+ .addHeader("Authorization", "Bearer " +
configuration.getAccessTokenProvider().get())
+ .url(url)
+ .post(requestBody)
+ .build();
+ }
+}
diff --git
a/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/schema/SalesforceSchemaHolder.java
b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/schema/SalesforceSchemaHolder.java
new file mode 100644
index 0000000000..5d466a62dd
--- /dev/null
+++
b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/schema/SalesforceSchemaHolder.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.salesforce.schema;
+
+import org.apache.camel.component.salesforce.api.dto.SObjectDescription;
+import org.apache.nifi.serialization.record.RecordSchema;
+
+public class SalesforceSchemaHolder {
+
+ RecordSchema querySObjectResultSchema;
+ RecordSchema recordSchema;
+ SObjectDescription salesforceObject;
+
+ public SalesforceSchemaHolder(RecordSchema querySObjectResultSchema,
RecordSchema recordSchema, SObjectDescription salesforceObject) {
+ this.querySObjectResultSchema = querySObjectResultSchema;
+ this.recordSchema = recordSchema;
+ this.salesforceObject = salesforceObject;
+ }
+
+ public SObjectDescription getSalesforceObject() {
+ return salesforceObject;
+ }
+
+ public RecordSchema getQuerySObjectResultSchema() {
+ return querySObjectResultSchema;
+ }
+
+ public RecordSchema getRecordSchema() {
+ return recordSchema;
+ }
+}
diff --git
a/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/schema/SalesforceToRecordSchemaConverter.java
b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/schema/SalesforceToRecordSchemaConverter.java
new file mode 100644
index 0000000000..3e0d82d147
--- /dev/null
+++
b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/schema/SalesforceToRecordSchemaConverter.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.salesforce.schema;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.camel.component.salesforce.api.dto.SObjectDescription;
+import org.apache.camel.component.salesforce.api.dto.SObjectField;
+import org.apache.camel.component.salesforce.api.utils.JsonUtils;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.util.StringUtils;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class SalesforceToRecordSchemaConverter {
+ private static final ObjectMapper OBJECT_MAPPER =
JsonUtils.createObjectMapper();
+ private final String dateFormat;
+ private final String dateTimeFormat;
+ private final String timeFormat;
+
+ public SalesforceToRecordSchemaConverter(String dateFormat, String
dateTimeFormat, String timeFormat) {
+ this.dateFormat = dateFormat;
+ this.dateTimeFormat = dateTimeFormat;
+ this.timeFormat = timeFormat;
+ }
+
+ public SObjectDescription getSalesforceObject(InputStream
salesforceObjectResultJsonString) throws IOException {
+ return OBJECT_MAPPER.readValue(salesforceObjectResultJsonString,
SObjectDescription.class);
+ }
+
+ public RecordSchema convertSchema(SObjectDescription salesforceObject,
String fieldNamesOfInterest) {
+ List<SObjectField> fields = salesforceObject.getFields();
+ if (StringUtils.isNotBlank(fieldNamesOfInterest)) {
+ fields = filterFieldsOfInterest(fields, fieldNamesOfInterest);
+ }
+ List<RecordField> recordFields = null;
+ try {
+ recordFields = convertSObjectFieldsToRecordFields(fields);
+ } catch (IllegalArgumentException e) {
+ throw new ProcessException(String.format("Could not determine
schema for '%s'", salesforceObject.getName()), e);
+ }
+
+ return new SimpleRecordSchema(recordFields);
+ }
+
+ private List<RecordField>
convertSObjectFieldsToRecordFields(List<SObjectField> fields) {
+ List<RecordField> recordFields = new ArrayList<>();
+ for (SObjectField field : fields) {
+ recordFields.add(convertSObjectFieldToRecordField(field));
+ }
+ return recordFields;
+ }
+
+ private List<SObjectField> filterFieldsOfInterest(List<SObjectField>
fields, String fieldNamesOfInterest) {
+ List<String> listOfFieldNamesOfInterest =
Arrays.asList(fieldNamesOfInterest.toLowerCase().split("\\s*,\\s*"));
+ return fields
+ .stream()
+ .filter(sObjectField ->
listOfFieldNamesOfInterest.contains(sObjectField.getName().toLowerCase()))
+ .collect(Collectors.toList());
+ }
+
+ private RecordField convertSObjectFieldToRecordField(SObjectField field) {
+ String soapType = field.getSoapType();
+ DataType dataType;
+ switch (soapType.substring(soapType.indexOf(':') + 1)) {
+ case "ID":
+ case "string":
+ case "json":
+ case "base64Binary":
+ case "anyType":
+ dataType = RecordFieldType.STRING.getDataType();
+ break;
+ case "int":
+ dataType = RecordFieldType.INT.getDataType();
+ break;
+ case "long":
+ dataType = RecordFieldType.LONG.getDataType();
+ break;
+ case "double":
+ dataType = RecordFieldType.DOUBLE.getDataType();
+ break;
+ case "boolean":
+ dataType = RecordFieldType.BOOLEAN.getDataType();
+ break;
+ case "date":
+ dataType = RecordFieldType.DATE.getDataType(dateFormat);
+ break;
+ case "dateTime":
+ dataType =
RecordFieldType.TIMESTAMP.getDataType(dateTimeFormat);
+ break;
+ case "time":
+ dataType = RecordFieldType.TIME.getDataType(timeFormat);
+ break;
+ case "address":
+ dataType =
RecordFieldType.RECORD.getRecordDataType(createAddressSchema());
+ break;
+ case "location":
+ dataType =
RecordFieldType.RECORD.getRecordDataType(createLocationSchema());
+ break;
+ default:
+ throw new IllegalArgumentException(String.format("Could not
convert field '%s' of soap type '%s'.", field.getName(), soapType));
+ }
+ return new RecordField(field.getName(), dataType,
field.getDefaultValue(), field.isNillable());
+ }
+
+ private RecordSchema createAddressSchema() {
+ return new SimpleRecordSchema(Arrays.asList(
+ new RecordField("city", RecordFieldType.STRING.getDataType(),
true),
+ new RecordField("country",
RecordFieldType.STRING.getDataType(), true),
+ new RecordField("countryCode",
RecordFieldType.STRING.getDataType(), true),
+ new RecordField("postalCode",
RecordFieldType.STRING.getDataType(), true),
+ new RecordField("state", RecordFieldType.STRING.getDataType(),
true),
+ new RecordField("stateCode",
RecordFieldType.STRING.getDataType(), true),
+ new RecordField("street",
RecordFieldType.STRING.getDataType(), true),
+ new RecordField("geocodeAccuracy",
RecordFieldType.STRING.getDataType(), true)
+ ));
+ }
+
+ private RecordSchema createLocationSchema() {
+ return new SimpleRecordSchema(Arrays.asList(
+ new RecordField("latitude",
RecordFieldType.STRING.getDataType(), true),
+ new RecordField("longitude",
RecordFieldType.STRING.getDataType(), true)
+ ));
+ }
+}
diff --git
a/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/util/CommonSalesforceProperties.java
b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/util/CommonSalesforceProperties.java
index 39188396d9..c8f6078cd8 100644
---
a/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/util/CommonSalesforceProperties.java
+++
b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/util/CommonSalesforceProperties.java
@@ -23,10 +23,10 @@ import org.apache.nifi.processor.util.StandardValidators;
public final class CommonSalesforceProperties {
- public static final PropertyDescriptor API_URL = new
PropertyDescriptor.Builder()
+ public static final PropertyDescriptor SALESFORCE_INSTANCE_URL = new
PropertyDescriptor.Builder()
.name("salesforce-url")
- .displayName("URL")
- .description("The URL for the Salesforce REST API including the
domain without additional path information, such as
https://MyDomainName.my.salesforce.com")
+ .displayName("Salesforce Instance URL")
+ .description("The URL of the Salesforce instance including the
domain without additional path information, such as
https://MyDomainName.my.salesforce.com")
.required(true)
.addValidator(StandardValidators.URL_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
diff --git
a/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/util/IncrementalContext.java
b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/util/IncrementalContext.java
new file mode 100644
index 0000000000..4b155a042b
--- /dev/null
+++
b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/util/IncrementalContext.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.salesforce.util;
+
+import org.apache.nifi.components.state.StateMap;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processors.salesforce.QuerySalesforceObject;
+
+import java.time.OffsetDateTime;
+import java.time.format.DateTimeFormatter;
+import java.time.temporal.ChronoUnit;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+
+public class IncrementalContext {
+
+ private final String ageField;
+ private final String initialAgeFilter;
+ private final String ageFilterUpper;
+ private final String ageFilterLower;
+
+ public IncrementalContext(ProcessContext context, StateMap state) {
+ ageField =
context.getProperty(QuerySalesforceObject.AGE_FIELD).getValue();
+ initialAgeFilter =
context.getProperty(QuerySalesforceObject.INITIAL_AGE_FILTER).getValue();
+ ageFilterLower = state.get(QuerySalesforceObject.LAST_AGE_FILTER);
+ Optional<Long> ageDelayMs =
Optional.ofNullable(context.getProperty(QuerySalesforceObject.AGE_DELAY).asTimePeriod(TimeUnit.MILLISECONDS));
+
+ if (ageField == null) {
+ ageFilterUpper = null;
+ } else {
+ OffsetDateTime ageFilterUpperTime = ageDelayMs
+ .map(delay -> OffsetDateTime.now().minus(delay,
ChronoUnit.MILLIS))
+ .orElse(OffsetDateTime.now());
+ ageFilterUpper =
ageFilterUpperTime.format(DateTimeFormatter.ISO_OFFSET_DATE_TIME);
+ }
+ }
+
+ public String getAgeField() {
+ return ageField;
+ }
+
+ public String getInitialAgeFilter() {
+ return initialAgeFilter;
+ }
+
+ public String getAgeFilterUpper() {
+ return ageFilterUpper;
+ }
+
+ public String getAgeFilterLower() {
+ return ageFilterLower;
+ }
+}
diff --git
a/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/util/SalesforceQueryBuilder.java
b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/util/SalesforceQueryBuilder.java
new file mode 100644
index 0000000000..115c1c3aff
--- /dev/null
+++
b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/util/SalesforceQueryBuilder.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.salesforce.util;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class SalesforceQueryBuilder {
+
+ private final IncrementalContext incrementalContext;
+
+ public SalesforceQueryBuilder(IncrementalContext incrementalContext) {
+ this.incrementalContext = incrementalContext;
+ }
+
+ public String buildQuery(String sObject, String fields, String
customWhereClause) {
+ StringBuilder queryBuilder = new StringBuilder("SELECT ")
+ .append(fields)
+ .append(" FROM ")
+ .append(sObject);
+
+ List<String> whereItems = new ArrayList<>();
+ if (customWhereClause != null) {
+ whereItems.add("( " + customWhereClause + " )");
+ }
+
+ addAgeFilter(whereItems);
+
+ if (!whereItems.isEmpty()) {
+ String finalWhereClause = String.join(" AND ", whereItems);
+ queryBuilder.append(" WHERE ").append(finalWhereClause);
+ }
+
+ return queryBuilder.toString();
+ }
+
+ private void addAgeFilter(List<String> whereItems) {
+ String ageField = incrementalContext.getAgeField();
+ String ageFilterLower = incrementalContext.getAgeFilterLower();
+ String initialAgeFilter = incrementalContext.getInitialAgeFilter();
+
+ if (ageField != null) {
+ if (ageFilterLower != null) {
+ whereItems.add(ageField + " >= " + ageFilterLower);
+ } else if (initialAgeFilter != null) {
+ whereItems.add(ageField + " >= " + initialAgeFilter);
+ }
+
+ whereItems.add(ageField + " < " +
incrementalContext.getAgeFilterUpper());
+ }
+ }
+}
diff --git
a/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/util/SalesforceRestService.java
b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/util/SalesforceRestService.java
deleted file mode 100644
index 0affbc441e..0000000000
---
a/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/util/SalesforceRestService.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.salesforce.util;
-
-import okhttp3.HttpUrl;
-import okhttp3.MediaType;
-import okhttp3.OkHttpClient;
-import okhttp3.Request;
-import okhttp3.RequestBody;
-import okhttp3.Response;
-import org.apache.nifi.processor.exception.ProcessException;
-
-import java.io.InputStream;
-import java.util.concurrent.TimeUnit;
-import java.util.function.Supplier;
-
-public class SalesforceRestService {
- private final String version;
- private final String baseUrl;
- private final Supplier<String> accessTokenProvider;
- private final OkHttpClient httpClient;
-
- public SalesforceRestService(String version, String baseUrl,
Supplier<String> accessTokenProvider, int responseTimeoutMillis) {
- this.version = version;
- this.baseUrl = baseUrl;
- this.accessTokenProvider = accessTokenProvider;
- httpClient = new OkHttpClient.Builder()
- .readTimeout(responseTimeoutMillis, TimeUnit.MILLISECONDS)
- .build();
- }
-
- public InputStream describeSObject(String sObject) {
- String url = getVersionedBaseUrl() + "/sobjects/" + sObject +
"/describe?maxRecords=1";
-
- Request request = new Request.Builder()
- .addHeader("Authorization", "Bearer " +
accessTokenProvider.get())
- .url(url)
- .get()
- .build();
-
- return request(request);
- }
-
- public InputStream query(String query) {
- String url = getVersionedBaseUrl() + "/query";
-
- HttpUrl httpUrl = HttpUrl.get(url).newBuilder()
- .addQueryParameter("q", query)
- .build();
-
- Request request = new Request.Builder()
- .addHeader("Authorization", "Bearer " +
accessTokenProvider.get())
- .url(httpUrl)
- .get()
- .build();
-
- return request(request);
- }
-
- public InputStream getNextRecords(String nextRecordsUrl) {
- String url = baseUrl + nextRecordsUrl;
-
- HttpUrl httpUrl = HttpUrl.get(url).newBuilder()
- .build();
-
- Request request = new Request.Builder()
- .addHeader("Authorization", "Bearer " +
accessTokenProvider.get())
- .url(httpUrl)
- .get()
- .build();
-
- return request(request);
- }
-
- public InputStream postRecord(String sObjectApiName, String body) {
- String url = getVersionedBaseUrl() + "/composite/tree/" +
sObjectApiName;
-
- HttpUrl httpUrl = HttpUrl.get(url).newBuilder()
- .build();
-
- final RequestBody requestBody = RequestBody.create(body,
MediaType.parse("application/json"));
-
- Request request = new Request.Builder()
- .addHeader("Authorization", "Bearer " +
accessTokenProvider.get())
- .url(httpUrl)
- .post(requestBody)
- .build();
-
- return request(request);
- }
-
- public String getVersionedBaseUrl() {
- return baseUrl + "/services/data/v" + version;
- }
-
- private InputStream request(Request request) {
- Response response = null;
- try {
- response = httpClient.newCall(request).execute();
- if (response.code() < 200 || response.code() > 201) {
- throw new ProcessException("Invalid response" +
- " Code: " + response.code() +
- " Message: " + response.message() +
- " Body: " + (response.body() == null ? null :
response.body().string())
- );
- }
- return response.body().byteStream();
- } catch (ProcessException e) {
- if (response != null) {
- response.close();
- }
- throw e;
- } catch (Exception e) {
- if (response != null) {
- response.close();
- }
- throw new ProcessException(String.format("Salesforce HTTP request
failed [%s]", request.url()), e);
- }
- }
-}
diff --git
a/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/util/SalesforceToRecordSchemaConverter.java
b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/util/SalesforceToRecordSchemaConverter.java
deleted file mode 100644
index 5bc6637161..0000000000
---
a/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/util/SalesforceToRecordSchemaConverter.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.salesforce.util;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.camel.component.salesforce.api.dto.SObjectDescription;
-import org.apache.camel.component.salesforce.api.dto.SObjectField;
-import org.apache.camel.component.salesforce.api.utils.JsonUtils;
-import org.apache.nifi.serialization.SimpleRecordSchema;
-import org.apache.nifi.serialization.record.RecordField;
-import org.apache.nifi.serialization.record.RecordFieldType;
-import org.apache.nifi.serialization.record.RecordSchema;
-import org.apache.nifi.util.StringUtils;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.stream.Collectors;
-
-public class SalesforceToRecordSchemaConverter {
- private static final ObjectMapper OBJECT_MAPPER =
JsonUtils.createObjectMapper();
- private final String dateFormat;
- private final String dateTimeFormat;
- private final String timeFormat;
-
- public SalesforceToRecordSchemaConverter(String dateFormat, String
dateTimeFormat, String timeFormat) {
- this.dateFormat = dateFormat;
- this.dateTimeFormat = dateTimeFormat;
- this.timeFormat = timeFormat;
- }
-
- public SObjectDescription getSalesforceObject(InputStream
salesforceObjectResultJsonString) throws IOException {
- return OBJECT_MAPPER.readValue(salesforceObjectResultJsonString,
SObjectDescription.class);
- }
-
- public RecordSchema convertSchema(SObjectDescription salesforceObject,
String fieldNamesOfInterest) {
- List<SObjectField> fields = salesforceObject.getFields();
- if (StringUtils.isNotBlank(fieldNamesOfInterest)) {
- final List<String> listOfFieldNamesOfInterest =
Arrays.asList(fieldNamesOfInterest.toLowerCase().split("\\s*,\\s*"));
- fields = fields
- .stream()
- .filter(sObjectField ->
listOfFieldNamesOfInterest.contains(sObjectField.getName().toLowerCase()))
- .collect(Collectors.toList());
- }
-
- final List<RecordField> recordFields = new ArrayList<>();
-
- for (SObjectField field : fields) {
- final String soapType = field.getSoapType();
-
- switch (soapType.substring(soapType.indexOf(':') + 1)) {
- case "ID":
- case "string":
- case "json":
- case "base64Binary":
- case "anyType":
- recordFields.add(new RecordField(field.getName(),
RecordFieldType.STRING.getDataType(), field.getDefaultValue(),
field.isNillable()));
- break;
- case "int":
- recordFields.add(new RecordField(field.getName(),
RecordFieldType.INT.getDataType(), field.getDefaultValue(),
field.isNillable()));
- break;
- case "long":
- recordFields.add(new RecordField(field.getName(),
RecordFieldType.LONG.getDataType(), field.getDefaultValue(),
field.isNillable()));
- break;
- case "double":
- recordFields.add(new RecordField(field.getName(),
RecordFieldType.DOUBLE.getDataType(), field.getDefaultValue(),
field.isNillable()));
- break;
- case "boolean":
- recordFields.add(new RecordField(field.getName(),
RecordFieldType.BOOLEAN.getDataType(), field.getDefaultValue(),
field.isNillable()));
- break;
- case "date":
- recordFields.add(new RecordField(field.getName(),
RecordFieldType.DATE.getDataType(dateFormat), field.getDefaultValue(),
field.isNillable()));
- break;
- case "dateTime":
- recordFields.add(new RecordField(field.getName(),
RecordFieldType.TIMESTAMP.getDataType(dateTimeFormat), field.getDefaultValue(),
field.isNillable()));
- break;
- case "time":
- recordFields.add(new RecordField(field.getName(),
RecordFieldType.TIME.getDataType(timeFormat), field.getDefaultValue(),
field.isNillable()));
- break;
- case "address":
- final RecordSchema addressSchema = new
SimpleRecordSchema(Arrays.asList(
- new RecordField("city",
RecordFieldType.STRING.getDataType(), true),
- new RecordField("country",
RecordFieldType.STRING.getDataType(), true),
- new RecordField("countryCode",
RecordFieldType.STRING.getDataType(), true),
- new RecordField("postalCode",
RecordFieldType.STRING.getDataType(), true),
- new RecordField("state",
RecordFieldType.STRING.getDataType(), true),
- new RecordField("stateCode",
RecordFieldType.STRING.getDataType(), true),
- new RecordField("street",
RecordFieldType.STRING.getDataType(), true),
- new RecordField("geocodeAccuracy",
RecordFieldType.STRING.getDataType(), true)
- ));
- recordFields.add(new RecordField(field.getName(),
RecordFieldType.RECORD.getRecordDataType(addressSchema),
field.getDefaultValue(), field.isNillable()));
- break;
- case "location":
- final RecordSchema locationSchema = new
SimpleRecordSchema(Arrays.asList(
- new RecordField("latitude",
RecordFieldType.STRING.getDataType(), true),
- new RecordField("longitude",
RecordFieldType.STRING.getDataType(), true)
- ));
- recordFields.add(new RecordField(field.getName(),
RecordFieldType.RECORD.getRecordDataType(locationSchema),
field.getDefaultValue(), field.isNillable()));
- break;
- default:
- throw new IllegalArgumentException(String.format("Could
not determine schema for '%s'. Could not convert field '%s' of soap type '%s'.",
- salesforceObject.getName(), field.getName(),
soapType));
- }
- }
-
- return new SimpleRecordSchema(recordFields);
- }
-}
diff --git
a/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/validator/SalesforceAgeValidator.java
b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/validator/SalesforceAgeValidator.java
new file mode 100644
index 0000000000..900304c768
--- /dev/null
+++
b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/validator/SalesforceAgeValidator.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.salesforce.validator;
+
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+
+import java.util.List;
+
+import static
org.apache.nifi.processors.salesforce.QuerySalesforceObject.AGE_FIELD;
+import static
org.apache.nifi.processors.salesforce.QuerySalesforceObject.INITIAL_AGE_FILTER;
+
+public final class SalesforceAgeValidator {
+
+ private SalesforceAgeValidator() {
+ }
+
+ public static List<ValidationResult> validate(ValidationContext
validationContext, List<ValidationResult> results) {
+ if (validationContext.getProperty(INITIAL_AGE_FILTER).isSet() &&
!validationContext.getProperty(AGE_FIELD).isSet()) {
+ results.add(
+ new ValidationResult.Builder()
+ .subject(INITIAL_AGE_FILTER.getDisplayName())
+ .valid(false)
+ .explanation("it requires " +
AGE_FIELD.getDisplayName() + " also to be set.")
+ .build()
+ );
+ }
+ return results;
+ }
+}
diff --git
a/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/test/java/org/apache/nifi/processors/salesforce/PutSalesforceObjectIT.java
b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/test/java/org/apache/nifi/processors/salesforce/PutSalesforceObjectIT.java
index 2c296a119f..96b37d5a35 100644
---
a/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/test/java/org/apache/nifi/processors/salesforce/PutSalesforceObjectIT.java
+++
b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/test/java/org/apache/nifi/processors/salesforce/PutSalesforceObjectIT.java
@@ -99,8 +99,8 @@ class PutSalesforceObjectIT implements SalesforceConfigAware {
assertTrue(runner.getProvenanceEvents().isEmpty());
List<MockFlowFile> flowFiles =
runner.getFlowFilesForRelationship(PutSalesforceObject.REL_FAILURE);
- MockFlowFile ff0 = flowFiles.get(0);
- ff0.assertAttributeExists("error.message");
+ MockFlowFile ff = flowFiles.get(0);
+ ff.assertAttributeExists("error.message");
}
@Test
@@ -120,8 +120,8 @@ class PutSalesforceObjectIT implements
SalesforceConfigAware {
assertTrue(runner.getProvenanceEvents().isEmpty());
List<MockFlowFile> flowFiles =
runner.getFlowFilesForRelationship(PutSalesforceObject.REL_FAILURE);
- MockFlowFile ff0 = flowFiles.get(0);
- ff0.assertAttributeExists("error.message");
+ MockFlowFile ff = flowFiles.get(0);
+ ff.assertAttributeExists("error.message");
}
private void configureProcessor(final MockRecordParser reader) throws
InitializationException {
@@ -129,7 +129,7 @@ class PutSalesforceObjectIT implements
SalesforceConfigAware {
runner.enableControllerService(reader);
runner.setProperty(CommonSalesforceProperties.API_VERSION, VERSION);
- runner.setProperty(CommonSalesforceProperties.API_URL, BASE_URL);
+ runner.setProperty(CommonSalesforceProperties.SALESFORCE_INSTANCE_URL,
INSTANCE_URL);
runner.setProperty(PutSalesforceObject.RECORD_READER_FACTORY,
reader.getIdentifier());
}
}
diff --git
a/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/test/java/org/apache/nifi/processors/salesforce/QuerySalesforceObjectIT.java
b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/test/java/org/apache/nifi/processors/salesforce/QuerySalesforceObjectIT.java
index 6c91c728ff..4da9c137e4 100644
---
a/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/test/java/org/apache/nifi/processors/salesforce/QuerySalesforceObjectIT.java
+++
b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/test/java/org/apache/nifi/processors/salesforce/QuerySalesforceObjectIT.java
@@ -68,7 +68,7 @@ class QuerySalesforceObjectIT implements
SalesforceConfigAware {
runner.setProperty(QuerySalesforceObject.SOBJECT_NAME, sObjectName);
runner.setProperty(QuerySalesforceObject.FIELD_NAMES, fieldNames);
runner.setProperty(CommonSalesforceProperties.API_VERSION, VERSION);
- runner.setProperty(CommonSalesforceProperties.API_URL, BASE_URL);
+ runner.setProperty(CommonSalesforceProperties.SALESFORCE_INSTANCE_URL,
INSTANCE_URL);
runner.setProperty(QuerySalesforceObject.RECORD_WRITER,
writer.getIdentifier());
runner.setProperty(QuerySalesforceObject.AGE_FIELD, "CreatedDate");
runner.setProperty(QuerySalesforceObject.INITIAL_AGE_FILTER,
"2022-01-06T08:43:24.000+0000");
@@ -83,12 +83,12 @@ class QuerySalesforceObjectIT implements
SalesforceConfigAware {
@Test
void runCustomQuery() {
- String customQuery = "SELECT Id, Name, AccountId,
Account.ShippingAddress FROM Contact";
+ String customQuery = "SELECT Id, Name FROM Account";
runner.setProperty(QuerySalesforceObject.QUERY_TYPE,
QuerySalesforceObject.CUSTOM_QUERY);
runner.setProperty(QuerySalesforceObject.CUSTOM_SOQL_QUERY,
customQuery);
runner.setProperty(CommonSalesforceProperties.API_VERSION, VERSION);
- runner.setProperty(CommonSalesforceProperties.API_URL, BASE_URL);
+ runner.setProperty(CommonSalesforceProperties.SALESFORCE_INSTANCE_URL,
INSTANCE_URL);
runner.run();
diff --git
a/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/test/java/org/apache/nifi/processors/salesforce/util/SalesforceConfigAware.java
b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/test/java/org/apache/nifi/processors/salesforce/util/SalesforceConfigAware.java
index c5b73e5897..98adffa39f 100644
---
a/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/test/java/org/apache/nifi/processors/salesforce/util/SalesforceConfigAware.java
+++
b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/test/java/org/apache/nifi/processors/salesforce/util/SalesforceConfigAware.java
@@ -31,9 +31,9 @@ import org.apache.nifi.util.TestRunner;
*/
public interface SalesforceConfigAware {
String VERSION = "54.0";
- String BASE_URL = "https://MyDomainName.my.salesforce.com";
+ String INSTANCE_URL = "https://MyDomainName.my.salesforce.com";
- String AUTHORIZATION_SERVER_URL = BASE_URL + "/services/oauth2/token";
+ String AUTHORIZATION_SERVER_URL = INSTANCE_URL + "/services/oauth2/token";
String USERNAME = "???";
String PASSWORD = "???";
String CLIENT_ID = "???";
diff --git
a/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/test/java/org/apache/nifi/processors/salesforce/util/SalesforceRestServiceIT.java
b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/test/java/org/apache/nifi/processors/salesforce/util/SalesforceRestServiceIT.java
index a82b9ac863..abbb79282e 100644
---
a/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/test/java/org/apache/nifi/processors/salesforce/util/SalesforceRestServiceIT.java
+++
b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/test/java/org/apache/nifi/processors/salesforce/util/SalesforceRestServiceIT.java
@@ -21,6 +21,8 @@ import
org.apache.nifi.processor.AbstractSessionFactoryProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processors.salesforce.rest.SalesforceConfiguration;
+import org.apache.nifi.processors.salesforce.rest.SalesforceRestClient;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.jupiter.api.AfterEach;
@@ -37,7 +39,7 @@ import static org.junit.jupiter.api.Assertions.assertNotNull;
*/
class SalesforceRestServiceIT implements SalesforceConfigAware {
private TestRunner runner;
- private SalesforceRestService testSubject;
+ private SalesforceRestClient testSubject;
@BeforeEach
void setUp() throws Exception {
@@ -48,13 +50,9 @@ class SalesforceRestServiceIT implements
SalesforceConfigAware {
});
StandardOauth2AccessTokenProvider oauth2AccessTokenProvider =
initOAuth2AccessTokenProvider(runner);
-
- testSubject = new SalesforceRestService(
- VERSION,
- BASE_URL,
- () ->
oauth2AccessTokenProvider.getAccessDetails().getAccessToken(),
- 5_000
- );
+ SalesforceConfiguration configuration =
SalesforceConfiguration.create(INSTANCE_URL, VERSION,
+ () ->
oauth2AccessTokenProvider.getAccessDetails().getAccessToken(), 5_000);
+ testSubject = new SalesforceRestClient(configuration);
}
@AfterEach
diff --git
a/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/test/java/org/apache/nifi/processors/salesforce/util/SalesforceToRecordSchemaConverterTest.java
b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/test/java/org/apache/nifi/processors/salesforce/util/SalesforceToRecordSchemaConverterTest.java
index 408bd62394..f971beec4d 100644
---
a/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/test/java/org/apache/nifi/processors/salesforce/util/SalesforceToRecordSchemaConverterTest.java
+++
b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/test/java/org/apache/nifi/processors/salesforce/util/SalesforceToRecordSchemaConverterTest.java
@@ -18,6 +18,8 @@ package org.apache.nifi.processors.salesforce.util;
import com.fasterxml.jackson.databind.exc.MismatchedInputException;
import org.apache.camel.component.salesforce.api.dto.SObjectDescription;
+import org.apache.nifi.processor.exception.ProcessException;
+import
org.apache.nifi.processors.salesforce.schema.SalesforceToRecordSchemaConverter;
import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
@@ -174,9 +176,7 @@ class SalesforceToRecordSchemaConverterTest {
try (final InputStream sfSchema = readFile(TEST_PATH +
"unknown_type_sf_schema.json")) {
final String fieldNames = "FieldWithUnknownType";
final SObjectDescription salesforceObject =
converter.getSalesforceObject(sfSchema);
- final IllegalArgumentException exception =
assertThrows(IllegalArgumentException.class, () ->
converter.convertSchema(salesforceObject, fieldNames));
- final String errorMessage = "Could not determine schema for
'SObjectWithUnknownFieldType'. Could not convert field 'FieldWithUnknownType'
of soap type 'xsd:unknown'.";
- assertEquals(errorMessage, exception.getMessage());
+ assertThrows(ProcessException.class, () ->
converter.convertSchema(salesforceObject, fieldNames));
}
}