This is an automated email from the ASF dual-hosted git repository.

tpalfy pushed a commit to branch support/nifi-1.x
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/support/nifi-1.x by this push:
     new d52e097dec NIFI-11224: Refactor and FF attribute support in WHERE in QuerySalesforceObject.
d52e097dec is described below

commit d52e097decfb2662a34733ba11bddbf5e440d65e
Author: Pierre Villard <[email protected]>
AuthorDate: Mon Feb 27 13:47:06 2023 +0100

    NIFI-11224: Refactor and FF attribute support in WHERE in QuerySalesforceObject.
    
    This closes #7019.
    
    Co-authored-by: Lehel Boér <[email protected]>
    Signed-off-by: Tamas Palfy <[email protected]>
---
 .../processors/salesforce/PutSalesforceObject.java | 102 ++---
 .../salesforce/QuerySalesforceObject.java          | 450 +++++++++++----------
 .../salesforce/rest/SalesforceConfiguration.java   |  54 +++
 .../salesforce/rest/SalesforceRestClient.java      | 112 +++++
 .../salesforce/schema/SalesforceSchemaHolder.java  |  45 +++
 .../schema/SalesforceToRecordSchemaConverter.java  | 148 +++++++
 .../util/CommonSalesforceProperties.java           |   6 +-
 .../salesforce/util/IncrementalContext.java        |  67 +++
 .../salesforce/util/SalesforceQueryBuilder.java    |  66 +++
 .../salesforce/util/SalesforceRestService.java     | 134 ------
 .../util/SalesforceToRecordSchemaConverter.java    | 124 ------
 .../validator/SalesforceAgeValidator.java          |  44 ++
 .../salesforce/PutSalesforceObjectIT.java          |  10 +-
 .../salesforce/QuerySalesforceObjectIT.java        |   6 +-
 .../salesforce/util/SalesforceConfigAware.java     |   4 +-
 .../salesforce/util/SalesforceRestServiceIT.java   |  14 +-
 .../SalesforceToRecordSchemaConverterTest.java     |   6 +-
 17 files changed, 841 insertions(+), 551 deletions(-)

diff --git 
a/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/PutSalesforceObject.java
 
b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/PutSalesforceObject.java
index 83605800f9..210ceafedd 100644
--- 
a/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/PutSalesforceObject.java
+++ 
b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/PutSalesforceObject.java
@@ -35,8 +35,9 @@ import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processors.salesforce.rest.SalesforceConfiguration;
+import org.apache.nifi.processors.salesforce.rest.SalesforceRestClient;
 import org.apache.nifi.processors.salesforce.util.RecordExtender;
-import org.apache.nifi.processors.salesforce.util.SalesforceRestService;
 import org.apache.nifi.schema.access.NopSchemaAccessWriter;
 import org.apache.nifi.schema.access.SchemaNotFoundException;
 import org.apache.nifi.serialization.MalformedRecordException;
@@ -56,9 +57,9 @@ import java.util.List;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
-import static 
org.apache.nifi.processors.salesforce.util.CommonSalesforceProperties.API_URL;
 import static 
org.apache.nifi.processors.salesforce.util.CommonSalesforceProperties.API_VERSION;
 import static 
org.apache.nifi.processors.salesforce.util.CommonSalesforceProperties.READ_TIMEOUT;
+import static 
org.apache.nifi.processors.salesforce.util.CommonSalesforceProperties.SALESFORCE_INSTANCE_URL;
 import static 
