turcsanyip commented on code in PR #6379:
URL: https://github.com/apache/nifi/pull/6379#discussion_r979460593


##########
nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/main/java/org/apache/nifi/processors/hubspot/GetHubSpot.java:
##########
@@ -187,78 +257,158 @@ private String getResponseBodyAsString(final 
ProcessContext context, final HttpR
         }
     }
 
-    private OutputStreamCallback parseHttpResponse(ProcessContext context, 
String endpoint, StateMap state, HttpResponseEntity response, AtomicInteger 
objectCountHolder) {
+    private OutputStreamCallback parseHttpResponse(final ProcessContext 
context, final HttpResponseEntity response, final AtomicInteger total,
+                                                   final Map<String, String> 
stateMap) {
         return out -> {
             try (final JsonParser jsonParser = 
JSON_FACTORY.createParser(response.body());
                  final JsonGenerator jsonGenerator = 
JSON_FACTORY.createGenerator(out, JsonEncoding.UTF8)) {
+                boolean isCursorAvailable = false;
+                final String objectType = 
context.getProperty(OBJECT_TYPE).getValue();
+                final String cursorKey = String.format(CURSOR_KEY_PATTERN, 
objectType);

Review Comment:
   Similar to the `time_window_*` state attributes, please remove the object 
type suffix here too.



##########
nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/main/java/org/apache/nifi/processors/hubspot/GetHubSpot.java:
##########
@@ -187,78 +257,158 @@ private String getResponseBodyAsString(final 
ProcessContext context, final HttpR
         }
     }
 
-    private OutputStreamCallback parseHttpResponse(ProcessContext context, 
String endpoint, StateMap state, HttpResponseEntity response, AtomicInteger 
objectCountHolder) {
+    private OutputStreamCallback parseHttpResponse(final ProcessContext 
context, final HttpResponseEntity response, final AtomicInteger total,
+                                                   final Map<String, String> 
stateMap) {
         return out -> {
             try (final JsonParser jsonParser = 
JSON_FACTORY.createParser(response.body());
                  final JsonGenerator jsonGenerator = 
JSON_FACTORY.createGenerator(out, JsonEncoding.UTF8)) {
+                boolean isCursorAvailable = false;
+                final String objectType = 
context.getProperty(OBJECT_TYPE).getValue();
+                final String cursorKey = String.format(CURSOR_KEY_PATTERN, 
objectType);
                 while (jsonParser.nextToken() != null) {
+                    if (jsonParser.getCurrentToken() == JsonToken.FIELD_NAME 
&& jsonParser.getCurrentName()
+                            .equals("total")) {
+                        jsonParser.nextToken();
+                        total.set(jsonParser.getIntValue());
+                    }
                     if (jsonParser.getCurrentToken() == JsonToken.FIELD_NAME 
&& jsonParser.getCurrentName()
                             .equals("results")) {
                         jsonParser.nextToken();
                         jsonGenerator.copyCurrentStructure(jsonParser);
-                        objectCountHolder.incrementAndGet();
                     }
                     final String fieldName = jsonParser.getCurrentName();
-                    if (CURSOR_PARAMETER.equals(fieldName)) {
+                    if (PAGING_CURSOR.equals(fieldName)) {
+                        isCursorAvailable = true;
                         jsonParser.nextToken();
-                        Map<String, String> newStateMap = new 
HashMap<>(state.toMap());
-                        newStateMap.put(endpoint, jsonParser.getText());
-                        updateState(context, newStateMap);
+                        stateMap.put(cursorKey, jsonParser.getText());
                         break;
                     }
                 }
+                if (!isCursorAvailable) {
+                    stateMap.put(cursorKey, NO_PAGING);
+                }
             }
         };
     }
 
-    HttpUriBuilder getBaseUri(final ProcessContext context) {
+    URI getBaseUri(final ProcessContext context) {
         final String path = context.getProperty(OBJECT_TYPE).getValue();
         return webClientServiceProvider.getHttpUriBuilder()
                 .scheme(HTTPS)
                 .host(API_BASE_URI)
-                .encodedPath(path);
+                .encodedPath(path + "/search")
+                .build();
     }
 
-    private HttpResponseEntity getHttpResponseEntity(final String accessToken, 
final URI uri) {
-        return webClientServiceProvider.getWebClientService()
-                .get()
-                .uri(uri)
-                .header("Authorization", "Bearer " + accessToken)
-                .retrieve();
+    private HttpResponseEntity getHttpResponseEntity(final String accessToken, 
final URI uri, final String filters) {
+        final InputStream inputStream = IOUtils.toInputStream(filters, 
StandardCharsets.UTF_8);
+        try {
+            return webClientServiceProvider.getWebClientService()
+                    .post()
+                    .uri(uri)
+                    .header("Authorization", "Bearer " + accessToken)
+                    .header("Content-Type", "application/json")
+                    .body(inputStream, 
OptionalLong.of(inputStream.available()))
+                    .retrieve();
+        } catch (IOException e) {
+            throw new ProcessException("Could not transform incremental 
filters to input stream", e);
+        }
     }
 
-    private URI createUri(final ProcessContext context, final StateMap state) {
-        final String path = context.getProperty(OBJECT_TYPE).getValue();
-        final HttpUriBuilder uriBuilder = getBaseUri(context);
+    private String createIncrementalFilters(final ProcessContext context, 
final Map<String, String> stateMap) {
+        final String limit = context.getProperty(RESULT_LIMIT).getValue();
+        final String objectType = context.getProperty(OBJECT_TYPE).getValue();
+        final HubSpotObjectType hubSpotObjectType = 
OBJECT_TYPE_LOOKUP_MAP.get(objectType);
+        final Long incrDelayMs = 
context.getProperty(INCREMENTAL_DELAY).asTimePeriod(TimeUnit.MILLISECONDS);
+        final String cursorKey = String.format(CURSOR_KEY_PATTERN, objectType);
 
-        final boolean isLimitSet = 
context.getProperty(RESULT_LIMIT).evaluateAttributeExpressions().isSet();
-        if (isLimitSet) {
-            final String limit = context.getProperty(RESULT_LIMIT).getValue();
-            uriBuilder.addQueryParameter(LIMIT_PARAMETER, limit);
+        final ObjectNode root = OBJECT_MAPPER.createObjectNode();
+        root.put("limit", limit);
+
+        final String cursor = stateMap.get(cursorKey);
+        if (cursor != null && !NO_PAGING.equals(cursor)) {
+            root.put(PAGING_CURSOR, stateMap.get(cursorKey));
         }
+        final boolean isIncremental = 
context.getProperty(IS_INCREMENTAL).asBoolean();
+        if (isIncremental) {
+            final String initialStartTimeValue = 
context.getProperty(INCREMENTAL_INITIAL_START_TIME).getValue();
+            String initialStartTimeEpoch = 
getInitialStartTimeEpoch(initialStartTimeValue);
+            final String hubspotSpecificIncrementalFieldName = 
hubSpotObjectType.getLastModifiedDateType().getValue();
+            final String lastStartTime = stateMap.get(START_INCREMENTAL_KEY);
+            final String lastEndTime = 
stateMap.getOrDefault(END_INCREMENTAL_KEY, initialStartTimeEpoch);
+
+            final String currentStartTime;
+            final String currentEndTime;
+
+            if (cursor != null && !NO_PAGING.equals(cursor)) {
+                currentStartTime = lastStartTime;
+                // lastEndTime can be null if incremental loading was turned 
off beforehand
+                currentEndTime = lastEndTime != null ? lastEndTime : 
String.valueOf(getCurrentEpochTime());
+            } else {
+                currentStartTime = lastEndTime;
+                final long delayedCurrentEndTime = incrDelayMs != null ? 
getCurrentEpochTime() - incrDelayMs : getCurrentEpochTime();
+                currentEndTime = String.valueOf(delayedCurrentEndTime);
+
+                stateMap.put(START_INCREMENTAL_KEY, currentStartTime);
+                stateMap.put(END_INCREMENTAL_KEY, currentEndTime);
+            }
+
+            final ArrayNode filters = OBJECT_MAPPER.createArrayNode();
 
-        final String cursor = state.get(path);
-        if (cursor != null) {
-            uriBuilder.addQueryParameter(CURSOR_PARAMETER, cursor);
+            if (currentStartTime != null) {
+                final ObjectNode greaterThanFilterNode = 
OBJECT_MAPPER.createObjectNode();
+                greaterThanFilterNode.put("propertyName", 
hubspotSpecificIncrementalFieldName);
+                greaterThanFilterNode.put("operator", "GTE");
+                greaterThanFilterNode.put("value", currentStartTime);
+                filters.add(greaterThanFilterNode);
+            }
+
+            final ObjectNode lessThanFilterNode = 
OBJECT_MAPPER.createObjectNode();
+            lessThanFilterNode.put("propertyName", 
hubspotSpecificIncrementalFieldName);
+            lessThanFilterNode.put("operator", "LT");
+            lessThanFilterNode.put("value", currentEndTime);
+            filters.add(lessThanFilterNode);
+
+            root.set("filters", filters);
         }
-        return uriBuilder.build();
+        return root.toString();
     }
 
-    private StateMap getStateMap(final ProcessContext context) {
+    private String getInitialStartTimeEpoch(String initialStartTimeValue) {
+        if (initialStartTimeValue != null) {
+            final TemporalAccessor initialDateTime = 
DateTimeFormatter.ISO_DATE_TIME.parse(initialStartTimeValue);
+            return 
String.valueOf(Instant.from(initialDateTime).toEpochMilli());
+        }
+        return null;
+    }
+
+    long getCurrentEpochTime() {
+        return Instant.now().toEpochMilli();
+    }
+
+    private Map<String, String> getStateMap(final ProcessSession session) {
         final StateMap stateMap;
         try {
-            stateMap = context.getStateManager().getState(Scope.CLUSTER);
+            stateMap = session.getState(Scope.CLUSTER);
         } catch (IOException e) {
             throw new ProcessException("State retrieval failed", e);
         }
-        return stateMap;
+        return new HashMap<>(stateMap.toMap());
     }
 
-    private void updateState(ProcessContext context, Map<String, String> 
newState) {
+    private void updateState(ProcessSession session, Map<String, String> 
newState) {
         try {
-            context.getStateManager().setState(newState, Scope.CLUSTER);
+            session.setState(newState, Scope.CLUSTER);
         } catch (IOException e) {
             throw new ProcessException("Page cursor update failed", e);
         }
     }
+
+    private void clearState(ProcessSession session) {
+        try {
+            session.clearState(Scope.CLUSTER);

Review Comment:
   Not sure why, but `session.clearState(Scope.CLUSTER)` does not remove the 
previous items from the state.
   `ProcessContext.getStateManager().clear(Scope.CLUSTER)` might be a better 
option for clearing.



##########
nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/main/java/org/apache/nifi/processors/hubspot/GetHubSpot.java:
##########
@@ -187,78 +257,158 @@ private String getResponseBodyAsString(final 
ProcessContext context, final HttpR
         }
     }
 
-    private OutputStreamCallback parseHttpResponse(ProcessContext context, 
String endpoint, StateMap state, HttpResponseEntity response, AtomicInteger 
objectCountHolder) {
+    private OutputStreamCallback parseHttpResponse(final ProcessContext 
context, final HttpResponseEntity response, final AtomicInteger total,
+                                                   final Map<String, String> 
stateMap) {
         return out -> {
             try (final JsonParser jsonParser = 
JSON_FACTORY.createParser(response.body());
                  final JsonGenerator jsonGenerator = 
JSON_FACTORY.createGenerator(out, JsonEncoding.UTF8)) {
+                boolean isCursorAvailable = false;
+                final String objectType = 
context.getProperty(OBJECT_TYPE).getValue();
+                final String cursorKey = String.format(CURSOR_KEY_PATTERN, 
objectType);
                 while (jsonParser.nextToken() != null) {
+                    if (jsonParser.getCurrentToken() == JsonToken.FIELD_NAME 
&& jsonParser.getCurrentName()
+                            .equals("total")) {
+                        jsonParser.nextToken();
+                        total.set(jsonParser.getIntValue());
+                    }
                     if (jsonParser.getCurrentToken() == JsonToken.FIELD_NAME 
&& jsonParser.getCurrentName()
                             .equals("results")) {
                         jsonParser.nextToken();
                         jsonGenerator.copyCurrentStructure(jsonParser);
-                        objectCountHolder.incrementAndGet();
                     }
                     final String fieldName = jsonParser.getCurrentName();
-                    if (CURSOR_PARAMETER.equals(fieldName)) {
+                    if (PAGING_CURSOR.equals(fieldName)) {
+                        isCursorAvailable = true;
                         jsonParser.nextToken();
-                        Map<String, String> newStateMap = new 
HashMap<>(state.toMap());
-                        newStateMap.put(endpoint, jsonParser.getText());
-                        updateState(context, newStateMap);
+                        stateMap.put(cursorKey, jsonParser.getText());
                         break;
                     }
                 }
+                if (!isCursorAvailable) {
+                    stateMap.put(cursorKey, NO_PAGING);
+                }
             }
         };
     }
 
-    HttpUriBuilder getBaseUri(final ProcessContext context) {
+    URI getBaseUri(final ProcessContext context) {
         final String path = context.getProperty(OBJECT_TYPE).getValue();
         return webClientServiceProvider.getHttpUriBuilder()
                 .scheme(HTTPS)
                 .host(API_BASE_URI)
-                .encodedPath(path);
+                .encodedPath(path + "/search")
+                .build();
     }
 
-    private HttpResponseEntity getHttpResponseEntity(final String accessToken, 
final URI uri) {
-        return webClientServiceProvider.getWebClientService()
-                .get()
-                .uri(uri)
-                .header("Authorization", "Bearer " + accessToken)
-                .retrieve();
+    private HttpResponseEntity getHttpResponseEntity(final String accessToken, 
final URI uri, final String filters) {
+        final InputStream inputStream = IOUtils.toInputStream(filters, 
StandardCharsets.UTF_8);
+        try {
+            return webClientServiceProvider.getWebClientService()
+                    .post()
+                    .uri(uri)
+                    .header("Authorization", "Bearer " + accessToken)
+                    .header("Content-Type", "application/json")
+                    .body(inputStream, 
OptionalLong.of(inputStream.available()))
+                    .retrieve();
+        } catch (IOException e) {
+            throw new ProcessException("Could not transform incremental 
filters to input stream", e);
+        }
     }
 
-    private URI createUri(final ProcessContext context, final StateMap state) {
-        final String path = context.getProperty(OBJECT_TYPE).getValue();
-        final HttpUriBuilder uriBuilder = getBaseUri(context);
+    private String createIncrementalFilters(final ProcessContext context, 
final Map<String, String> stateMap) {
+        final String limit = context.getProperty(RESULT_LIMIT).getValue();
+        final String objectType = context.getProperty(OBJECT_TYPE).getValue();
+        final HubSpotObjectType hubSpotObjectType = 
OBJECT_TYPE_LOOKUP_MAP.get(objectType);
+        final Long incrDelayMs = 
context.getProperty(INCREMENTAL_DELAY).asTimePeriod(TimeUnit.MILLISECONDS);
+        final String cursorKey = String.format(CURSOR_KEY_PATTERN, objectType);
 
-        final boolean isLimitSet = 
context.getProperty(RESULT_LIMIT).evaluateAttributeExpressions().isSet();
-        if (isLimitSet) {
-            final String limit = context.getProperty(RESULT_LIMIT).getValue();
-            uriBuilder.addQueryParameter(LIMIT_PARAMETER, limit);
+        final ObjectNode root = OBJECT_MAPPER.createObjectNode();
+        root.put("limit", limit);
+
+        final String cursor = stateMap.get(cursorKey);
+        if (cursor != null && !NO_PAGING.equals(cursor)) {
+            root.put(PAGING_CURSOR, stateMap.get(cursorKey));
         }
+        final boolean isIncremental = 
context.getProperty(IS_INCREMENTAL).asBoolean();
+        if (isIncremental) {
+            final String initialStartTimeValue = 
context.getProperty(INCREMENTAL_INITIAL_START_TIME).getValue();
+            String initialStartTimeEpoch = 
getInitialStartTimeEpoch(initialStartTimeValue);
+            final String hubspotSpecificIncrementalFieldName = 
hubSpotObjectType.getLastModifiedDateType().getValue();
+            final String lastStartTime = stateMap.get(START_INCREMENTAL_KEY);
+            final String lastEndTime = 
stateMap.getOrDefault(END_INCREMENTAL_KEY, initialStartTimeEpoch);
+
+            final String currentStartTime;
+            final String currentEndTime;
+
+            if (cursor != null && !NO_PAGING.equals(cursor)) {
+                currentStartTime = lastStartTime;
+                // lastEndTime can be null if incremental loading was turned 
off beforehand
+                currentEndTime = lastEndTime != null ? lastEndTime : 
String.valueOf(getCurrentEpochTime());

Review Comment:
   I think it is better to avoid this situation and the state should also be 
reset when the user changes `Incremental Loading` (similar to `Object Type`).
   In that case `currentEndTime = lastEndTime` would be enough because 
`lastEndTime` must have a value when the paging cursor is set.
   Otherwise `Incremental Delay` should be applied here too.



##########
nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/main/java/org/apache/nifi/processors/hubspot/GetHubSpot.java:
##########
@@ -99,7 +106,43 @@ public class GetHubSpot extends AbstractProcessor {
             .description("The maximum number of results to request for each 
invocation of the Processor")
             
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
             .required(false)
-            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+            .addValidator(StandardValidators.createLongValidator(1, 100, true))
+            .build();
+
+    static final PropertyDescriptor IS_INCREMENTAL = new 
PropertyDescriptor.Builder()
+            .name("is-incremental")
+            .displayName("Incremental Loading")
+            .description("The processor can incrementally load the queried 
objects so that each object is queried exactly once." +
+                    " For each query, the processor queries objects within a 
time window where the objects were modified between" +
+                    " the previous run time and the current time (optionally 
adjusted by the Incremental Delay property).")
+            .required(true)
+            .allowableValues("true", "false")
+            .defaultValue("false")
+            .build();
+
+    static final PropertyDescriptor INCREMENTAL_DELAY = new 
PropertyDescriptor.Builder()
+            .name("incremental-delay")
+            .displayName("Incremental Delay")
+            .description(("The ending timestamp of the time window will be 
adjusted earlier by the amount configured in this property." +
+                    " For example, with a property value of 10 seconds, an 
ending timestamp of 12:30:45 would be changed to 12:30:35." +
+                    " Set this property to avoid missing objects when the 
clock of your local machines and HubSpot servers' clock are not in sync."))
+            .required(true)
+            .defaultValue("3 sec")
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .dependsOn(IS_INCREMENTAL, "true")
+            .build();
+
+    static final PropertyDescriptor INCREMENTAL_INITIAL_START_TIME = new 
PropertyDescriptor.Builder()
+            .name("incremental-initial-start-time")
+            .displayName("Incremental Initial Start Time")
+            .description("This property specifies the start time as Epoch Time 
that the processor applies when running the first request." +
+                    " The expected format is an ISO-like date-time with the 
offset and zone if available, such as '2011-12-03T10:15:30'," +
+                    " '2011-12-03T10:15:30+01:00' or 
'2011-12-03T10:15:30+01:00[Europe/Paris]'")
+            .required(false)
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.POSITIVE_LONG_VALIDATOR)

Review Comment:
   It is not possible to enter a date-time value because of the `Long` 
validator (`POSITIVE_LONG_VALIDATOR`).
   `ISO8601_INSTANT_VALIDATOR` could be used. It accepts only UTC time (e.g. 
`2011-12-03T10:15:30Z`), so the examples in the description need to be adjusted 
in that case.



##########
nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/main/java/org/apache/nifi/processors/hubspot/GetHubSpot.java:
##########
@@ -187,78 +257,158 @@ private String getResponseBodyAsString(final 
ProcessContext context, final HttpR
         }
     }
 
-    private OutputStreamCallback parseHttpResponse(ProcessContext context, 
String endpoint, StateMap state, HttpResponseEntity response, AtomicInteger 
objectCountHolder) {
+    private OutputStreamCallback parseHttpResponse(final ProcessContext 
context, final HttpResponseEntity response, final AtomicInteger total,
+                                                   final Map<String, String> 
stateMap) {
         return out -> {
             try (final JsonParser jsonParser = 
JSON_FACTORY.createParser(response.body());
                  final JsonGenerator jsonGenerator = 
JSON_FACTORY.createGenerator(out, JsonEncoding.UTF8)) {
+                boolean isCursorAvailable = false;
+                final String objectType = 
context.getProperty(OBJECT_TYPE).getValue();
+                final String cursorKey = String.format(CURSOR_KEY_PATTERN, 
objectType);
                 while (jsonParser.nextToken() != null) {
+                    if (jsonParser.getCurrentToken() == JsonToken.FIELD_NAME 
&& jsonParser.getCurrentName()
+                            .equals("total")) {
+                        jsonParser.nextToken();
+                        total.set(jsonParser.getIntValue());
+                    }
                     if (jsonParser.getCurrentToken() == JsonToken.FIELD_NAME 
&& jsonParser.getCurrentName()
                             .equals("results")) {
                         jsonParser.nextToken();
                         jsonGenerator.copyCurrentStructure(jsonParser);
-                        objectCountHolder.incrementAndGet();
                     }
                     final String fieldName = jsonParser.getCurrentName();
-                    if (CURSOR_PARAMETER.equals(fieldName)) {
+                    if (PAGING_CURSOR.equals(fieldName)) {
+                        isCursorAvailable = true;
                         jsonParser.nextToken();
-                        Map<String, String> newStateMap = new 
HashMap<>(state.toMap());
-                        newStateMap.put(endpoint, jsonParser.getText());
-                        updateState(context, newStateMap);
+                        stateMap.put(cursorKey, jsonParser.getText());
                         break;
                     }
                 }
+                if (!isCursorAvailable) {
+                    stateMap.put(cursorKey, NO_PAGING);
+                }
             }
         };
     }
 
-    HttpUriBuilder getBaseUri(final ProcessContext context) {
+    URI getBaseUri(final ProcessContext context) {
         final String path = context.getProperty(OBJECT_TYPE).getValue();
         return webClientServiceProvider.getHttpUriBuilder()
                 .scheme(HTTPS)
                 .host(API_BASE_URI)
-                .encodedPath(path);
+                .encodedPath(path + "/search")
+                .build();
     }
 
-    private HttpResponseEntity getHttpResponseEntity(final String accessToken, 
final URI uri) {
-        return webClientServiceProvider.getWebClientService()
-                .get()
-                .uri(uri)
-                .header("Authorization", "Bearer " + accessToken)
-                .retrieve();
+    private HttpResponseEntity getHttpResponseEntity(final String accessToken, 
final URI uri, final String filters) {
+        final InputStream inputStream = IOUtils.toInputStream(filters, 
StandardCharsets.UTF_8);
+        try {
+            return webClientServiceProvider.getWebClientService()
+                    .post()
+                    .uri(uri)
+                    .header("Authorization", "Bearer " + accessToken)
+                    .header("Content-Type", "application/json")
+                    .body(inputStream, 
OptionalLong.of(inputStream.available()))
+                    .retrieve();
+        } catch (IOException e) {
+            throw new ProcessException("Could not transform incremental 
filters to input stream", e);
+        }
     }
 
-    private URI createUri(final ProcessContext context, final StateMap state) {
-        final String path = context.getProperty(OBJECT_TYPE).getValue();
-        final HttpUriBuilder uriBuilder = getBaseUri(context);
+    private String createIncrementalFilters(final ProcessContext context, 
final Map<String, String> stateMap) {
+        final String limit = context.getProperty(RESULT_LIMIT).getValue();
+        final String objectType = context.getProperty(OBJECT_TYPE).getValue();
+        final HubSpotObjectType hubSpotObjectType = 
OBJECT_TYPE_LOOKUP_MAP.get(objectType);
+        final Long incrDelayMs = 
context.getProperty(INCREMENTAL_DELAY).asTimePeriod(TimeUnit.MILLISECONDS);
+        final String cursorKey = String.format(CURSOR_KEY_PATTERN, objectType);
 
-        final boolean isLimitSet = 
context.getProperty(RESULT_LIMIT).evaluateAttributeExpressions().isSet();
-        if (isLimitSet) {
-            final String limit = context.getProperty(RESULT_LIMIT).getValue();
-            uriBuilder.addQueryParameter(LIMIT_PARAMETER, limit);
+        final ObjectNode root = OBJECT_MAPPER.createObjectNode();
+        root.put("limit", limit);
+
+        final String cursor = stateMap.get(cursorKey);
+        if (cursor != null && !NO_PAGING.equals(cursor)) {
+            root.put(PAGING_CURSOR, stateMap.get(cursorKey));
         }
+        final boolean isIncremental = 
context.getProperty(IS_INCREMENTAL).asBoolean();
+        if (isIncremental) {
+            final String initialStartTimeValue = 
context.getProperty(INCREMENTAL_INITIAL_START_TIME).getValue();
+            String initialStartTimeEpoch = 
getInitialStartTimeEpoch(initialStartTimeValue);
+            final String hubspotSpecificIncrementalFieldName = 
hubSpotObjectType.getLastModifiedDateType().getValue();
+            final String lastStartTime = stateMap.get(START_INCREMENTAL_KEY);
+            final String lastEndTime = 
stateMap.getOrDefault(END_INCREMENTAL_KEY, initialStartTimeEpoch);

Review Comment:
   It is quite hard to understand why `lastEndTime` is initialized with 
`initialStartTimeEpoch`.
   It would be better to assign the End Time value from the state only (which 
can be null), and use the Initial Start Time when `currentStartTime` is 
calculated in line 348:
   ```
   currentStartTime = lastEndTime != null ? lastEndTime : 
getInitialStartTimeEpoch(initialStartTimeValue);
   ```
   In this case the epoch is calculated only when needed (once).



##########
nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/main/java/org/apache/nifi/processors/hubspot/GetHubSpot.java:
##########
@@ -99,7 +106,43 @@ public class GetHubSpot extends AbstractProcessor {
             .description("The maximum number of results to request for each 
invocation of the Processor")
             
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
             .required(false)
-            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+            .addValidator(StandardValidators.createLongValidator(1, 100, true))
+            .build();
+
+    static final PropertyDescriptor IS_INCREMENTAL = new 
PropertyDescriptor.Builder()
+            .name("is-incremental")
+            .displayName("Incremental Loading")
+            .description("The processor can incrementally load the queried 
objects so that each object is queried exactly once." +
+                    " For each query, the processor queries objects within a 
time window where the objects were modified between" +
+                    " the previous run time and the current time (optionally 
adjusted by the Incremental Delay property).")
+            .required(true)
+            .allowableValues("true", "false")
+            .defaultValue("false")
+            .build();
+
+    static final PropertyDescriptor INCREMENTAL_DELAY = new 
PropertyDescriptor.Builder()
+            .name("incremental-delay")
+            .displayName("Incremental Delay")
+            .description(("The ending timestamp of the time window will be 
adjusted earlier by the amount configured in this property." +
+                    " For example, with a property value of 10 seconds, an 
ending timestamp of 12:30:45 would be changed to 12:30:35." +
+                    " Set this property to avoid missing objects when the 
clock of your local machines and HubSpot servers' clock are not in sync."))
+            .required(true)
+            .defaultValue("3 sec")
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .dependsOn(IS_INCREMENTAL, "true")
+            .build();
+
+    static final PropertyDescriptor INCREMENTAL_INITIAL_START_TIME = new 
PropertyDescriptor.Builder()
+            .name("incremental-initial-start-time")
+            .displayName("Incremental Initial Start Time")
+            .description("This property specifies the start time as Epoch Time 
that the processor applies when running the first request." +

Review Comment:
   ```suggestion
               .description("This property specifies the start time that the 
processor applies when running the first request." +
   ```



##########
nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/main/java/org/apache/nifi/processors/hubspot/GetHubSpot.java:
##########
@@ -187,78 +257,158 @@ private String getResponseBodyAsString(final 
ProcessContext context, final HttpR
         }
     }
 
-    private OutputStreamCallback parseHttpResponse(ProcessContext context, 
String endpoint, StateMap state, HttpResponseEntity response, AtomicInteger 
objectCountHolder) {
+    private OutputStreamCallback parseHttpResponse(final ProcessContext 
context, final HttpResponseEntity response, final AtomicInteger total,
+                                                   final Map<String, String> 
stateMap) {
         return out -> {
             try (final JsonParser jsonParser = 
JSON_FACTORY.createParser(response.body());
                  final JsonGenerator jsonGenerator = 
JSON_FACTORY.createGenerator(out, JsonEncoding.UTF8)) {
+                boolean isCursorAvailable = false;
+                final String objectType = 
context.getProperty(OBJECT_TYPE).getValue();
+                final String cursorKey = String.format(CURSOR_KEY_PATTERN, 
objectType);
                 while (jsonParser.nextToken() != null) {
+                    if (jsonParser.getCurrentToken() == JsonToken.FIELD_NAME 
&& jsonParser.getCurrentName()
+                            .equals("total")) {
+                        jsonParser.nextToken();
+                        total.set(jsonParser.getIntValue());
+                    }
                     if (jsonParser.getCurrentToken() == JsonToken.FIELD_NAME 
&& jsonParser.getCurrentName()
                             .equals("results")) {
                         jsonParser.nextToken();
                         jsonGenerator.copyCurrentStructure(jsonParser);
-                        objectCountHolder.incrementAndGet();
                     }
                     final String fieldName = jsonParser.getCurrentName();
-                    if (CURSOR_PARAMETER.equals(fieldName)) {
+                    if (PAGING_CURSOR.equals(fieldName)) {
+                        isCursorAvailable = true;
                         jsonParser.nextToken();
-                        Map<String, String> newStateMap = new 
HashMap<>(state.toMap());
-                        newStateMap.put(endpoint, jsonParser.getText());
-                        updateState(context, newStateMap);
+                        stateMap.put(cursorKey, jsonParser.getText());
                         break;
                     }
                 }
+                if (!isCursorAvailable) {
+                    stateMap.put(cursorKey, NO_PAGING);
+                }
             }
         };
     }
 
-    HttpUriBuilder getBaseUri(final ProcessContext context) {
+    URI getBaseUri(final ProcessContext context) {
         final String path = context.getProperty(OBJECT_TYPE).getValue();
         return webClientServiceProvider.getHttpUriBuilder()
                 .scheme(HTTPS)
                 .host(API_BASE_URI)
-                .encodedPath(path);
+                .encodedPath(path + "/search")
+                .build();
     }
 
-    private HttpResponseEntity getHttpResponseEntity(final String accessToken, 
final URI uri) {
-        return webClientServiceProvider.getWebClientService()
-                .get()
-                .uri(uri)
-                .header("Authorization", "Bearer " + accessToken)
-                .retrieve();
+    private HttpResponseEntity getHttpResponseEntity(final String accessToken, 
final URI uri, final String filters) {
+        final InputStream inputStream = IOUtils.toInputStream(filters, 
StandardCharsets.UTF_8);
+        try {
+            return webClientServiceProvider.getWebClientService()
+                    .post()
+                    .uri(uri)
+                    .header("Authorization", "Bearer " + accessToken)
+                    .header("Content-Type", "application/json")
+                    .body(inputStream, 
OptionalLong.of(inputStream.available()))
+                    .retrieve();
+        } catch (IOException e) {
+            throw new ProcessException("Could not transform incremental 
filters to input stream", e);
+        }
     }
 
-    private URI createUri(final ProcessContext context, final StateMap state) {
-        final String path = context.getProperty(OBJECT_TYPE).getValue();
-        final HttpUriBuilder uriBuilder = getBaseUri(context);
+    private String createIncrementalFilters(final ProcessContext context, 
final Map<String, String> stateMap) {
+        final String limit = context.getProperty(RESULT_LIMIT).getValue();
+        final String objectType = context.getProperty(OBJECT_TYPE).getValue();
+        final HubSpotObjectType hubSpotObjectType = 
OBJECT_TYPE_LOOKUP_MAP.get(objectType);
+        final Long incrDelayMs = 
context.getProperty(INCREMENTAL_DELAY).asTimePeriod(TimeUnit.MILLISECONDS);
+        final String cursorKey = String.format(CURSOR_KEY_PATTERN, objectType);
 
-        final boolean isLimitSet = 
context.getProperty(RESULT_LIMIT).evaluateAttributeExpressions().isSet();
-        if (isLimitSet) {
-            final String limit = context.getProperty(RESULT_LIMIT).getValue();
-            uriBuilder.addQueryParameter(LIMIT_PARAMETER, limit);
+        final ObjectNode root = OBJECT_MAPPER.createObjectNode();
+        root.put("limit", limit);
+
+        final String cursor = stateMap.get(cursorKey);
+        if (cursor != null && !NO_PAGING.equals(cursor)) {
+            root.put(PAGING_CURSOR, stateMap.get(cursorKey));
         }

Review Comment:
   ```suggestion
           final String cursor = stateMap.get(cursorKey);
           if (cursor != null && !NO_PAGING.equals(cursor)) {
               root.put(PAGING_CURSOR, cursor);
           }
   ```



##########
nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/main/java/org/apache/nifi/processors/hubspot/GetHubSpot.java:
##########
@@ -47,31 +48,37 @@
 import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.web.client.api.HttpResponseEntity;
 import org.apache.nifi.web.client.api.HttpResponseStatus;
-import org.apache.nifi.web.client.api.HttpUriBuilder;
 import org.apache.nifi.web.client.provider.api.WebClientServiceProvider;
 
 import java.io.IOException;
+import java.io.InputStream;
 import java.io.UncheckedIOException;
 import java.net.URI;
 import java.nio.charset.StandardCharsets;
+import java.time.Instant;
+import java.time.format.DateTimeFormatter;
+import java.time.temporal.TemporalAccessor;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.OptionalLong;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
+import java.util.stream.Collectors;
 
 @PrimaryNodeOnly
 @TriggerSerially
-@TriggerWhenEmpty
 @InputRequirement(Requirement.INPUT_FORBIDDEN)
 @Tags({"hubspot"})
 @CapabilityDescription("Retrieves JSON data from a private HubSpot 
application."
-        + " Configuring the Result Limit property enables incremental 
retrieval of results. When this property is set the processor will"
-        + " retrieve new records. This processor is intended to be run on the 
Primary Node only.")
-@Stateful(scopes = Scope.CLUSTER, description = "When the 'Limit' attribute is 
set, the paging cursor is saved after executing a request."
-        + " Only the objects after the paging cursor will be retrieved. The 
maximum number of retrieved objects is the 'Limit' attribute.")
+        + " This processor is intended to be run on the Primary Node only.")
+@Stateful(scopes = Scope.CLUSTER, description = "In case of incremental 
loading the processor run time is stored in the state." +
+        " When the 'Limit' attribute is set, the paging cursor is saved after 
executing a request. Only the objects after the paging" +
+        " cursor will be retrieved. The maximum number of retrieved objects 
can be set in the 'Limit' property.")

Review Comment:
   ```suggestion
   @Stateful(scopes = Scope.CLUSTER, description = "In case of incremental 
loading, the start and end timestamps of the last query time window are stored 
in the state." +
           " When the 'Result Limit' property is set, the paging cursor is 
saved after executing a request. Only the objects after the paging" +
           " cursor will be retrieved. The maximum number of retrieved objects 
can be set in the 'Result Limit' property.")
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to