org.apache.nifi.processors.salesforce.util.CommonSalesforceProperties.TOKEN_PROVIDER;
 
 @InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@@ -94,7 +95,7 @@ public class PutSalesforceObject extends AbstractProcessor {
             .build();
 
     private static final List<PropertyDescriptor> PROPERTIES = 
Collections.unmodifiableList(Arrays.asList(
-            API_URL,
+            SALESFORCE_INSTANCE_URL,
             API_VERSION,
             READ_TIMEOUT,
             TOKEN_PROVIDER,
@@ -106,7 +107,7 @@ public class PutSalesforceObject extends AbstractProcessor {
             REL_FAILURE
     )));
 
-    private volatile SalesforceRestService salesforceRestService;
+    private volatile SalesforceRestClient salesforceRestClient;
     private volatile int maxRecordCount;
 
     @Override
@@ -122,19 +123,8 @@ public class PutSalesforceObject extends AbstractProcessor 
{
     @OnScheduled
     public void onScheduled(final ProcessContext context) {
         maxRecordCount = getMaxRecordCount();
-
-        String salesforceVersion = context.getProperty(API_VERSION).getValue();
-        String baseUrl = context.getProperty(API_URL).getValue();
-        OAuth2AccessTokenProvider accessTokenProvider =
-                
context.getProperty(TOKEN_PROVIDER).asControllerService(OAuth2AccessTokenProvider.class);
-
-        salesforceRestService = new SalesforceRestService(
-                salesforceVersion,
-                baseUrl,
-                () -> accessTokenProvider.getAccessDetails().getAccessToken(),
-                
context.getProperty(READ_TIMEOUT).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS)
-                        .intValue()
-        );
+        SalesforceConfiguration configuration = 
createSalesforceConfiguration(context);
+        salesforceRestClient = new SalesforceRestClient(configuration);
     }
 
     @Override
@@ -146,69 +136,85 @@ public class PutSalesforceObject extends 
AbstractProcessor {
 
         String objectType = flowFile.getAttribute(ATTR_OBJECT_TYPE);
         if (objectType == null) {
-            getLogger().error("Salesforce object type not found among the 
incoming FlowFile attributes");
-            flowFile = session.putAttribute(flowFile, ATTR_ERROR_MESSAGE, 
"Salesforce object type not found among FlowFile attributes");
-            session.transfer(session.penalize(flowFile), REL_FAILURE);
+            handleInvalidFlowFile(session, flowFile);
             return;
         }
 
+        try {
+            long startNanos = System.nanoTime();
+            processRecords(flowFile, objectType, context, session);
+            session.transfer(flowFile, REL_SUCCESS);
+            long transferMillis = 
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
+            session.getProvenanceReporter().send(flowFile, 
salesforceRestClient.getVersionedBaseUrl() + "/put/" + objectType, 
transferMillis);
+        } catch (MalformedRecordException e) {
+            getLogger().error("Couldn't read records from input", e);
+            transferToFailure(session, flowFile, e.getMessage());
+        } catch (SchemaNotFoundException e) {
+            getLogger().error("Couldn't create record writer", e);
+            transferToFailure(session, flowFile, e.getMessage());
+        } catch (Exception e) {
+            getLogger().error("Failed to put records to Salesforce.", e);
+            transferToFailure(session, flowFile, e.getMessage());
+        }
+    }
+
+    private void processRecords(FlowFile flowFile, String objectType, 
ProcessContext context, ProcessSession session) throws IOException, 
MalformedRecordException, SchemaNotFoundException {
         RecordReaderFactory readerFactory = 
context.getProperty(RECORD_READER_FACTORY).asControllerService(RecordReaderFactory.class);
+        int count = 0;
+        RecordExtender recordExtender;
 
-        RecordExtender extender;
-        long startNanos = System.nanoTime();
-        try {
         try (InputStream in = session.read(flowFile);
              RecordReader reader = readerFactory.createRecordReader(flowFile, 
in, getLogger());
              ByteArrayOutputStream out = new ByteArrayOutputStream();
-             WriteJsonResult writer = getWriter(extender = new 
RecordExtender(reader.getSchema()), out)) {
+             WriteJsonResult writer = getWriter(recordExtender = 
getExtender(reader), out)) {
 
-            int count = 0;
             Record record;
-
             while ((record = reader.nextRecord()) != null) {
                 count++;
                 if (!writer.isActiveRecordSet()) {
                     writer.beginRecordSet();
                 }
 
-                MapRecord extendedRecord = 
extender.getExtendedRecord(objectType, count, record);
+                MapRecord extendedRecord = 
recordExtender.getExtendedRecord(objectType, count, record);
                 writer.write(extendedRecord);
 
                 if (count == maxRecordCount) {
                     count = 0;
-                    processRecords(objectType, out, writer, extender);
+                    postRecordBatch(objectType, out, writer, recordExtender);
                     out.reset();
                 }
             }
             if (writer.isActiveRecordSet()) {
-                processRecords(objectType, out, writer, extender);
+                postRecordBatch(objectType, out, writer, recordExtender);
             }
-          }
-          session.transfer(flowFile, REL_SUCCESS);
-          long transferMillis = 
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
-          session.getProvenanceReporter().send(flowFile, 
salesforceRestService.getVersionedBaseUrl()+ "/composite/tree/" + objectType, 
transferMillis);
-        } catch (MalformedRecordException e) {
-            getLogger().error("Couldn't read records from input", e);
-            transferToFailure(session, flowFile, e);
-        } catch (SchemaNotFoundException e) {
-            getLogger().error("Couldn't create record writer", e);
-            transferToFailure(session, flowFile, e);
-        } catch (Exception e) {
-            getLogger().error("Failed to put records to Salesforce.", e);
-            transferToFailure(session, flowFile, e);
         }
     }
 
-    private void transferToFailure(ProcessSession session, FlowFile flowFile, 
Exception e) {
-        flowFile = session.putAttribute(flowFile, ATTR_ERROR_MESSAGE, 
e.getMessage());
+    private SalesforceConfiguration 
createSalesforceConfiguration(ProcessContext context) {
+        String salesforceVersion = context.getProperty(API_VERSION).getValue();
+        String instanceUrl = 
context.getProperty(SALESFORCE_INSTANCE_URL).getValue();
+        OAuth2AccessTokenProvider accessTokenProvider =
+                
context.getProperty(TOKEN_PROVIDER).asControllerService(OAuth2AccessTokenProvider.class);
+        return SalesforceConfiguration.create(instanceUrl, salesforceVersion,
+                () -> accessTokenProvider.getAccessDetails().getAccessToken(), 
0);
+    }
+
+    private void handleInvalidFlowFile(ProcessSession session, FlowFile 
flowFile) {
+        String errorMessage = "Salesforce object type not found among the 
incoming FlowFile attributes";
+        getLogger().error(errorMessage);
+        transferToFailure(session, flowFile, errorMessage);
+    }
+
+    private void transferToFailure(ProcessSession session, FlowFile flowFile, 
String message) {
+        flowFile = session.putAttribute(flowFile, ATTR_ERROR_MESSAGE, message);
         session.transfer(session.penalize(flowFile), REL_FAILURE);
     }
 
-    private void processRecords(String objectType, ByteArrayOutputStream out, 
WriteJsonResult writer, RecordExtender extender) throws IOException {
+    private void postRecordBatch(String objectType, ByteArrayOutputStream out, 
WriteJsonResult writer, RecordExtender extender) throws IOException {
         writer.finishRecordSet();
         writer.flush();
         ObjectNode wrappedJson = extender.getWrappedRecordsJson(out);
-        salesforceRestService.postRecord(objectType, 
wrappedJson.toPrettyString());
+        salesforceRestClient.postRecord(objectType, 
wrappedJson.toPrettyString());
     }
 
     private WriteJsonResult getWriter(RecordExtender extender, 
ByteArrayOutputStream out) throws IOException {
@@ -217,6 +223,10 @@ public class PutSalesforceObject extends AbstractProcessor 
{
                 true, NullSuppression.NEVER_SUPPRESS, 
OutputGrouping.OUTPUT_ARRAY, null, null, null);
     }
 
+    private RecordExtender getExtender(RecordReader reader) throws 
MalformedRecordException {
+        return new RecordExtender(reader.getSchema());
+    }
+
     int getMaxRecordCount() {
         return MAX_RECORD_COUNT;
     }
diff --git 
a/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/QuerySalesforceObject.java
 
b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/QuerySalesforceObject.java
index 77a82f0b03..5387340511 100644
--- 
a/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/QuerySalesforceObject.java
+++ 
b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/QuerySalesforceObject.java
@@ -16,14 +16,14 @@
  */
 package org.apache.nifi.processors.salesforce;
 
-import org.apache.camel.component.salesforce.api.dto.SObjectDescription;
-import org.apache.camel.component.salesforce.api.dto.SObjectField;
 import com.fasterxml.jackson.core.JsonEncoding;
 import com.fasterxml.jackson.core.JsonFactory;
 import com.fasterxml.jackson.core.JsonGenerator;
 import com.fasterxml.jackson.core.JsonParser;
 import com.fasterxml.jackson.core.JsonToken;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.camel.component.salesforce.api.dto.SObjectDescription;
+import org.apache.camel.component.salesforce.api.dto.SObjectField;
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
 import org.apache.nifi.annotation.behavior.Stateful;
@@ -55,8 +55,13 @@ import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.io.OutputStreamCallback;
 import org.apache.nifi.processor.util.StandardValidators;
-import org.apache.nifi.processors.salesforce.util.SalesforceRestService;
-import 
org.apache.nifi.processors.salesforce.util.SalesforceToRecordSchemaConverter;
+import org.apache.nifi.processors.salesforce.rest.SalesforceConfiguration;
+import org.apache.nifi.processors.salesforce.rest.SalesforceRestClient;
+import org.apache.nifi.processors.salesforce.schema.SalesforceSchemaHolder;
+import 
org.apache.nifi.processors.salesforce.schema.SalesforceToRecordSchemaConverter;
+import org.apache.nifi.processors.salesforce.util.IncrementalContext;
+import org.apache.nifi.processors.salesforce.util.SalesforceQueryBuilder;
+import org.apache.nifi.processors.salesforce.validator.SalesforceAgeValidator;
 import org.apache.nifi.scheduling.SchedulingStrategy;
 import org.apache.nifi.schema.access.SchemaNotFoundException;
 import org.apache.nifi.serialization.MalformedRecordException;
@@ -72,10 +77,8 @@ import org.apache.nifi.util.StringUtils;
 
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.OutputStream;
 import java.io.UncheckedIOException;
-import java.time.OffsetDateTime;
-import java.time.format.DateTimeFormatter;
-import java.time.temporal.ChronoUnit;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -84,16 +87,18 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.BiPredicate;
 import java.util.stream.Collectors;
 
-import static 
org.apache.nifi.processors.salesforce.util.CommonSalesforceProperties.API_URL;
 import static 
org.apache.nifi.processors.salesforce.util.CommonSalesforceProperties.API_VERSION;
 import static 
org.apache.nifi.processors.salesforce.util.CommonSalesforceProperties.READ_TIMEOUT;
+import static 
org.apache.nifi.processors.salesforce.util.CommonSalesforceProperties.SALESFORCE_INSTANCE_URL;
 import static 
org.apache.nifi.processors.salesforce.util.CommonSalesforceProperties.TOKEN_PROVIDER;
 
 @TriggerSerially
@@ -102,18 +107,19 @@ import static 
org.apache.nifi.processors.salesforce.util.CommonSalesforcePropert
 @CapabilityDescription("Retrieves records from a Salesforce sObject. Users can 
add arbitrary filter conditions by setting the 'Custom WHERE Condition' 
property."
         + " The processor can also run a custom query, although record 
processing is not supported in that case."
         + " Supports incremental retrieval: users can define a field in the 
'Age Field' property that will be used to determine when the record was 
created."
-        + " When this property is set the processor will retrieve new records. 
It's also possible to define an initial cutoff value for the age, filtering out 
all older records"
+        + " When this property is set the processor will retrieve new records. 
Incremental loading and record-based processing are only supported in 
property-based queries."
+        + " It's also possible to define an initial cutoff value for the age, 
filtering out all older records"
         + " even for the first run. In case of 'Property Based Query' this 
processor should run on the Primary Node only."
         + " FlowFile attribute 'record.count' indicates how many records were 
retrieved and written to the output."
-        + " By using 'Custom Query', the processor can accept an optional 
input flowfile and reference the flowfile attributes in the query."
-        + " However, incremental loading and record-based processing are not 
supported in this scenario.")
+        + " The processor can accept an optional input FlowFile and reference 
the FlowFile attributes in the query.")
 @Stateful(scopes = Scope.CLUSTER, description = "When 'Age Field' is set, 
after performing a query the time of execution is stored. Subsequent queries 
will be augmented"
         + " with an additional condition so that only records that are newer 
than the stored execution time (adjusted with the optional value of 'Age 
Delay') will be retrieved."
         + " State is stored across the cluster so that this Processor can be 
run on Primary Node only and if a new Primary Node is selected,"
         + " the new node can pick up where the previous node left off, without 
duplicating the data.")
 @WritesAttributes({
         @WritesAttribute(attribute = "mime.type", description = "Sets the 
mime.type attribute to the MIME Type specified by the Record Writer."),
-        @WritesAttribute(attribute = "record.count", description = "Sets the 
number of records in the FlowFile.")
+        @WritesAttribute(attribute = "record.count", description = "Sets the 
number of records in the FlowFile."),
+        @WritesAttribute(attribute = "total.record.count", description = "Sets 
the total number of records in the FlowFile.")
 })
 @DefaultSchedule(strategy = SchedulingStrategy.TIMER_DRIVEN, period = "1 min")
 @SeeAlso(PutSalesforceObject.class)
@@ -181,7 +187,7 @@ public class QuerySalesforceObject extends 
AbstractProcessor {
             .dependsOn(QUERY_TYPE, PROPERTY_BASED_QUERY)
             .build();
 
-    static final PropertyDescriptor AGE_FIELD = new 
PropertyDescriptor.Builder()
+    public static final PropertyDescriptor AGE_FIELD = new 
PropertyDescriptor.Builder()
             .name("age-field")
             .displayName("Age Field")
             .description("The name of a TIMESTAMP field that will be used to 
filter records using a bounded time window."
@@ -193,7 +199,7 @@ public class QuerySalesforceObject extends 
AbstractProcessor {
             .dependsOn(QUERY_TYPE, PROPERTY_BASED_QUERY)
             .build();
 
-    static final PropertyDescriptor AGE_DELAY = new 
PropertyDescriptor.Builder()
+    public static final PropertyDescriptor AGE_DELAY = new 
PropertyDescriptor.Builder()
             .name("age-delay")
             .displayName("Age Delay")
             .description("The ending timestamp of the time window will be 
adjusted earlier by the amount configured in this property." +
@@ -205,7 +211,7 @@ public class QuerySalesforceObject extends 
AbstractProcessor {
             .dependsOn(QUERY_TYPE, PROPERTY_BASED_QUERY)
             .build();
 
-    static final PropertyDescriptor INITIAL_AGE_FILTER = new 
PropertyDescriptor.Builder()
+    public static final PropertyDescriptor INITIAL_AGE_FILTER = new 
PropertyDescriptor.Builder()
             .name("initial-age-filter")
             .displayName("Initial Age Start Time")
             .description("This property specifies the start time that the 
processor applies when running the first query.")
@@ -243,7 +249,7 @@ public class QuerySalesforceObject extends 
AbstractProcessor {
             .autoTerminateDefault(true)
             .build();
 
-    private static final String LAST_AGE_FILTER = "last_age_filter";
+    public static final String LAST_AGE_FILTER = "last_age_filter";
     private static final String STARTING_FIELD_NAME = "records";
     private static final String DATE_FORMAT = "yyyy-MM-dd";
     private static final String TIME_FORMAT = "HH:mm:ss.SSSX";
@@ -254,13 +260,20 @@ public class QuerySalesforceObject extends 
AbstractProcessor {
     private static final BiPredicate<String, String> CAPTURE_PREDICATE = 
(fieldName, fieldValue) -> NEXT_RECORDS_URL.equals(fieldName);
     private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
     private static final JsonFactory JSON_FACTORY = OBJECT_MAPPER.getFactory();
-    private static final String TOTAL_RECORD_COUNT = "total.record.count";
+    private static final String TOTAL_RECORD_COUNT_ATTRIBUTE = 
"total.record.count";
+    private static final int MAX_RECORD_COUNT = 2000;
 
     private volatile SalesforceToRecordSchemaConverter 
salesForceToRecordSchemaConverter;
-    private volatile SalesforceRestService salesforceRestService;
+    private volatile SalesforceRestClient salesforceRestService;
+    private volatile boolean resetState = false;
 
     @OnScheduled
     public void onScheduled(ProcessContext context) {
+        if (resetState) {
+            clearState(context);
+            resetState = false;
+        }
+
         salesForceToRecordSchemaConverter = new 
SalesforceToRecordSchemaConverter(
                 DATE_FORMAT,
                 DATE_TIME_FORMAT,
@@ -268,19 +281,21 @@ public class QuerySalesforceObject extends 
AbstractProcessor {
         );
 
         String salesforceVersion = context.getProperty(API_VERSION).getValue();
-        String baseUrl = context.getProperty(API_URL).getValue();
+        String instanceUrl = 
context.getProperty(SALESFORCE_INSTANCE_URL).getValue();
         OAuth2AccessTokenProvider accessTokenProvider = 
context.getProperty(TOKEN_PROVIDER).asControllerService(OAuth2AccessTokenProvider.class);
 
-        salesforceRestService = new SalesforceRestService(
+        SalesforceConfiguration salesforceConfiguration = 
SalesforceConfiguration.create(
+                instanceUrl,
                 salesforceVersion,
-                baseUrl,
                 () -> accessTokenProvider.getAccessDetails().getAccessToken(),
                 
context.getProperty(READ_TIMEOUT).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS).intValue()
         );
+
+        salesforceRestService = new 
SalesforceRestClient(salesforceConfiguration);
     }
 
     private static final List<PropertyDescriptor> PROPERTIES = 
Collections.unmodifiableList(Arrays.asList(
-            API_URL,
+            SALESFORCE_INSTANCE_URL,
             API_VERSION,
             QUERY_TYPE,
             CUSTOM_SOQL_QUERY,
@@ -313,69 +328,46 @@ public class QuerySalesforceObject extends 
AbstractProcessor {
     @Override
     protected Collection<ValidationResult> customValidate(ValidationContext 
validationContext) {
         List<ValidationResult> results = new 
ArrayList<>(super.customValidate(validationContext));
-        if (validationContext.getProperty(INITIAL_AGE_FILTER).isSet() && 
!validationContext.getProperty(AGE_FIELD).isSet()) {
-            results.add(
-                    new ValidationResult.Builder()
-                            .subject(INITIAL_AGE_FILTER.getDisplayName())
-                            .valid(false)
-                            .explanation("it requires " + 
AGE_FIELD.getDisplayName() + " also to be set.")
-                            .build()
-            );
+        return SalesforceAgeValidator.validate(validationContext, results);
+    }
+
+    @Override
+    public void onPropertyModified(PropertyDescriptor descriptor, String 
oldValue, String newValue) {
+        if ((oldValue != null && !oldValue.equals(newValue))
+                && (descriptor.equals(SALESFORCE_INSTANCE_URL)
+                || descriptor.equals(QUERY_TYPE)
+                || descriptor.equals(SOBJECT_NAME)
+                || descriptor.equals(AGE_FIELD)
+                || descriptor.equals(INITIAL_AGE_FILTER)
+                || descriptor.equals(CUSTOM_WHERE_CONDITION))
+        ) {
+            getLogger().debug("A property that require resetting state was 
modified - {} oldValue {} newValue {}",
+                    descriptor.getDisplayName(), oldValue, newValue);
+            resetState = true;
         }
-        return results;
     }
 
     @Override
     public void onTrigger(ProcessContext context, ProcessSession session) 
throws ProcessException {
         boolean isCustomQuery = 
CUSTOM_QUERY.getValue().equals(context.getProperty(QUERY_TYPE).getValue());
-
-
+        FlowFile flowFile = session.get();
         if (isCustomQuery) {
-            FlowFile flowFile = session.get();
-            if (flowFile == null && context.hasIncomingConnection()) {
-                context.yield();
-                return;
-            }
             processCustomQuery(context, session, flowFile);
-            return;
+        } else {
+            processQuery(context, session, flowFile);
         }
-        processQuery(context, session);
     }
 
-    private void processQuery(ProcessContext context, ProcessSession session) {
+    private void processQuery(ProcessContext context, ProcessSession session, 
FlowFile originalFlowFile) {
         AtomicReference<String> nextRecordsUrl = new AtomicReference<>();
         String sObject = context.getProperty(SOBJECT_NAME).getValue();
         String fields = context.getProperty(FIELD_NAMES).getValue();
-        String customWhereClause = 
context.getProperty(CUSTOM_WHERE_CONDITION).getValue();
+        String customWhereClause = 
context.getProperty(CUSTOM_WHERE_CONDITION).evaluateAttributeExpressions(originalFlowFile).getValue();
         RecordSetWriterFactory writerFactory = 
context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
         boolean createZeroRecordFlowFiles = 
context.getProperty(CREATE_ZERO_RECORD_FILES).asBoolean();
 
-        String ageField = context.getProperty(AGE_FIELD).getValue();
-        String initialAgeFilter = 
context.getProperty(INITIAL_AGE_FILTER).getValue();
-        Long ageDelayMs = 
context.getProperty(AGE_DELAY).asTimePeriod(TimeUnit.MILLISECONDS);
-
-        String ageFilterLower;
-        StateMap state;
-        try {
-            state = context.getStateManager().getState(Scope.CLUSTER);
-            ageFilterLower = state.get(LAST_AGE_FILTER);
-        } catch (IOException e) {
-            throw new ProcessException("Last Age Filter state retrieval 
failed", e);
-        }
-
-        String ageFilterUpper;
-        if (ageField == null) {
-            ageFilterUpper = null;
-        } else {
-            OffsetDateTime ageFilterUpperTime;
-            if (ageDelayMs == null) {
-                ageFilterUpperTime = OffsetDateTime.now();
-            } else {
-                ageFilterUpperTime = OffsetDateTime.now().minus(ageDelayMs, 
ChronoUnit.MILLIS);
-            }
-            ageFilterUpper = 
ageFilterUpperTime.format(DateTimeFormatter.ISO_OFFSET_DATE_TIME);
-        }
-
+        StateMap state = getState(session);
+        IncrementalContext incrementalContext = new 
IncrementalContext(context, state);
         SalesforceSchemaHolder salesForceSchemaHolder = 
getConvertedSalesforceSchema(sObject, fields);
 
         if (StringUtils.isBlank(fields)) {
@@ -385,144 +377,181 @@ public class QuerySalesforceObject extends AbstractProcessor {
                     .collect(Collectors.joining(","));
         }
 
-        String querySObject = buildQuery(
-                sObject,
-                fields,
-                customWhereClause,
-                ageField,
-                initialAgeFilter,
-                ageFilterLower,
-                ageFilterUpper
-        );
+        String querySObject = new SalesforceQueryBuilder(incrementalContext)
+                .buildQuery(sObject, fields, customWhereClause);
+
+        AtomicBoolean isOriginalTransferred = new AtomicBoolean(false);
+        List<FlowFile> outgoingFlowFiles = new ArrayList<>();
+        Map<String, String> originalAttributes = 
Optional.ofNullable(originalFlowFile)
+                .map(FlowFile::getAttributes)
+                .orElseGet(HashMap::new);
+
+        long startNanos = System.nanoTime();
 
         do {
-            FlowFile flowFile = session.create();
-            Map<String, String> originalAttributes = flowFile.getAttributes();
-            Map<String, String> attributes = new HashMap<>();
+            FlowFile outgoingFlowFile = createOutgoingFlowFile(session, 
originalFlowFile);
+            outgoingFlowFiles.add(outgoingFlowFile);
+            Map<String, String> attributes = new HashMap<>(originalAttributes);
 
             AtomicInteger recordCountHolder = new AtomicInteger();
-            long startNanos = System.nanoTime();
-            flowFile = session.write(flowFile, out -> {
-                try (
-                        InputStream querySObjectResultInputStream = 
getResultInputStream(nextRecordsUrl.get(), querySObject);
-
-                        JsonTreeRowRecordReader jsonReader = new 
JsonTreeRowRecordReader(
-                                querySObjectResultInputStream,
-                                getLogger(),
-                                salesForceSchemaHolder.recordSchema,
-                                DATE_FORMAT,
-                                TIME_FORMAT,
-                                DATE_TIME_FORMAT,
-                                StartingFieldStrategy.NESTED_FIELD,
-                                STARTING_FIELD_NAME,
-                                SchemaApplicationStrategy.SELECTED_PART,
-                                CAPTURE_PREDICATE
-                        );
-
-                        RecordSetWriter writer = writerFactory.createWriter(
-                                getLogger(),
-                                writerFactory.getSchema(
-                                        originalAttributes,
-                                        salesForceSchemaHolder.recordSchema
-                                ),
-                                out,
-                                originalAttributes
-                        )
-                ) {
-                    writer.beginRecordSet();
-
-                    Record querySObjectRecord;
-                    while ((querySObjectRecord = jsonReader.nextRecord()) != 
null) {
-                        writer.write(querySObjectRecord);
-                    }
-
-                    WriteResult writeResult = writer.finishRecordSet();
+            try {
+                outgoingFlowFile = session.write(outgoingFlowFile, 
processRecordsCallback(session, nextRecordsUrl, writerFactory, state, 
incrementalContext,
+                        salesForceSchemaHolder, querySObject, 
originalAttributes, attributes, recordCountHolder));
+                int recordCount = recordCountHolder.get();
 
-                    Map<String, String> capturedFields = 
jsonReader.getCapturedFields();
+                if (createZeroRecordFlowFiles || recordCount != 0) {
+                    outgoingFlowFile = 
session.putAllAttributes(outgoingFlowFile, attributes);
 
-                    
nextRecordsUrl.set(capturedFields.getOrDefault(NEXT_RECORDS_URL, null));
+                    session.adjustCounter("Records Processed", recordCount, 
false);
+                    getLogger().info("Successfully written {} records for {}", 
recordCount, outgoingFlowFile);
+                } else {
+                    outgoingFlowFiles.remove(outgoingFlowFile);
+                    session.remove(outgoingFlowFile);
+                }
+            } catch (Exception e) {
+                if (e.getCause() instanceof IOException) {
+                    throw new ProcessException("Couldn't get Salesforce 
records", e);
+                } else if (e.getCause() instanceof SchemaNotFoundException) {
+                    handleError(session, originalFlowFile, 
isOriginalTransferred, outgoingFlowFiles, e, "Couldn't create record writer");
+                } else if (e.getCause() instanceof MalformedRecordException) {
+                    handleError(session, originalFlowFile, 
isOriginalTransferred, outgoingFlowFiles, e, "Couldn't read records from 
input");
+                } else {
+                    handleError(session, originalFlowFile, 
isOriginalTransferred, outgoingFlowFiles, e, "Couldn't get Salesforce records");
+                }
+                break;
+            }
+        } while (nextRecordsUrl.get() != null);
 
-                    attributes.put("record.count", 
String.valueOf(writeResult.getRecordCount()));
-                    attributes.put(CoreAttributes.MIME_TYPE.key(), 
writer.getMimeType());
-                    attributes.putAll(writeResult.getAttributes());
+        transferFlowFiles(session, outgoingFlowFiles, originalFlowFile, 
isOriginalTransferred, startNanos, sObject);
+    }
 
-                    recordCountHolder.set(writeResult.getRecordCount());
+    private OutputStreamCallback processRecordsCallback(ProcessSession 
session, AtomicReference<String> nextRecordsUrl, RecordSetWriterFactory 
writerFactory,
+                                                        StateMap state, 
IncrementalContext incrementalContext, SalesforceSchemaHolder 
salesForceSchemaHolder,
+                                                        String querySObject, 
Map<String, String> originalAttributes, Map<String, String> attributes,
+                                                        AtomicInteger 
recordCountHolder) {
+        return out -> {
+            try {
+                handleRecordSet(out, nextRecordsUrl, querySObject, 
writerFactory, salesForceSchemaHolder, originalAttributes, attributes, 
recordCountHolder);
 
-                    if (ageFilterUpper != null) {
-                        Map<String, String> newState = new 
HashMap<>(state.toMap());
-                        newState.put(LAST_AGE_FILTER, ageFilterUpper);
-                        updateState(context, newState);
-                    }
-                } catch (SchemaNotFoundException e) {
-                    throw new ProcessException("Couldn't create record 
writer", e);
-                } catch (MalformedRecordException e) {
-                    throw new ProcessException("Couldn't read records from 
input", e);
+                if (incrementalContext.getAgeFilterUpper() != null) {
+                    Map<String, String> newState = new 
HashMap<>(state.toMap());
+                    newState.put(LAST_AGE_FILTER, 
incrementalContext.getAgeFilterUpper());
+                    updateState(session, newState);
                 }
-            });
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+        };
+    }
 
-            int recordCount = recordCountHolder.get();
+    private void handleRecordSet(OutputStream out, AtomicReference<String> 
nextRecordsUrl, String querySObject, RecordSetWriterFactory writerFactory,
+                                 SalesforceSchemaHolder 
salesForceSchemaHolder, Map<String, String> originalAttributes, Map<String, 
String> attributes,
+                                 AtomicInteger recordCountHolder) throws 
Exception {
+        try (
+                InputStream querySObjectResultInputStream = 
getResultInputStream(nextRecordsUrl.get(), querySObject);
+                JsonTreeRowRecordReader jsonReader = 
createJsonReader(querySObjectResultInputStream, 
salesForceSchemaHolder.getRecordSchema());
+                RecordSetWriter writer = createRecordSetWriter(writerFactory, 
originalAttributes, out, salesForceSchemaHolder.getRecordSchema())
+        ) {
+            writer.beginRecordSet();
+
+            Record querySObjectRecord;
+            while ((querySObjectRecord = jsonReader.nextRecord()) != null) {
+                writer.write(querySObjectRecord);
+            }
 
-            if (!createZeroRecordFlowFiles && recordCount == 0) {
-                session.remove(flowFile);
-            } else {
-                flowFile = session.putAllAttributes(flowFile, attributes);
-                session.transfer(flowFile, REL_SUCCESS);
+            WriteResult writeResult = writer.finishRecordSet();
 
-                long transferMillis = 
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
-                session.getProvenanceReporter().receive(flowFile, 
salesforceRestService.getVersionedBaseUrl() + "/composite/tree/" + sObject,
-                        transferMillis);
+            Map<String, String> capturedFields = 
jsonReader.getCapturedFields();
+            nextRecordsUrl.set(capturedFields.getOrDefault(NEXT_RECORDS_URL, 
null));
 
-                session.adjustCounter("Records Processed", recordCount, false);
-                getLogger().info("Successfully written {} records for {}", 
recordCount, flowFile);
-            }
-        } while (nextRecordsUrl.get() != null);
+            attributes.put("record.count", 
String.valueOf(writeResult.getRecordCount()));
+            attributes.put(CoreAttributes.MIME_TYPE.key(), 
writer.getMimeType());
+            attributes.putAll(writeResult.getAttributes());
+            recordCountHolder.set(writeResult.getRecordCount());
+        }
+    }
+
+    private JsonTreeRowRecordReader createJsonReader(InputStream 
querySObjectResultInputStream, RecordSchema recordSchema) throws IOException, 
MalformedRecordException {
+        return new JsonTreeRowRecordReader(
+                querySObjectResultInputStream,
+                getLogger(),
+                recordSchema,
+                DATE_FORMAT,
+                TIME_FORMAT,
+                DATE_TIME_FORMAT,
+                StartingFieldStrategy.NESTED_FIELD,
+                STARTING_FIELD_NAME,
+                SchemaApplicationStrategy.SELECTED_PART,
+                CAPTURE_PREDICATE
+        );
+    }
+
+    private RecordSetWriter createRecordSetWriter(RecordSetWriterFactory 
writerFactory, Map<String, String> originalAttributes, OutputStream out,
+                                                  RecordSchema recordSchema) 
throws IOException, SchemaNotFoundException {
+        return writerFactory.createWriter(
+                getLogger(),
+                writerFactory.getSchema(
+                        originalAttributes,
+                        recordSchema
+                ),
+                out,
+                originalAttributes
+        );
     }
 
     private void processCustomQuery(ProcessContext context, ProcessSession 
session, FlowFile originalFlowFile) {
         String customQuery = 
context.getProperty(CUSTOM_SOQL_QUERY).evaluateAttributeExpressions(originalFlowFile).getValue();
         AtomicReference<String> nextRecordsUrl = new AtomicReference<>();
         AtomicReference<String> totalSize = new AtomicReference<>();
-        boolean isOriginalTransferred = false;
+        AtomicBoolean isOriginalTransferred = new AtomicBoolean(false);
         List<FlowFile> outgoingFlowFiles = new ArrayList<>();
+        long startNanos = System.nanoTime();
         do {
-            FlowFile outgoingFlowFile;
             try (InputStream response = 
getResultInputStream(nextRecordsUrl.get(), customQuery)) {
-                if (originalFlowFile != null) {
-                    outgoingFlowFile = session.create(originalFlowFile);
-                } else {
-                    outgoingFlowFile = session.create();
-                }
+                FlowFile outgoingFlowFile = createOutgoingFlowFile(session, 
originalFlowFile);
                 outgoingFlowFiles.add(outgoingFlowFile);
-                outgoingFlowFile = session.write(outgoingFlowFile, 
parseHttpResponse(response, nextRecordsUrl, totalSize));
-                int recordCount = nextRecordsUrl.get() != null ? 2000 : 
Integer.parseInt(totalSize.get()) % 2000;
+                outgoingFlowFile = session.write(outgoingFlowFile, 
parseCustomQueryResponse(response, nextRecordsUrl, totalSize));
+                int recordCount = nextRecordsUrl.get() != null ? 
MAX_RECORD_COUNT : Integer.parseInt(totalSize.get()) % MAX_RECORD_COUNT;
                 Map<String, String> attributes = new HashMap<>();
                 attributes.put(CoreAttributes.MIME_TYPE.key(), 
"application/json");
-                attributes.put(TOTAL_RECORD_COUNT, 
String.valueOf(recordCount));
+                attributes.put(TOTAL_RECORD_COUNT_ATTRIBUTE, 
String.valueOf(recordCount));
                 session.adjustCounter("Salesforce records processed", 
recordCount, false);
-                session.putAllAttributes(outgoingFlowFile, attributes);
+                outgoingFlowFile = session.putAllAttributes(outgoingFlowFile, 
attributes);
             } catch (IOException e) {
                 throw new ProcessException("Couldn't get Salesforce records", 
e);
             } catch (Exception e) {
-                if (originalFlowFile != null) {
-                    session.transfer(originalFlowFile, REL_FAILURE);
-                    isOriginalTransferred = true;
-                }
-                getLogger().error("Couldn't get Salesforce records", e);
-                session.remove(outgoingFlowFiles);
-                outgoingFlowFiles.clear();
+                handleError(session, originalFlowFile, isOriginalTransferred, 
outgoingFlowFiles, e, "Couldn't get Salesforce records");
                 break;
             }
         } while (nextRecordsUrl.get() != null);
 
+        transferFlowFiles(session, outgoingFlowFiles, originalFlowFile, 
isOriginalTransferred, startNanos, "custom");
+    }
+
+    private void transferFlowFiles(ProcessSession session, List<FlowFile> 
outgoingFlowFiles, FlowFile originalFlowFile, AtomicBoolean 
isOriginalTransferred,
+                                   long startNanos, String urlDetail) {
         if (!outgoingFlowFiles.isEmpty()) {
             session.transfer(outgoingFlowFiles, REL_SUCCESS);
+            long transferMillis = 
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
+
+            outgoingFlowFiles.forEach(ff ->
+                    session.getProvenanceReporter().receive(ff, 
salesforceRestService.getVersionedBaseUrl() + "/" + urlDetail, transferMillis)
+            );
         }
-        if (originalFlowFile != null && !isOriginalTransferred) {
+        if (originalFlowFile != null && !isOriginalTransferred.get()) {
             session.transfer(originalFlowFile, REL_ORIGINAL);
         }
     }
 
-    private OutputStreamCallback parseHttpResponse(InputStream in, 
AtomicReference<String> nextRecordsUrl, AtomicReference<String> totalSize) {
+    private FlowFile createOutgoingFlowFile(ProcessSession session, FlowFile 
originalFlowFile) {
+        if (originalFlowFile != null) {
+            return session.create(originalFlowFile);
+        } else {
+            return session.create();
+        }
+    }
+
+    private OutputStreamCallback parseCustomQueryResponse(InputStream in, 
AtomicReference<String> nextRecordsUrl, AtomicReference<String> totalSize) {
         nextRecordsUrl.set(null);
         return out -> {
             try (JsonParser jsonParser = JSON_FACTORY.createParser(in);
@@ -560,14 +589,44 @@ public class QuerySalesforceObject extends AbstractProcessor {
         }
     }
 
-    private void updateState(ProcessContext context, Map<String, String> 
newState) {
+    private void handleError(ProcessSession session, FlowFile 
originalFlowFile, AtomicBoolean isOriginalTransferred, List<FlowFile> 
outgoingFlowFiles,
+                             Exception e, String errorMessage) {
+        if (originalFlowFile != null) {
+            session.transfer(originalFlowFile, REL_FAILURE);
+            isOriginalTransferred.set(true);
+        }
+        getLogger().error(errorMessage, e);
+        session.remove(outgoingFlowFiles);
+        outgoingFlowFiles.clear();
+    }
+
+    private StateMap getState(ProcessSession session) {
+        StateMap state;
         try {
-            context.getStateManager().setState(newState, Scope.CLUSTER);
+            state = session.getState(Scope.CLUSTER);
+        } catch (IOException e) {
+            throw new ProcessException("State retrieval failed", e);
+        }
+        return state;
+    }
+
+    private void updateState(ProcessSession session, Map<String, String> 
newState) {
+        try {
+            session.setState(newState, Scope.CLUSTER);
         } catch (IOException e) {
             throw new ProcessException("Last Age Filter state update failed", 
e);
         }
     }
 
+    private void clearState(ProcessContext context) {
+        try {
+            getLogger().debug("Clearing state based on property 
modifications");
+            context.getStateManager().clear(Scope.CLUSTER);
+        } catch (final IOException e) {
+            getLogger().warn("Failed to clear state", e);
+        }
+    }
+
     protected SalesforceSchemaHolder convertSchema(InputStream 
describeSObjectResult, String fieldsOfInterest) {
         try {
             SObjectDescription salesforceObject = 
salesForceToRecordSchemaConverter.getSalesforceObject(describeSObjectResult);
@@ -575,9 +634,7 @@ public class QuerySalesforceObject extends AbstractProcessor {
 
             RecordSchema querySObjectResultSchema = new 
SimpleRecordSchema(Collections.singletonList(
                     new RecordField(STARTING_FIELD_NAME, 
RecordFieldType.ARRAY.getArrayDataType(
-                            RecordFieldType.RECORD.getRecordDataType(
-                                    recordSchema
-                            )
+                            
RecordFieldType.RECORD.getRecordDataType(recordSchema)
                     ))
             ));
 
@@ -586,57 +643,4 @@ public class QuerySalesforceObject extends AbstractProcessor {
             throw new ProcessException("SObject to Record schema conversion 
failed", e);
         }
     }
-
-    protected String buildQuery(
-            String sObject,
-            String fields,
-            String customWhereClause,
-            String ageField,
-            String initialAgeFilter,
-            String ageFilterLower,
-            String ageFilterUpper
-    ) {
-        StringBuilder queryBuilder = new StringBuilder("SELECT ")
-                .append(fields)
-                .append(" FROM ")
-                .append(sObject);
-
-        List<String> whereItems = new ArrayList<>();
-        if (customWhereClause != null) {
-            whereItems.add("( " + customWhereClause + " )");
-        }
-
-        if (ageField != null) {
-            if (ageFilterLower != null) {
-                whereItems.add(ageField + " >= " + ageFilterLower);
-            } else if (initialAgeFilter != null) {
-                whereItems.add(ageField + " >= " + initialAgeFilter);
-            }
-
-            whereItems.add(ageField + " < " + ageFilterUpper);
-        }
-
-        if (!whereItems.isEmpty()) {
-            String finalWhereClause = String.join(" AND ", whereItems);
-            queryBuilder.append(" WHERE ").append(finalWhereClause);
-        }
-
-        return queryBuilder.toString();
-    }
-
-    static class SalesforceSchemaHolder {
-        RecordSchema querySObjectResultSchema;
-        RecordSchema recordSchema;
-        SObjectDescription salesforceObject;
-
-        public SalesforceSchemaHolder(RecordSchema querySObjectResultSchema, 
RecordSchema recordSchema, SObjectDescription salesforceObject) {
-            this.querySObjectResultSchema = querySObjectResultSchema;
-            this.recordSchema = recordSchema;
-            this.salesforceObject = salesforceObject;
-        }
-
-        public SObjectDescription getSalesforceObject() {
-            return salesforceObject;
-        }
-    }
 }
diff --git a/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/rest/SalesforceConfiguration.java b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/rest/SalesforceConfiguration.java
new file mode 100644
index 0000000000..d974efe93a
--- /dev/null
+++ b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/rest/SalesforceConfiguration.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.salesforce.rest;
+
+import java.util.function.Supplier;
+
+public final class SalesforceConfiguration {
+
+    private final String instanceUrl;
+    private final String version;
+    private final Supplier<String> accessTokenProvider;
+    private final int responseTimeoutMillis;
+
+    private SalesforceConfiguration(String instanceUrl, String version, 
Supplier<String> accessTokenProvider, int responseTimeoutMillis) {
+        this.instanceUrl = instanceUrl;
+        this.version = version;
+        this.accessTokenProvider = accessTokenProvider;
+        this.responseTimeoutMillis = responseTimeoutMillis;
+    }
+
+    public static SalesforceConfiguration create(String instanceUrl, String 
version, Supplier<String> accessTokenProvider, int responseTimeoutMillis) {
+        return new SalesforceConfiguration(instanceUrl, version, 
accessTokenProvider, responseTimeoutMillis);
+    }
+
+    public String getInstanceUrl() {
+        return instanceUrl;
+    }
+
+    public String getVersion() {
+        return version;
+    }
+
+    public Supplier<String> getAccessTokenProvider() {
+        return accessTokenProvider;
+    }
+
+    public int getResponseTimeoutMillis() {
+        return responseTimeoutMillis;
+    }
+}
diff --git a/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/rest/SalesforceRestClient.java b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/rest/SalesforceRestClient.java
new file mode 100644
index 0000000000..26cb6c420a
--- /dev/null
+++ b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/rest/SalesforceRestClient.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.salesforce.rest;
+
+import okhttp3.HttpUrl;
+import okhttp3.MediaType;
+import okhttp3.OkHttpClient;
+import okhttp3.Request;
+import okhttp3.RequestBody;
+import okhttp3.Response;
+import org.apache.nifi.processor.exception.ProcessException;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.UncheckedIOException;
+import java.util.concurrent.TimeUnit;
+
+public class SalesforceRestClient {
+
+    private static final MediaType JSON_MEDIA_TYPE = 
MediaType.parse("application/json; charset=utf-8");
+
+    private final SalesforceConfiguration configuration;
+    private final OkHttpClient httpClient;
+
+    public SalesforceRestClient(SalesforceConfiguration configuration) {
+        this.configuration = configuration;
+        httpClient = new OkHttpClient.Builder()
+                .readTimeout(configuration.getResponseTimeoutMillis(), 
TimeUnit.MILLISECONDS)
+                .build();
+    }
+
+    public InputStream describeSObject(String sObject) {
+        String url = getUrl("/sobjects/" + sObject + "/describe?maxRecords=1");
+        Request request = buildGetRequest(url);
+        return executeRequest(request);
+    }
+
+    public InputStream query(String query) {
+        HttpUrl httpUrl = HttpUrl.get(getUrl("/query")).newBuilder()
+                .addQueryParameter("q", query)
+                .build();
+        Request request = buildGetRequest(httpUrl.toString());
+        return executeRequest(request);
+    }
+
+    public InputStream getNextRecords(String nextRecordsUrl) {
+        HttpUrl httpUrl = HttpUrl.get(configuration.getInstanceUrl() + 
nextRecordsUrl).newBuilder().build();
+        Request request = buildGetRequest(httpUrl.toString());
+        return executeRequest(request);
+    }
+
+    public void postRecord(String sObjectApiName, String body) {
+        HttpUrl httpUrl = HttpUrl.get(getUrl("/composite/tree/" + 
sObjectApiName)).newBuilder().build();
+        RequestBody requestBody = RequestBody.create(body, JSON_MEDIA_TYPE);
+        Request request = buildPostRequest(httpUrl.toString(), requestBody);
+        executeRequest(request);
+    }
+
+    private InputStream executeRequest(Request request) {
+        Response response = null;
+        try {
+            response = httpClient.newCall(request).execute();
+            if (!response.isSuccessful()) {
+                throw new ProcessException(String.format("Invalid response 
[%s]: %s", response.code(), response.body() == null ? null : 
response.body().string()));
+            }
+            return response.body().byteStream();
+        } catch (IOException e) {
+            if (response != null) {
+                response.close();
+            }
+            throw new UncheckedIOException(String.format("Salesforce HTTP 
request failed [%s]", request.url()), e);
+        }
+    }
+
+    private String getUrl(String path) {
+        return getVersionedBaseUrl() + path;
+    }
+
+    public String getVersionedBaseUrl() {
+        return configuration.getInstanceUrl() + "/services/data/v" + 
configuration.getVersion();
+    }
+
+    private Request buildGetRequest(String url) {
+        return new Request.Builder()
+                .addHeader("Authorization", "Bearer " + 
configuration.getAccessTokenProvider().get())
+                .url(url)
+                .get()
+                .build();
+    }
+
+    private Request buildPostRequest(String url, RequestBody requestBody) {
+        return new Request.Builder()
+                .addHeader("Authorization", "Bearer " + 
configuration.getAccessTokenProvider().get())
+                .url(url)
+                .post(requestBody)
+                .build();
+    }
+}
diff --git a/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/schema/SalesforceSchemaHolder.java b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/schema/SalesforceSchemaHolder.java
new file mode 100644
index 0000000000..5d466a62dd
--- /dev/null
+++ b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/schema/SalesforceSchemaHolder.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.salesforce.schema;
+
+import org.apache.camel.component.salesforce.api.dto.SObjectDescription;
+import org.apache.nifi.serialization.record.RecordSchema;
+
+public class SalesforceSchemaHolder {
+
+    RecordSchema querySObjectResultSchema;
+    RecordSchema recordSchema;
+    SObjectDescription salesforceObject;
+
+    public SalesforceSchemaHolder(RecordSchema querySObjectResultSchema, 
RecordSchema recordSchema, SObjectDescription salesforceObject) {
+        this.querySObjectResultSchema = querySObjectResultSchema;
+        this.recordSchema = recordSchema;
+        this.salesforceObject = salesforceObject;
+    }
+
+    public SObjectDescription getSalesforceObject() {
+        return salesforceObject;
+    }
+
+    public RecordSchema getQuerySObjectResultSchema() {
+        return querySObjectResultSchema;
+    }
+
+    public RecordSchema getRecordSchema() {
+        return recordSchema;
+    }
+}
diff --git a/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/schema/SalesforceToRecordSchemaConverter.java b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/schema/SalesforceToRecordSchemaConverter.java
new file mode 100644
index 0000000000..3e0d82d147
--- /dev/null
+++ b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/schema/SalesforceToRecordSchemaConverter.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.salesforce.schema;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.camel.component.salesforce.api.dto.SObjectDescription;
+import org.apache.camel.component.salesforce.api.dto.SObjectField;
+import org.apache.camel.component.salesforce.api.utils.JsonUtils;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.util.StringUtils;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Converts a Salesforce object description into a NiFi {@link RecordSchema}.
+ * <p>
+ * The date, date-time and time formats supplied at construction are attached to the
+ * DATE, TIMESTAMP and TIME data types of the generated record fields.
+ */
+public class SalesforceToRecordSchemaConverter {
+    private static final ObjectMapper OBJECT_MAPPER = JsonUtils.createObjectMapper();
+    private final String dateFormat;
+    private final String dateTimeFormat;
+    private final String timeFormat;
+
+    /**
+     * @param dateFormat     format attached to fields of soap type {@code date}
+     * @param dateTimeFormat format attached to fields of soap type {@code dateTime}
+     * @param timeFormat     format attached to fields of soap type {@code time}
+     */
+    public SalesforceToRecordSchemaConverter(String dateFormat, String dateTimeFormat, String timeFormat) {
+        this.dateFormat = dateFormat;
+        this.dateTimeFormat = dateTimeFormat;
+        this.timeFormat = timeFormat;
+    }
+
+    /**
+     * Deserializes a JSON object description into an {@link SObjectDescription}.
+     *
+     * @param salesforceObjectResultJsonString stream holding the JSON document; it is handed to the
+     *                                         object mapper as-is
+     * @return the parsed description
+     * @throws IOException if the stream cannot be read or parsed
+     */
+    public SObjectDescription getSalesforceObject(InputStream salesforceObjectResultJsonString) throws IOException {
+        return OBJECT_MAPPER.readValue(salesforceObjectResultJsonString, SObjectDescription.class);
+    }
+
+    /**
+     * Builds a record schema from the fields of the given Salesforce object.
+     *
+     * @param salesforceObject     description whose fields are converted
+     * @param fieldNamesOfInterest comma-separated field names to keep (matched case-insensitively,
+     *                             surrounding whitespace ignored); when blank, every field is converted
+     * @return schema with one record field per retained Salesforce field
+     * @throws ProcessException if a field has a missing or unsupported soap type
+     */
+    public RecordSchema convertSchema(SObjectDescription salesforceObject, String fieldNamesOfInterest) {
+        List<SObjectField> fields = salesforceObject.getFields();
+        if (StringUtils.isNotBlank(fieldNamesOfInterest)) {
+            fields = filterFieldsOfInterest(fields, fieldNamesOfInterest);
+        }
+
+        final List<RecordField> recordFields;
+        try {
+            recordFields = convertSObjectFieldsToRecordFields(fields);
+        } catch (IllegalArgumentException e) {
+            // Add the object name so the failing field can be traced back to its object.
+            throw new ProcessException(String.format("Could not determine schema for '%s'", salesforceObject.getName()), e);
+        }
+
+        return new SimpleRecordSchema(recordFields);
+    }
+
+    /**
+     * Converts each Salesforce field, preserving the input order.
+     */
+    private List<RecordField> convertSObjectFieldsToRecordFields(List<SObjectField> fields) {
+        List<RecordField> recordFields = new ArrayList<>();
+        for (SObjectField field : fields) {
+            recordFields.add(convertSObjectFieldToRecordField(field));
+        }
+        return recordFields;
+    }
+
+    /**
+     * Keeps only the fields whose name appears in the comma-separated list.
+     * <p>
+     * The list is trimmed before splitting so that leading or trailing whitespace does not end up
+     * inside the first or last name, and names are compared with {@link String#equalsIgnoreCase(String)}
+     * so the match does not depend on the JVM default locale.
+     */
+    private List<SObjectField> filterFieldsOfInterest(List<SObjectField> fields, String fieldNamesOfInterest) {
+        List<String> listOfFieldNamesOfInterest = Arrays.asList(fieldNamesOfInterest.trim().split("\\s*,\\s*"));
+        return fields
+                .stream()
+                .filter(sObjectField -> listOfFieldNamesOfInterest
+                        .stream()
+                        .anyMatch(nameOfInterest -> nameOfInterest.equalsIgnoreCase(sObjectField.getName())))
+                .collect(Collectors.toList());
+    }
+
+    /**
+     * Maps a single Salesforce field to a record field based on its soap type.
+     * Any namespace prefix up to and including the first ':' is ignored.
+     *
+     * @throws IllegalArgumentException if the soap type is missing or not supported
+     */
+    private RecordField convertSObjectFieldToRecordField(SObjectField field) {
+        String soapType = field.getSoapType();
+        if (soapType == null) {
+            // Fail with the same exception type as an unsupported type so convertSchema can wrap it.
+            throw new IllegalArgumentException(String.format("Could not convert field '%s': soap type is missing.", field.getName()));
+        }
+        DataType dataType;
+        switch (soapType.substring(soapType.indexOf(':') + 1)) {
+            case "ID":
+            case "string":
+            case "json":
+            case "base64Binary":
+            case "anyType":
+                dataType = RecordFieldType.STRING.getDataType();
+                break;
+            case "int":
+                dataType = RecordFieldType.INT.getDataType();
+                break;
+            case "long":
+                dataType = RecordFieldType.LONG.getDataType();
+                break;
+            case "double":
+                dataType = RecordFieldType.DOUBLE.getDataType();
+                break;
+            case "boolean":
+                dataType = RecordFieldType.BOOLEAN.getDataType();
+                break;
+            case "date":
+                dataType = RecordFieldType.DATE.getDataType(dateFormat);
+                break;
+            case "dateTime":
+                dataType = RecordFieldType.TIMESTAMP.getDataType(dateTimeFormat);
+                break;
+            case "time":
+                dataType = RecordFieldType.TIME.getDataType(timeFormat);
+                break;
+            case "address":
+                dataType = RecordFieldType.RECORD.getRecordDataType(createAddressSchema());
+                break;
+            case "location":
+                dataType = RecordFieldType.RECORD.getRecordDataType(createLocationSchema());
+                break;
+            default:
+                throw new IllegalArgumentException(String.format("Could not convert field '%s' of soap type '%s'.", field.getName(), soapType));
+        }
+        return new RecordField(field.getName(), dataType, field.getDefaultValue(), field.isNillable());
+    }
+
+    /**
+     * Nested schema used for fields of soap type {@code address}; every sub-field is a nullable string.
+     */
+    private RecordSchema createAddressSchema() {
+        return new SimpleRecordSchema(Arrays.asList(
+                new RecordField("city", RecordFieldType.STRING.getDataType(), true),
+                new RecordField("country", RecordFieldType.STRING.getDataType(), true),
+                new RecordField("countryCode", RecordFieldType.STRING.getDataType(), true),
+                new RecordField("postalCode", RecordFieldType.STRING.getDataType(), true),
+                new RecordField("state", RecordFieldType.STRING.getDataType(), true),
+                new RecordField("stateCode", RecordFieldType.STRING.getDataType(), true),
+                new RecordField("street", RecordFieldType.STRING.getDataType(), true),
+                new RecordField("geocodeAccuracy", RecordFieldType.STRING.getDataType(), true)
+        ));
+    }
+
+    /**
+     * Nested schema used for fields of soap type {@code location}; both coordinates are nullable strings.
+     */
+    private RecordSchema createLocationSchema() {
+        return new SimpleRecordSchema(Arrays.asList(
+                new RecordField("latitude", RecordFieldType.STRING.getDataType(), true),
+                new RecordField("longitude", RecordFieldType.STRING.getDataType(), true)
+        ));
+    }
+}
diff --git 
a/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/util/CommonSalesforceProperties.java
 
b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/util/CommonSalesforceProperties.java
index 39188396d9..c8f6078cd8 100644
--- 
a/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/util/CommonSalesforceProperties.java
+++ 
b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/util/CommonSalesforceProperties.java
@@ -23,10 +23,10 @@ import org.apache.nifi.processor.util.StandardValidators;
 
 public final class CommonSalesforceProperties {
 
-    public static final PropertyDescriptor API_URL = new 
PropertyDescriptor.Builder()
+    public static final PropertyDescriptor SALESFORCE_INSTANCE_URL = new 
PropertyDescriptor.Builder()
             .name("salesforce-url")
-            .displayName("URL")
-            .description("The URL for the Salesforce REST API including the 
domain without additional path information, such as 
https://MyDomainName.my.salesforce.com")
+            .displayName("Salesforce Instance URL")
+            .description("The URL of the Salesforce instance including the 
domain without additional path information, such as 
https://MyDomainName.my.salesforce.com")
             .required(true)
             .addValidator(StandardValidators.URL_VALIDATOR)
             
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
diff --git 
a/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/util/IncrementalContext.java
 
b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/util/IncrementalContext.java
new file mode 100644
index 0000000000..4b155a042b
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/util/IncrementalContext.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.salesforce.util;
+
+import org.apache.nifi.components.state.StateMap;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processors.salesforce.QuerySalesforceObject;
+
+import java.time.OffsetDateTime;
+import java.time.format.DateTimeFormatter;
+import java.time.temporal.ChronoUnit;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Snapshot of the age-based query settings, read once from the processor
+ * properties and the stored state when the instance is created.
+ */
+public class IncrementalContext {
+
+    private final String ageField;
+    private final String initialAgeFilter;
+    private final String ageFilterUpper;
+    private final String ageFilterLower;
+
+    /**
+     * Reads the age field, initial age filter and age delay properties from the context and the
+     * last age filter from the state. When an age field is configured, the upper age bound is
+     * computed from the current time minus the optional delay; otherwise it is {@code null}.
+     *
+     * @param context processor context the properties are read from
+     * @param state   state map holding the last age filter
+     */
+    public IncrementalContext(ProcessContext context, StateMap state) {
+        this.ageField = context.getProperty(QuerySalesforceObject.AGE_FIELD).getValue();
+        this.initialAgeFilter = context.getProperty(QuerySalesforceObject.INITIAL_AGE_FILTER).getValue();
+        this.ageFilterLower = state.get(QuerySalesforceObject.LAST_AGE_FILTER);
+
+        final Long delayMillis = context.getProperty(QuerySalesforceObject.AGE_DELAY).asTimePeriod(TimeUnit.MILLISECONDS);
+        this.ageFilterUpper = this.ageField == null ? null : formatUpperBound(delayMillis);
+    }
+
+    /**
+     * Formats "now minus delay" (or just "now" when no delay is given) as an ISO offset date-time.
+     */
+    private static String formatUpperBound(Long delayMillis) {
+        final OffsetDateTime now = OffsetDateTime.now();
+        return Optional.ofNullable(delayMillis)
+                .map(delay -> now.minus(delay, ChronoUnit.MILLIS))
+                .orElse(now)
+                .format(DateTimeFormatter.ISO_OFFSET_DATE_TIME);
+    }
+
+    /**
+     * @return the configured age field, or {@code null} when none is set
+     */
+    public String getAgeField() {
+        return ageField;
+    }
+
+    /**
+     * @return the configured initial age filter value
+     */
+    public String getInitialAgeFilter() {
+        return initialAgeFilter;
+    }
+
+    /**
+     * @return the upper age bound computed at construction, or {@code null} when no age field is set
+     */
+    public String getAgeFilterUpper() {
+        return ageFilterUpper;
+    }
+
+    /**
+     * @return the last age filter read from the state
+     */
+    public String getAgeFilterLower() {
+        return ageFilterLower;
+    }
+}
diff --git 
a/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/util/SalesforceQueryBuilder.java
 
b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/util/SalesforceQueryBuilder.java
new file mode 100644
index 0000000000..115c1c3aff
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/util/SalesforceQueryBuilder.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.salesforce.util;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Assembles the query string for a Salesforce object, combining an optional
+ * user-supplied WHERE clause with the age filters of an {@link IncrementalContext}.
+ * <p>
+ * All inputs are concatenated verbatim; no escaping or validation is performed here.
+ */
+public class SalesforceQueryBuilder {
+
+    private final IncrementalContext incrementalContext;
+
+    /**
+     * @param incrementalContext source of the age field and its lower/upper bounds
+     */
+    public SalesforceQueryBuilder(IncrementalContext incrementalContext) {
+        this.incrementalContext = incrementalContext;
+    }
+
+    /**
+     * Builds {@code SELECT <fields> FROM <sObject>} followed by a WHERE clause when at least one
+     * condition applies. Conditions are joined with {@code AND}.
+     *
+     * @param sObject           name of the object to query
+     * @param fields            field list placed after {@code SELECT}
+     * @param customWhereClause optional extra condition, wrapped in parentheses; a {@code null} or
+     *                          blank value (for example an expression that evaluated to an empty
+     *                          string) is ignored instead of producing an empty {@code ( )} group
+     * @return the assembled query
+     */
+    public String buildQuery(String sObject, String fields, String customWhereClause) {
+        StringBuilder queryBuilder = new StringBuilder("SELECT ")
+                .append(fields)
+                .append(" FROM ")
+                .append(sObject);
+
+        List<String> whereItems = new ArrayList<>();
+        if (customWhereClause != null && !customWhereClause.trim().isEmpty()) {
+            whereItems.add("( " + customWhereClause + " )");
+        }
+
+        addAgeFilter(whereItems);
+
+        if (!whereItems.isEmpty()) {
+            String finalWhereClause = String.join(" AND ", whereItems);
+            queryBuilder.append(" WHERE ").append(finalWhereClause);
+        }
+
+        return queryBuilder.toString();
+    }
+
+    /**
+     * Appends the age conditions when an age field is configured: a lower bound taken from the
+     * last age filter (falling back to the initial age filter, if any) and always an upper bound.
+     */
+    private void addAgeFilter(List<String> whereItems) {
+        String ageField = incrementalContext.getAgeField();
+        if (ageField == null) {
+            return;
+        }
+
+        String ageFilterLower = incrementalContext.getAgeFilterLower();
+        String initialAgeFilter = incrementalContext.getInitialAgeFilter();
+
+        if (ageFilterLower != null) {
+            whereItems.add(ageField + " >= " + ageFilterLower);
+        } else if (initialAgeFilter != null) {
+            whereItems.add(ageField + " >= " + initialAgeFilter);
+        }
+
+        whereItems.add(ageField + " < " + incrementalContext.getAgeFilterUpper());
+    }
+}
diff --git 
a/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/util/SalesforceRestService.java
 
b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/util/SalesforceRestService.java
deleted file mode 100644
index 0affbc441e..0000000000
--- 
a/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/util/SalesforceRestService.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.salesforce.util;
-
-import okhttp3.HttpUrl;
-import okhttp3.MediaType;
-import okhttp3.OkHttpClient;
-import okhttp3.Request;
-import okhttp3.RequestBody;
-import okhttp3.Response;
-import org.apache.nifi.processor.exception.ProcessException;
-
-import java.io.InputStream;
-import java.util.concurrent.TimeUnit;
-import java.util.function.Supplier;
-
-public class SalesforceRestService {
-    private final String version;
-    private final String baseUrl;
-    private final Supplier<String> accessTokenProvider;
-    private final OkHttpClient httpClient;
-
-    public SalesforceRestService(String version, String baseUrl, 
Supplier<String> accessTokenProvider, int responseTimeoutMillis) {
-        this.version = version;
-        this.baseUrl = baseUrl;
-        this.accessTokenProvider = accessTokenProvider;
-        httpClient = new OkHttpClient.Builder()
-                .readTimeout(responseTimeoutMillis, TimeUnit.MILLISECONDS)
-                .build();
-    }
-
-    public InputStream describeSObject(String sObject) {
-        String url = getVersionedBaseUrl() + "/sobjects/" + sObject + 
"/describe?maxRecords=1";
-
-        Request request = new Request.Builder()
-                .addHeader("Authorization", "Bearer " + 
accessTokenProvider.get())
-                .url(url)
-                .get()
-                .build();
-
-        return request(request);
-    }
-
-    public InputStream query(String query) {
-        String url = getVersionedBaseUrl() + "/query";
-
-        HttpUrl httpUrl = HttpUrl.get(url).newBuilder()
-                .addQueryParameter("q", query)
-                .build();
-
-        Request request = new Request.Builder()
-                .addHeader("Authorization", "Bearer " + 
accessTokenProvider.get())
-                .url(httpUrl)
-                .get()
-                .build();
-
-        return request(request);
-    }
-
-    public InputStream getNextRecords(String nextRecordsUrl) {
-        String url = baseUrl + nextRecordsUrl;
-
-        HttpUrl httpUrl = HttpUrl.get(url).newBuilder()
-                .build();
-
-        Request request = new Request.Builder()
-                .addHeader("Authorization", "Bearer " + 
accessTokenProvider.get())
-                .url(httpUrl)
-                .get()
-                .build();
-
-        return request(request);
-    }
-
-    public InputStream postRecord(String sObjectApiName, String body) {
-        String url = getVersionedBaseUrl() + "/composite/tree/" + 
sObjectApiName;
-
-        HttpUrl httpUrl = HttpUrl.get(url).newBuilder()
-                .build();
-
-        final RequestBody requestBody = RequestBody.create(body, 
MediaType.parse("application/json"));
-
-        Request request = new Request.Builder()
-                .addHeader("Authorization", "Bearer " + 
accessTokenProvider.get())
-                .url(httpUrl)
-                .post(requestBody)
-                .build();
-
-        return request(request);
-    }
-
-    public String getVersionedBaseUrl() {
-        return baseUrl + "/services/data/v" + version;
-    }
-
-    private InputStream request(Request request) {
-        Response response = null;
-        try {
-            response = httpClient.newCall(request).execute();
-            if (response.code() < 200 || response.code() > 201) {
-                throw new ProcessException("Invalid response" +
-                        " Code: " + response.code() +
-                        " Message: " + response.message() +
-                        " Body: " + (response.body() == null ? null : 
response.body().string())
-                );
-            }
-            return response.body().byteStream();
-        } catch (ProcessException e) {
-            if (response != null) {
-                response.close();
-            }
-            throw e;
-        } catch (Exception e) {
-            if (response != null) {
-                response.close();
-            }
-            throw new ProcessException(String.format("Salesforce HTTP request 
failed [%s]", request.url()), e);
-        }
-    }
-}
diff --git 
a/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/util/SalesforceToRecordSchemaConverter.java
 
b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/util/SalesforceToRecordSchemaConverter.java
deleted file mode 100644
index 5bc6637161..0000000000
--- 
a/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/util/SalesforceToRecordSchemaConverter.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.salesforce.util;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.camel.component.salesforce.api.dto.SObjectDescription;
-import org.apache.camel.component.salesforce.api.dto.SObjectField;
-import org.apache.camel.component.salesforce.api.utils.JsonUtils;
-import org.apache.nifi.serialization.SimpleRecordSchema;
-import org.apache.nifi.serialization.record.RecordField;
-import org.apache.nifi.serialization.record.RecordFieldType;
-import org.apache.nifi.serialization.record.RecordSchema;
-import org.apache.nifi.util.StringUtils;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.stream.Collectors;
-
-public class SalesforceToRecordSchemaConverter {
-    private static final ObjectMapper OBJECT_MAPPER = 
JsonUtils.createObjectMapper();
-    private final String dateFormat;
-    private final String dateTimeFormat;
-    private final String timeFormat;
-
-    public SalesforceToRecordSchemaConverter(String dateFormat, String 
dateTimeFormat, String timeFormat) {
-        this.dateFormat = dateFormat;
-        this.dateTimeFormat = dateTimeFormat;
-        this.timeFormat = timeFormat;
-    }
-
-    public SObjectDescription getSalesforceObject(InputStream 
salesforceObjectResultJsonString) throws IOException {
-        return OBJECT_MAPPER.readValue(salesforceObjectResultJsonString, 
SObjectDescription.class);
-    }
-
-    public RecordSchema convertSchema(SObjectDescription salesforceObject, 
String fieldNamesOfInterest) {
-        List<SObjectField> fields = salesforceObject.getFields();
-        if (StringUtils.isNotBlank(fieldNamesOfInterest)) {
-            final List<String> listOfFieldNamesOfInterest = 
Arrays.asList(fieldNamesOfInterest.toLowerCase().split("\\s*,\\s*"));
-            fields = fields
-                    .stream()
-                    .filter(sObjectField -> 
listOfFieldNamesOfInterest.contains(sObjectField.getName().toLowerCase()))
-                    .collect(Collectors.toList());
-        }
-
-        final List<RecordField> recordFields = new ArrayList<>();
-
-        for (SObjectField field : fields) {
-            final String soapType = field.getSoapType();
-
-            switch (soapType.substring(soapType.indexOf(':') + 1)) {
-                case "ID":
-                case "string":
-                case "json":
-                case "base64Binary":
-                case "anyType":
-                    recordFields.add(new RecordField(field.getName(), 
RecordFieldType.STRING.getDataType(), field.getDefaultValue(), 
field.isNillable()));
-                    break;
-                case "int":
-                    recordFields.add(new RecordField(field.getName(), 
RecordFieldType.INT.getDataType(), field.getDefaultValue(), 
field.isNillable()));
-                    break;
-                case "long":
-                    recordFields.add(new RecordField(field.getName(), 
RecordFieldType.LONG.getDataType(), field.getDefaultValue(), 
field.isNillable()));
-                    break;
-                case "double":
-                    recordFields.add(new RecordField(field.getName(), 
RecordFieldType.DOUBLE.getDataType(), field.getDefaultValue(), 
field.isNillable()));
-                    break;
-                case "boolean":
-                    recordFields.add(new RecordField(field.getName(), 
RecordFieldType.BOOLEAN.getDataType(), field.getDefaultValue(), 
field.isNillable()));
-                    break;
-                case "date":
-                    recordFields.add(new RecordField(field.getName(), 
RecordFieldType.DATE.getDataType(dateFormat), field.getDefaultValue(), 
field.isNillable()));
-                    break;
-                case "dateTime":
-                    recordFields.add(new RecordField(field.getName(), 
RecordFieldType.TIMESTAMP.getDataType(dateTimeFormat), field.getDefaultValue(), 
field.isNillable()));
-                    break;
-                case "time":
-                    recordFields.add(new RecordField(field.getName(), 
RecordFieldType.TIME.getDataType(timeFormat), field.getDefaultValue(), 
field.isNillable()));
-                    break;
-                case "address":
-                    final RecordSchema addressSchema = new 
SimpleRecordSchema(Arrays.asList(
-                            new RecordField("city", 
RecordFieldType.STRING.getDataType(), true),
-                            new RecordField("country", 
RecordFieldType.STRING.getDataType(), true),
-                            new RecordField("countryCode", 
RecordFieldType.STRING.getDataType(), true),
-                            new RecordField("postalCode", 
RecordFieldType.STRING.getDataType(), true),
-                            new RecordField("state", 
RecordFieldType.STRING.getDataType(), true),
-                            new RecordField("stateCode", 
RecordFieldType.STRING.getDataType(), true),
-                            new RecordField("street", 
RecordFieldType.STRING.getDataType(), true),
-                            new RecordField("geocodeAccuracy", 
RecordFieldType.STRING.getDataType(), true)
-                    ));
-                    recordFields.add(new RecordField(field.getName(), 
RecordFieldType.RECORD.getRecordDataType(addressSchema), 
field.getDefaultValue(), field.isNillable()));
-                    break;
-                case "location":
-                    final RecordSchema locationSchema = new 
SimpleRecordSchema(Arrays.asList(
-                            new RecordField("latitude", 
RecordFieldType.STRING.getDataType(), true),
-                            new RecordField("longitude", 
RecordFieldType.STRING.getDataType(), true)
-                    ));
-                    recordFields.add(new RecordField(field.getName(), 
RecordFieldType.RECORD.getRecordDataType(locationSchema), 
field.getDefaultValue(), field.isNillable()));
-                    break;
-                default:
-                    throw new IllegalArgumentException(String.format("Could 
not determine schema for '%s'. Could not convert field '%s' of soap type '%s'.",
-                            salesforceObject.getName(), field.getName(), 
soapType));
-            }
-        }
-
-        return new SimpleRecordSchema(recordFields);
-    }
-}
diff --git 
a/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/validator/SalesforceAgeValidator.java
 
b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/validator/SalesforceAgeValidator.java
new file mode 100644
index 0000000000..900304c768
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/validator/SalesforceAgeValidator.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.salesforce.validator;
+
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+
+import java.util.List;
+
+import static 
org.apache.nifi.processors.salesforce.QuerySalesforceObject.AGE_FIELD;
+import static 
org.apache.nifi.processors.salesforce.QuerySalesforceObject.INITIAL_AGE_FILTER;
+
+/**
+ * Static validation helper for the age-related properties of the Salesforce query processor.
+ */
+public final class SalesforceAgeValidator {
+
+    private SalesforceAgeValidator() {
+    }
+
+    /**
+     * Adds an invalid result when the initial age filter is set but the age field is not.
+     *
+     * @param validationContext context the two properties are read from
+     * @param results           list the validation result is appended to
+     * @return the same {@code results} list that was passed in
+     */
+    public static List<ValidationResult> validate(ValidationContext validationContext, List<ValidationResult> results) {
+        final boolean initialAgeFilterSet = validationContext.getProperty(INITIAL_AGE_FILTER).isSet();
+        if (!initialAgeFilterSet || validationContext.getProperty(AGE_FIELD).isSet()) {
+            // Nothing to report: either no initial filter, or the age field it depends on is present.
+            return results;
+        }
+
+        final ValidationResult missingAgeField = new ValidationResult.Builder()
+                .subject(INITIAL_AGE_FILTER.getDisplayName())
+                .valid(false)
+                .explanation("it requires " + AGE_FIELD.getDisplayName() + " also to be set.")
+                .build();
+        results.add(missingAgeField);
+        return results;
+    }
+}
diff --git 
a/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/test/java/org/apache/nifi/processors/salesforce/PutSalesforceObjectIT.java
 
b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/test/java/org/apache/nifi/processors/salesforce/PutSalesforceObjectIT.java
index 2c296a119f..96b37d5a35 100644
--- 
a/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/test/java/org/apache/nifi/processors/salesforce/PutSalesforceObjectIT.java
+++ 
b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/test/java/org/apache/nifi/processors/salesforce/PutSalesforceObjectIT.java
@@ -99,8 +99,8 @@ class PutSalesforceObjectIT implements SalesforceConfigAware {
         assertTrue(runner.getProvenanceEvents().isEmpty());
 
         List<MockFlowFile> flowFiles = 
runner.getFlowFilesForRelationship(PutSalesforceObject.REL_FAILURE);
-        MockFlowFile ff0 = flowFiles.get(0);
-        ff0.assertAttributeExists("error.message");
+        MockFlowFile ff = flowFiles.get(0);
+        ff.assertAttributeExists("error.message");
     }
 
     @Test
@@ -120,8 +120,8 @@ class PutSalesforceObjectIT implements 
SalesforceConfigAware {
         assertTrue(runner.getProvenanceEvents().isEmpty());
 
         List<MockFlowFile> flowFiles = 
runner.getFlowFilesForRelationship(PutSalesforceObject.REL_FAILURE);
-        MockFlowFile ff0 = flowFiles.get(0);
-        ff0.assertAttributeExists("error.message");
+        MockFlowFile ff = flowFiles.get(0);
+        ff.assertAttributeExists("error.message");
     }
 
     private void configureProcessor(final MockRecordParser reader) throws 
InitializationException {
@@ -129,7 +129,7 @@ class PutSalesforceObjectIT implements 
SalesforceConfigAware {
         runner.enableControllerService(reader);
 
         runner.setProperty(CommonSalesforceProperties.API_VERSION, VERSION);
-        runner.setProperty(CommonSalesforceProperties.API_URL, BASE_URL);
+        runner.setProperty(CommonSalesforceProperties.SALESFORCE_INSTANCE_URL, 
INSTANCE_URL);
         runner.setProperty(PutSalesforceObject.RECORD_READER_FACTORY, 
reader.getIdentifier());
     }
 }
diff --git 
a/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/test/java/org/apache/nifi/processors/salesforce/QuerySalesforceObjectIT.java
 
b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/test/java/org/apache/nifi/processors/salesforce/QuerySalesforceObjectIT.java
index 6c91c728ff..4da9c137e4 100644
--- 
a/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/test/java/org/apache/nifi/processors/salesforce/QuerySalesforceObjectIT.java
+++ 
b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/test/java/org/apache/nifi/processors/salesforce/QuerySalesforceObjectIT.java
@@ -68,7 +68,7 @@ class QuerySalesforceObjectIT implements 
SalesforceConfigAware {
         runner.setProperty(QuerySalesforceObject.SOBJECT_NAME, sObjectName);
         runner.setProperty(QuerySalesforceObject.FIELD_NAMES, fieldNames);
         runner.setProperty(CommonSalesforceProperties.API_VERSION, VERSION);
-        runner.setProperty(CommonSalesforceProperties.API_URL, BASE_URL);
+        runner.setProperty(CommonSalesforceProperties.SALESFORCE_INSTANCE_URL, 
INSTANCE_URL);
         runner.setProperty(QuerySalesforceObject.RECORD_WRITER, 
writer.getIdentifier());
         runner.setProperty(QuerySalesforceObject.AGE_FIELD, "CreatedDate");
         runner.setProperty(QuerySalesforceObject.INITIAL_AGE_FILTER, 
"2022-01-06T08:43:24.000+0000");
@@ -83,12 +83,12 @@ class QuerySalesforceObjectIT implements 
SalesforceConfigAware {
 
     @Test
     void runCustomQuery() {
-        String customQuery = "SELECT Id, Name, AccountId, 
Account.ShippingAddress FROM Contact";
+        String customQuery = "SELECT Id, Name FROM Account";
 
         runner.setProperty(QuerySalesforceObject.QUERY_TYPE, 
QuerySalesforceObject.CUSTOM_QUERY);
         runner.setProperty(QuerySalesforceObject.CUSTOM_SOQL_QUERY, 
customQuery);
         runner.setProperty(CommonSalesforceProperties.API_VERSION, VERSION);
-        runner.setProperty(CommonSalesforceProperties.API_URL, BASE_URL);
+        runner.setProperty(CommonSalesforceProperties.SALESFORCE_INSTANCE_URL, 
INSTANCE_URL);
 
         runner.run();
 
diff --git 
a/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/test/java/org/apache/nifi/processors/salesforce/util/SalesforceConfigAware.java
 
b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/test/java/org/apache/nifi/processors/salesforce/util/SalesforceConfigAware.java
index c5b73e5897..98adffa39f 100644
--- 
a/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/test/java/org/apache/nifi/processors/salesforce/util/SalesforceConfigAware.java
+++ 
b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/test/java/org/apache/nifi/processors/salesforce/util/SalesforceConfigAware.java
@@ -31,9 +31,9 @@ import org.apache.nifi.util.TestRunner;
  */
 public interface SalesforceConfigAware {
     String VERSION = "54.0";
-    String BASE_URL = "https://MyDomainName.my.salesforce.com";
+    String INSTANCE_URL = "https://MyDomainName.my.salesforce.com";
 
-    String AUTHORIZATION_SERVER_URL = BASE_URL + "/services/oauth2/token";
+    String AUTHORIZATION_SERVER_URL = INSTANCE_URL + "/services/oauth2/token";
     String USERNAME = "???";
     String PASSWORD = "???";
     String CLIENT_ID = "???";
diff --git 
a/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/test/java/org/apache/nifi/processors/salesforce/util/SalesforceRestServiceIT.java
 
b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/test/java/org/apache/nifi/processors/salesforce/util/SalesforceRestServiceIT.java
index a82b9ac863..abbb79282e 100644
--- 
a/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/test/java/org/apache/nifi/processors/salesforce/util/SalesforceRestServiceIT.java
+++ 
b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/test/java/org/apache/nifi/processors/salesforce/util/SalesforceRestServiceIT.java
@@ -21,6 +21,8 @@ import 
org.apache.nifi.processor.AbstractSessionFactoryProcessor;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSessionFactory;
 import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processors.salesforce.rest.SalesforceConfiguration;
+import org.apache.nifi.processors.salesforce.rest.SalesforceRestClient;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
 import org.junit.jupiter.api.AfterEach;
@@ -37,7 +39,7 @@ import static org.junit.jupiter.api.Assertions.assertNotNull;
  */
 class SalesforceRestServiceIT implements SalesforceConfigAware {
     private TestRunner runner;
-    private SalesforceRestService testSubject;
+    private SalesforceRestClient testSubject;
 
     @BeforeEach
     void setUp() throws Exception {
@@ -48,13 +50,9 @@ class SalesforceRestServiceIT implements 
SalesforceConfigAware {
         });
 
         StandardOauth2AccessTokenProvider oauth2AccessTokenProvider = 
initOAuth2AccessTokenProvider(runner);
-
-        testSubject = new SalesforceRestService(
-                VERSION,
-                BASE_URL,
-                () -> 
oauth2AccessTokenProvider.getAccessDetails().getAccessToken(),
-                5_000
-        );
+        SalesforceConfiguration configuration = 
SalesforceConfiguration.create(INSTANCE_URL, VERSION,
+                () -> 
oauth2AccessTokenProvider.getAccessDetails().getAccessToken(), 5_000);
+        testSubject = new SalesforceRestClient(configuration);
     }
 
     @AfterEach
diff --git 
a/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/test/java/org/apache/nifi/processors/salesforce/util/SalesforceToRecordSchemaConverterTest.java
 
b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/test/java/org/apache/nifi/processors/salesforce/util/SalesforceToRecordSchemaConverterTest.java
index 408bd62394..f971beec4d 100644
--- 
a/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/test/java/org/apache/nifi/processors/salesforce/util/SalesforceToRecordSchemaConverterTest.java
+++ 
b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/test/java/org/apache/nifi/processors/salesforce/util/SalesforceToRecordSchemaConverterTest.java
@@ -18,6 +18,8 @@ package org.apache.nifi.processors.salesforce.util;
 
 import com.fasterxml.jackson.databind.exc.MismatchedInputException;
 import org.apache.camel.component.salesforce.api.dto.SObjectDescription;
+import org.apache.nifi.processor.exception.ProcessException;
+import 
org.apache.nifi.processors.salesforce.schema.SalesforceToRecordSchemaConverter;
 import org.apache.nifi.serialization.SimpleRecordSchema;
 import org.apache.nifi.serialization.record.RecordField;
 import org.apache.nifi.serialization.record.RecordFieldType;
@@ -174,9 +176,7 @@ class SalesforceToRecordSchemaConverterTest {
         try (final InputStream sfSchema = readFile(TEST_PATH + 
"unknown_type_sf_schema.json")) {
             final String fieldNames = "FieldWithUnknownType";
             final SObjectDescription salesforceObject = 
converter.getSalesforceObject(sfSchema);
-            final IllegalArgumentException exception = 
assertThrows(IllegalArgumentException.class, () -> 
converter.convertSchema(salesforceObject, fieldNames));
-            final String errorMessage = "Could not determine schema for 
'SObjectWithUnknownFieldType'. Could not convert field 'FieldWithUnknownType' 
of soap type 'xsd:unknown'.";
-            assertEquals(errorMessage, exception.getMessage());
+            assertThrows(ProcessException.class, () -> converter.convertSchema(salesforceObject, fieldNames));
         }
     }
 

Reply via email to