turcsanyip commented on code in PR #6379:
URL: https://github.com/apache/nifi/pull/6379#discussion_r977962708


##########
nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/main/java/org/apache/nifi/processors/hubspot/HubSpotObjectType.java:
##########
@@ -18,97 +18,107 @@
 
 import org.apache.nifi.components.DescribedValue;
 
+import static 
org.apache.nifi.processors.hubspot.IncrementalFieldType.HS_LAST_MODIFIED_DATE;
+import static 
org.apache.nifi.processors.hubspot.IncrementalFieldType.LAST_MODIFIED_DATE;
+
 public enum HubSpotObjectType implements DescribedValue {
 
     COMPANIES(
             "/crm/v3/objects/companies",
             "Companies",
             "In HubSpot, the companies object is a standard CRM object. 
Individual company records can be used to store information about businesses" +
-                    " and organizations within company properties."
+                    " and organizations within company properties.",
+            HS_LAST_MODIFIED_DATE
     ),
     CONTACTS(
             "/crm/v3/objects/contacts",
             "Contacts",
             "In HubSpot, contacts store information about individuals. From 
marketing automation to smart content, the lead-specific data found in" +
-                    " contact records helps users leverage much of HubSpot's 
functionality."
+                    " contact records helps users leverage much of HubSpot's 
functionality.",
+            LAST_MODIFIED_DATE
     ),
     DEALS(
             "/crm/v3/objects/deals",
             "Deals",
             "In HubSpot, a deal represents an ongoing transaction that a sales 
team is pursuing with a contact or company. It’s tracked through" +
-                    " pipeline stages until won or lost."
+                    " pipeline stages until won or lost.",
+            HS_LAST_MODIFIED_DATE
     ),
     FEEDBACK_SUBMISSIONS(

Review Comment:
   The Feedback Submissions API is currently in beta. I'd suggest removing it 
until it becomes generally available (GA).



##########
nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/main/resources/META-INF/NOTICE:
##########
@@ -0,0 +1,45 @@
+nifi-airtable-nar
+Copyright 2014-2022 The Apache Software Foundation
+
+This product includes software developed at
+The Apache Software Foundation (http://www.apache.org/).
+
+**************************
+Apache Software License v2
+**************************
+
+  (ASLv2) Apache Commons Lang
+    The following NOTICE information applies:
+      Apache Commons Lang
+      Copyright 2001-2015 The Apache Software Foundation
+
+      This product includes software from the Spring Framework,
+      under the Apache License 2.0 (see: StringUtils.containsWhitespace())

Review Comment:
   I cannot see an `Apache Commons Lang` dependency in the HubSpot bundle. If 
that is correct, please remove this entry.



##########
nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/main/java/org/apache/nifi/processors/hubspot/GetHubSpot.java:
##########
@@ -75,6 +82,10 @@
 @DefaultSettings(yieldDuration = "10 sec")
 public class GetHubSpot extends AbstractProcessor {
 
+    static final AllowableValue CREATE_DATE = new AllowableValue("createDate", 
"Create Date", "The time of the field was created");
+    static final AllowableValue LAST_MODIFIED_DATE = new 
AllowableValue("lastModifiedDate", "Last Modified Date",
+            "The time of the field was last modified");
+

Review Comment:
   Unused constants.



##########
nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/main/java/org/apache/nifi/processors/hubspot/GetHubSpot.java:
##########
@@ -99,7 +110,39 @@ public class GetHubSpot extends AbstractProcessor {
             .description("The maximum number of results to request for each 
invocation of the Processor")
             
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
             .required(false)
-            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+            .addValidator(StandardValidators.createLongValidator(1, 100, true))
+            .build();
+
+    static final PropertyDescriptor IS_INCREMENTAL = new 
PropertyDescriptor.Builder()
+            .name("is-incremental")
+            .displayName("Incremental Loading")
+            .description("The processor can incrementally load the queried 
objects so that each object is queried exactly once." +
+                    " For each query, the processor queries objects which were 
created or modified after the previous run time" +
+                    " but before the current time.")
+            .required(true)
+            .allowableValues("true", "false")
+            .defaultValue("false")
+            .build();
+
+    static final PropertyDescriptor INCREMENTAL_DELAY = new 
PropertyDescriptor.Builder()
+            .name("incremental-delay")
+            .displayName("Incremental Delay")
+            .description("The ending timestamp of the time window will be 
adjusted earlier by the amount configured in this property." +
+                    " For example, with a property value of 10 seconds, an 
ending timestamp of 12:30:45 would be changed to 12:30:35.")
+            .required(false)

Review Comment:
   I'd suggest making it required with a default value of `3s`, because some 
difference between the local and server clocks is always expected.



##########
nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/main/java/org/apache/nifi/processors/hubspot/GetHubSpot.java:
##########
@@ -154,17 +207,20 @@ public void onTrigger(final ProcessContext context, final 
ProcessSession session
         final String accessToken = 
context.getProperty(ACCESS_TOKEN).getValue();
         final String endpoint = context.getProperty(OBJECT_TYPE).getValue();
 
-        final StateMap state = getStateMap(context);
-        final URI uri = createUri(context, state);
+        final URI uri = getBaseUri(context);
 
-        final HttpResponseEntity response = getHttpResponseEntity(accessToken, 
uri);
-        final AtomicInteger objectCountHolder = new AtomicInteger();
+        final AtomicInteger total = new AtomicInteger(-1);
+        final StateMap state = getStateMap(context);
+        final Map<String, String> stateMap = new HashMap<>(state.toMap());
+        final String filters = createIncrementalFilters(context, stateMap);
+        final HttpResponseEntity response = getHttpResponseEntity(accessToken, 
uri, filters);
 
         if (response.statusCode() == HttpResponseStatus.OK.getCode()) {
             FlowFile flowFile = session.create();
-            flowFile = session.write(flowFile, parseHttpResponse(context, 
endpoint, state, response, objectCountHolder));
-            if (objectCountHolder.get() > 0) {
+            flowFile = session.write(flowFile, parseHttpResponse(context, 
response, total, stateMap));
+            if (total.get() > 0) {
                 session.transfer(flowFile, REL_SUCCESS);
+                updateState(context, stateMap);
             } else {
                 getLogger().debug("Empty response when requested HubSpot 
endpoint: [{}]", endpoint);
                 session.remove(flowFile);

Review Comment:
   The processor should yield in case of empty response in order not to hit the 
HubSpot API too frequently and unnecessarily when no new data is available.
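
A minimal sketch of the intended control flow, assuming the yield is triggered through `ProcessContext.yield()`. To keep the example free of NiFi dependencies, `Context` is a hypothetical one-method stand-in for `ProcessContext`, and `handleTotal` is an illustrative name that does not exist in the PR.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class YieldOnEmptySketch {

    // Hypothetical stand-in for the single ProcessContext method used below.
    interface Context {
        void yield();
    }

    // Returns true when there are results to transfer; yields when there are none.
    static boolean handleTotal(final int total, final Context context) {
        if (total > 0) {
            return true;
        }
        // No new data: back off so the API is not polled again immediately.
        context.yield();
        return false;
    }

    public static void main(String[] args) {
        final AtomicInteger yields = new AtomicInteger();
        final boolean transfer = handleTotal(0, yields::incrementAndGet);
        System.out.println("transfer=" + transfer + ", yields=" + yields.get());
    }
}
```

In the processor this would correspond to calling the yield in the `else` branch, next to `session.remove(flowFile)`.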



##########
nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/main/java/org/apache/nifi/processors/hubspot/GetHubSpot.java:
##########
@@ -187,61 +243,122 @@ private String getResponseBodyAsString(final 
ProcessContext context, final HttpR
         }
     }
 
-    private OutputStreamCallback parseHttpResponse(ProcessContext context, 
String endpoint, StateMap state, HttpResponseEntity response, AtomicInteger 
objectCountHolder) {
+    private OutputStreamCallback parseHttpResponse(final ProcessContext 
context, final HttpResponseEntity response, final AtomicInteger total,
+                                                   final Map<String, String> 
stateMap) {
         return out -> {
             try (final JsonParser jsonParser = 
JSON_FACTORY.createParser(response.body());
                  final JsonGenerator jsonGenerator = 
JSON_FACTORY.createGenerator(out, JsonEncoding.UTF8)) {
+                boolean isCursorAvailable = false;
+                final String objectType = 
context.getProperty(OBJECT_TYPE).getValue();
+                final String cursorKey = String.format(CURSOR_KEY_PATTERN, 
objectType);
                 while (jsonParser.nextToken() != null) {
+                    if (jsonParser.getCurrentToken() == JsonToken.FIELD_NAME 
&& jsonParser.getCurrentName()
+                            .equals("total")) {
+                        jsonParser.nextToken();
+                        total.set(jsonParser.getIntValue());
+                    }
                     if (jsonParser.getCurrentToken() == JsonToken.FIELD_NAME 
&& jsonParser.getCurrentName()
                             .equals("results")) {
                         jsonParser.nextToken();
                         jsonGenerator.copyCurrentStructure(jsonParser);
-                        objectCountHolder.incrementAndGet();
                     }
                     final String fieldName = jsonParser.getCurrentName();
-                    if (CURSOR_PARAMETER.equals(fieldName)) {
+                    if (PAGING_CURSOR.equals(fieldName)) {
+                        isCursorAvailable = true;
                         jsonParser.nextToken();
-                        Map<String, String> newStateMap = new 
HashMap<>(state.toMap());
-                        newStateMap.put(endpoint, jsonParser.getText());
-                        updateState(context, newStateMap);
+                        stateMap.put(cursorKey, jsonParser.getText());
                         break;
                     }
                 }
+                if (!isCursorAvailable) {
+                    stateMap.put(cursorKey, NO_PAGING);
+                }
             }
         };
     }
 
-    HttpUriBuilder getBaseUri(final ProcessContext context) {
+    URI getBaseUri(final ProcessContext context) {
         final String path = context.getProperty(OBJECT_TYPE).getValue();
         return webClientServiceProvider.getHttpUriBuilder()
                 .scheme(HTTPS)
                 .host(API_BASE_URI)
-                .encodedPath(path);
+                .encodedPath(path + "/search")
+                .build();
     }
 
-    private HttpResponseEntity getHttpResponseEntity(final String accessToken, 
final URI uri) {
+    private HttpResponseEntity getHttpResponseEntity(final String accessToken, 
final URI uri, final String filters) {
+        final JsonInputStreamConverter converter = new 
JsonInputStreamConverter(filters);

Review Comment:
   `IOUtils.toInputStream(filters, StandardCharsets.UTF_8)` could be used 
instead, with its `available()` method providing the `contentLength`.
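
A rough, JDK-only sketch of that idea. `ByteArrayInputStream` is swapped in here for the stream that `IOUtils.toInputStream` would return (an assumption made so the example has no dependencies), and the class and method names are hypothetical. For an in-memory stream, `available()` reports the number of unread bytes, which equals the content length as long as nothing has been consumed yet.

```java
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import java.util.OptionalLong;

public class RequestBodySketch {

    // Wraps the JSON body in an in-memory stream; no custom converter class is needed.
    static ByteArrayInputStream toBodyStream(final String json) {
        return new ByteArrayInputStream(json.getBytes(StandardCharsets.UTF_8));
    }

    // For an in-memory stream, available() is the number of bytes left to read.
    static OptionalLong contentLength(final ByteArrayInputStream body) {
        return OptionalLong.of(body.available());
    }

    public static void main(String[] args) {
        final String filters = "{\"limit\":\"10\"}";
        final ByteArrayInputStream body = toBodyStream(filters);
        System.out.println(contentLength(body).getAsLong());
    }
}
```

Note that the length is counted in bytes, not characters, so non-ASCII filter values are handled correctly.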



##########
nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/main/java/org/apache/nifi/processors/hubspot/GetHubSpot.java:
##########
@@ -154,17 +207,20 @@ public void onTrigger(final ProcessContext context, final 
ProcessSession session
         final String accessToken = 
context.getProperty(ACCESS_TOKEN).getValue();
         final String endpoint = context.getProperty(OBJECT_TYPE).getValue();
 
-        final StateMap state = getStateMap(context);
-        final URI uri = createUri(context, state);
+        final URI uri = getBaseUri(context);
 
-        final HttpResponseEntity response = getHttpResponseEntity(accessToken, 
uri);
-        final AtomicInteger objectCountHolder = new AtomicInteger();
+        final AtomicInteger total = new AtomicInteger(-1);
+        final StateMap state = getStateMap(context);
+        final Map<String, String> stateMap = new HashMap<>(state.toMap());

Review Comment:
   The method could return `Map<String, String>` instead. `StateMap` is not 
used directly.



##########
nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/main/java/org/apache/nifi/processors/hubspot/GetHubSpot.java:
##########
@@ -117,18 +160,28 @@ public class GetHubSpot extends AbstractProcessor {
 
     private static final String API_BASE_URI = "api.hubapi.com";
     private static final String HTTPS = "https";
-    private static final String CURSOR_PARAMETER = "after";
-    private static final String LIMIT_PARAMETER = "limit";
     private static final int TOO_MANY_REQUESTS = 429;
     private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
     private static final JsonFactory JSON_FACTORY = OBJECT_MAPPER.getFactory();
+    private static final Map<String, HubSpotObjectType> objectTypeLookupMap = 
createObjectTypeLookupMap();
+    private static final String NO_PAGING = "no paging";
+    private static final String PAGING_CURSOR = "after";
+    private static final String CURSOR_KEY_PATTERN = "paging_next: %s";
+
+    private static Map<String, HubSpotObjectType> createObjectTypeLookupMap() {
+        return Arrays.stream(HubSpotObjectType.values())
+                .collect(Collectors.toMap(HubSpotObjectType::getValue, 
Function.identity()));
+    }
 
     private volatile WebClientServiceProvider webClientServiceProvider;
 
     private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS = 
Collections.unmodifiableList(Arrays.asList(
             OBJECT_TYPE,

Review Comment:
   The processor should reset its state when the user changes the Object Type.



##########
nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/main/java/org/apache/nifi/processors/hubspot/GetHubSpot.java:
##########
@@ -99,7 +110,39 @@ public class GetHubSpot extends AbstractProcessor {
             .description("The maximum number of results to request for each 
invocation of the Processor")
             
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
             .required(false)
-            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+            .addValidator(StandardValidators.createLongValidator(1, 100, true))
+            .build();
+
+    static final PropertyDescriptor IS_INCREMENTAL = new 
PropertyDescriptor.Builder()
+            .name("is-incremental")
+            .displayName("Incremental Loading")
+            .description("The processor can incrementally load the queried 
objects so that each object is queried exactly once." +
+                    " For each query, the processor queries objects which were 
created or modified after the previous run time" +
+                    " but before the current time.")

Review Comment:
   ```suggestion
                       " but before the current time (query time window).")
   ```



##########
nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/main/resources/META-INF/LICENSE:
##########
@@ -0,0 +1,202 @@
+
+                                 Apache License
+                           Version 2.0, January 2004
+                        http://www.apache.org/licenses/
+

Review Comment:
   LICENSE / NOTICE files need to be added in the NAR module, not in the 
processors module.



##########
nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/main/java/org/apache/nifi/processors/hubspot/GetHubSpot.java:
##########
@@ -117,18 +160,28 @@ public class GetHubSpot extends AbstractProcessor {
 
     private static final String API_BASE_URI = "api.hubapi.com";
     private static final String HTTPS = "https";
-    private static final String CURSOR_PARAMETER = "after";
-    private static final String LIMIT_PARAMETER = "limit";
     private static final int TOO_MANY_REQUESTS = 429;
     private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
     private static final JsonFactory JSON_FACTORY = OBJECT_MAPPER.getFactory();
+    private static final Map<String, HubSpotObjectType> objectTypeLookupMap = 
createObjectTypeLookupMap();

Review Comment:
   Please always use upper case for constants.
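
For illustration, a self-contained sketch of the renamed constant. `ObjectType` is a hypothetical, trimmed-down stand-in for `HubSpotObjectType`, and `OBJECT_TYPE_LOOKUP_MAP` is only one possible name.

```java
import java.util.Arrays;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

public class ConstantNamingSketch {

    // Hypothetical stand-in for HubSpotObjectType with only two entries.
    enum ObjectType {
        COMPANIES("/crm/v3/objects/companies"),
        CONTACTS("/crm/v3/objects/contacts");

        private final String value;

        ObjectType(final String value) {
            this.value = value;
        }

        String getValue() {
            return value;
        }
    }

    // A static final field is a constant, so it is written in UPPER_SNAKE_CASE.
    static final Map<String, ObjectType> OBJECT_TYPE_LOOKUP_MAP =
            Arrays.stream(ObjectType.values())
                    .collect(Collectors.toMap(ObjectType::getValue, Function.identity()));

    public static void main(String[] args) {
        System.out.println(OBJECT_TYPE_LOOKUP_MAP.get("/crm/v3/objects/contacts"));
    }
}
```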



##########
nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/main/java/org/apache/nifi/processors/hubspot/GetHubSpot.java:
##########
@@ -75,6 +82,10 @@
 @DefaultSettings(yieldDuration = "10 sec")
 public class GetHubSpot extends AbstractProcessor {

Review Comment:
   This is a source processor, so `@TriggerWhenEmpty` is not applicable here. 
Please remove it.
   
   Please also update the `@Stateful` description to cover the incremental 
loading state.



##########
nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/main/java/org/apache/nifi/processors/hubspot/GetHubSpot.java:
##########
@@ -99,7 +110,39 @@ public class GetHubSpot extends AbstractProcessor {
             .description("The maximum number of results to request for each 
invocation of the Processor")
             
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
             .required(false)
-            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+            .addValidator(StandardValidators.createLongValidator(1, 100, true))
+            .build();
+
+    static final PropertyDescriptor IS_INCREMENTAL = new 
PropertyDescriptor.Builder()
+            .name("is-incremental")
+            .displayName("Incremental Loading")
+            .description("The processor can incrementally load the queried 
objects so that each object is queried exactly once." +
+                    " For each query, the processor queries objects which were 
created or modified after the previous run time" +
+                    " but before the current time.")
+            .required(true)
+            .allowableValues("true", "false")
+            .defaultValue("false")
+            .build();
+
+    static final PropertyDescriptor INCREMENTAL_DELAY = new 
PropertyDescriptor.Builder()
+            .name("incremental-delay")
+            .displayName("Incremental Delay")
+            .description("The ending timestamp of the time window will be 
adjusted earlier by the amount configured in this property." +
+                    " For example, with a property value of 10 seconds, an 
ending timestamp of 12:30:45 would be changed to 12:30:35.")
+            .required(false)
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .dependsOn(IS_INCREMENTAL, "true")
+            .build();
+
+    static final PropertyDescriptor INITIAL_INCREMENTAL_FILTER = new 
PropertyDescriptor.Builder()
+            .name("initial-incremental-filter")
+            .displayName("Initial Incremental Start Time")

Review Comment:
   It is a good practice to align the property name, display name and variable 
name.



##########
nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/main/java/org/apache/nifi/processors/hubspot/HubSpotObjectType.java:
##########
@@ -18,97 +18,107 @@
 
 import org.apache.nifi.components.DescribedValue;
 
+import static 
org.apache.nifi.processors.hubspot.IncrementalFieldType.HS_LAST_MODIFIED_DATE;
+import static 
org.apache.nifi.processors.hubspot.IncrementalFieldType.LAST_MODIFIED_DATE;
+
 public enum HubSpotObjectType implements DescribedValue {
 
     COMPANIES(
             "/crm/v3/objects/companies",
             "Companies",
             "In HubSpot, the companies object is a standard CRM object. 
Individual company records can be used to store information about businesses" +
-                    " and organizations within company properties."
+                    " and organizations within company properties.",
+            HS_LAST_MODIFIED_DATE

Review Comment:
   The Additional Details page is not in sync with this entity list. Please 
update the documentation.



##########
nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/main/java/org/apache/nifi/processors/hubspot/GetHubSpot.java:
##########
@@ -99,7 +110,39 @@ public class GetHubSpot extends AbstractProcessor {
             .description("The maximum number of results to request for each 
invocation of the Processor")
             
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
             .required(false)
-            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+            .addValidator(StandardValidators.createLongValidator(1, 100, true))
+            .build();
+
+    static final PropertyDescriptor IS_INCREMENTAL = new 
PropertyDescriptor.Builder()
+            .name("is-incremental")
+            .displayName("Incremental Loading")
+            .description("The processor can incrementally load the queried 
objects so that each object is queried exactly once." +
+                    " For each query, the processor queries objects which were 
created or modified after the previous run time" +
+                    " but before the current time.")
+            .required(true)
+            .allowableValues("true", "false")
+            .defaultValue("false")
+            .build();
+
+    static final PropertyDescriptor INCREMENTAL_DELAY = new 
PropertyDescriptor.Builder()
+            .name("incremental-delay")
+            .displayName("Incremental Delay")
+            .description("The ending timestamp of the time window will be 
adjusted earlier by the amount configured in this property." +
+                    " For example, with a property value of 10 seconds, an 
ending timestamp of 12:30:45 would be changed to 12:30:35.")

Review Comment:
   ```suggestion
               .description("The ending timestamp of the query time window will 
be adjusted earlier by the amount configured in this property." +
                       " For example, with a property value of 10 seconds, an 
ending timestamp of 12:30:45 would be changed to 12:30:35." +
                       " Set this property to avoid missing objects when the 
clocks of your local machines and the HubSpot servers are not in sync.")
   ```
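
The adjustment described in that text can be sketched with plain `java.time`. The class and method names below are hypothetical, and the date is arbitrary; only the 12:30:45 time of day and the 10-second delay come from the description.

```java
import java.time.Duration;
import java.time.Instant;

public class QueryWindowSketch {

    // End of the query time window: the current time moved earlier by the delay.
    static Instant windowEnd(final Instant now, final Duration delay) {
        return now.minus(delay);
    }

    public static void main(String[] args) {
        // Arbitrary example date; the time of day matches the property description.
        final Instant now = Instant.parse("2022-09-22T12:30:45Z");
        System.out.println(windowEnd(now, Duration.ofSeconds(10)));
        // prints 2022-09-22T12:30:35Z
    }
}
```

Objects modified during the last few seconds before `now` fall outside the window and are picked up by the next run, which is what protects against clock skew.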



##########
nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/main/java/org/apache/nifi/processors/hubspot/GetHubSpot.java:
##########
@@ -99,7 +110,39 @@ public class GetHubSpot extends AbstractProcessor {
             .description("The maximum number of results to request for each 
invocation of the Processor")
             
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
             .required(false)
-            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+            .addValidator(StandardValidators.createLongValidator(1, 100, true))
+            .build();
+
+    static final PropertyDescriptor IS_INCREMENTAL = new 
PropertyDescriptor.Builder()
+            .name("is-incremental")
+            .displayName("Incremental Loading")
+            .description("The processor can incrementally load the queried 
objects so that each object is queried exactly once." +
+                    " For each query, the processor queries objects which were 
created or modified after the previous run time" +
+                    " but before the current time.")
+            .required(true)
+            .allowableValues("true", "false")
+            .defaultValue("false")
+            .build();
+
+    static final PropertyDescriptor INCREMENTAL_DELAY = new 
PropertyDescriptor.Builder()
+            .name("incremental-delay")
+            .displayName("Incremental Delay")
+            .description("The ending timestamp of the time window will be 
adjusted earlier by the amount configured in this property." +
+                    " For example, with a property value of 10 seconds, an 
ending timestamp of 12:30:45 would be changed to 12:30:35.")
+            .required(false)
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .dependsOn(IS_INCREMENTAL, "true")
+            .build();
+
+    static final PropertyDescriptor INITIAL_INCREMENTAL_FILTER = new 
PropertyDescriptor.Builder()
+            .name("initial-incremental-filter")
+            .displayName("Initial Incremental Start Time")
+            .description("This property specifies the start time as Epoch Time 
that the processor applies when running the first request.")

Review Comment:
   It would be a better user experience to configure it in a human-readable 
format.
   Suggested format: 2022-09-22T21:50:18.000Z
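
A small sketch of how such a value could be converted to the epoch milliseconds used in the filters, assuming the timestamp is always given in UTC with the trailing `Z`. The class and method names are hypothetical.

```java
import java.time.Instant;

public class StartTimeSketch {

    // Converts an ISO-8601 UTC timestamp to epoch milliseconds.
    // Instant.parse throws DateTimeParseException for malformed input.
    static long toEpochMillis(final String isoTimestamp) {
        return Instant.parse(isoTimestamp).toEpochMilli();
    }

    public static void main(String[] args) {
        System.out.println(toEpochMillis("2022-09-22T21:50:18.000Z"));
    }
}
```

A custom property validator could reuse the same parse call to reject malformed values before the processor is started.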



##########
nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/main/java/org/apache/nifi/processors/hubspot/IncrementalFieldType.java:
##########
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.hubspot;
+
+public enum IncrementalFieldType {
+    LAST_MODIFIED_DATE("lastmodifieddate"),
+    HS_LAST_MODIFIED_DATE("hs_lastmodifieddate");
+
+    final String value;

Review Comment:
   ```suggestion
       private final String value;
   
   ```



##########
nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/main/java/org/apache/nifi/processors/hubspot/GetHubSpot.java:
##########
@@ -187,61 +243,122 @@ private String getResponseBodyAsString(final 
ProcessContext context, final HttpR
         }
     }
 
-    private OutputStreamCallback parseHttpResponse(ProcessContext context, 
String endpoint, StateMap state, HttpResponseEntity response, AtomicInteger 
objectCountHolder) {
+    private OutputStreamCallback parseHttpResponse(final ProcessContext 
context, final HttpResponseEntity response, final AtomicInteger total,
+                                                   final Map<String, String> 
stateMap) {
         return out -> {
             try (final JsonParser jsonParser = 
JSON_FACTORY.createParser(response.body());
                  final JsonGenerator jsonGenerator = 
JSON_FACTORY.createGenerator(out, JsonEncoding.UTF8)) {
+                boolean isCursorAvailable = false;
+                final String objectType = 
context.getProperty(OBJECT_TYPE).getValue();
+                final String cursorKey = String.format(CURSOR_KEY_PATTERN, 
objectType);
                 while (jsonParser.nextToken() != null) {
+                    if (jsonParser.getCurrentToken() == JsonToken.FIELD_NAME 
&& jsonParser.getCurrentName()
+                            .equals("total")) {
+                        jsonParser.nextToken();
+                        total.set(jsonParser.getIntValue());
+                    }
                     if (jsonParser.getCurrentToken() == JsonToken.FIELD_NAME 
&& jsonParser.getCurrentName()
                             .equals("results")) {
                         jsonParser.nextToken();
                         jsonGenerator.copyCurrentStructure(jsonParser);
-                        objectCountHolder.incrementAndGet();
                     }
                     final String fieldName = jsonParser.getCurrentName();
-                    if (CURSOR_PARAMETER.equals(fieldName)) {
+                    if (PAGING_CURSOR.equals(fieldName)) {
+                        isCursorAvailable = true;
                         jsonParser.nextToken();
-                        Map<String, String> newStateMap = new 
HashMap<>(state.toMap());
-                        newStateMap.put(endpoint, jsonParser.getText());
-                        updateState(context, newStateMap);
+                        stateMap.put(cursorKey, jsonParser.getText());
                         break;
                     }
                 }
+                if (!isCursorAvailable) {
+                    stateMap.put(cursorKey, NO_PAGING);
+                }
             }
         };
     }
 
-    HttpUriBuilder getBaseUri(final ProcessContext context) {
+    URI getBaseUri(final ProcessContext context) {
         final String path = context.getProperty(OBJECT_TYPE).getValue();
         return webClientServiceProvider.getHttpUriBuilder()
                 .scheme(HTTPS)
                 .host(API_BASE_URI)
-                .encodedPath(path);
+                .encodedPath(path + "/search")
+                .build();
     }
 
-    private HttpResponseEntity getHttpResponseEntity(final String accessToken, 
final URI uri) {
+    private HttpResponseEntity getHttpResponseEntity(final String accessToken, 
final URI uri, final String filters) {
+        final JsonInputStreamConverter converter = new 
JsonInputStreamConverter(filters);
         return webClientServiceProvider.getWebClientService()
-                .get()
+                .post()
                 .uri(uri)
                 .header("Authorization", "Bearer " + accessToken)
+                .header("Content-Type", "application/json")
+                .body(converter.getInputStream(), 
OptionalLong.of(converter.getByteSize()))
                 .retrieve();
     }
 
-    private URI createUri(final ProcessContext context, final StateMap state) {
-        final String path = context.getProperty(OBJECT_TYPE).getValue();
-        final HttpUriBuilder uriBuilder = getBaseUri(context);
+    String createIncrementalFilters(final ProcessContext context, final 
Map<String, String> stateMap) {
+        final String limit = context.getProperty(RESULT_LIMIT).getValue();
+        final String objectType = context.getProperty(OBJECT_TYPE).getValue();
+        final HubSpotObjectType hubSpotObjectType = 
objectTypeLookupMap.get(objectType);
+        final Long incrDelayMs = 
context.getProperty(INCREMENTAL_DELAY).asTimePeriod(TimeUnit.MILLISECONDS);
+        final String startIncrementalKey = String.format("start: %s", 
objectType);
+        final String endIncrementalKey = String.format("end: %s", objectType);
+        final String cursorKey = String.format(CURSOR_KEY_PATTERN, objectType);
+
+        final ObjectNode root = OBJECT_MAPPER.createObjectNode();
+        root.put("limit", limit);
 
-        final boolean isLimitSet = 
context.getProperty(RESULT_LIMIT).evaluateAttributeExpressions().isSet();
-        if (isLimitSet) {
-            final String limit = context.getProperty(RESULT_LIMIT).getValue();
-            uriBuilder.addQueryParameter(LIMIT_PARAMETER, limit);
+        final String cursor = stateMap.get(cursorKey);
+        if (cursor != null && !NO_PAGING.equals(cursor)) {
+            root.put(PAGING_CURSOR, stateMap.get(cursorKey));
         }
+        final boolean isIncremental = 
context.getProperty(IS_INCREMENTAL).asBoolean();
+        if (isIncremental) {
+
+            final String hubspotSpecificIncrementalFieldName = 
hubSpotObjectType.getLastModifiedDateType().getValue();
+            final String lastStartTime = stateMap.get(startIncrementalKey);
+            final String lastEndTime = 
stateMap.getOrDefault(endIncrementalKey, 
context.getProperty(INITIAL_INCREMENTAL_FILTER).getValue());
+
+            String currentStartTime;
+            String currentEndTime;
+
+            if (cursor != null && !NO_PAGING.equals(cursor)) {
+                currentStartTime = lastStartTime;
+                // lastEndTime can be null if incremental loading was turned 
off beforehand
+                currentEndTime = lastEndTime != null ? lastEndTime : 
String.valueOf(getCurrentEpochTime());
+            } else {
+                currentStartTime = lastEndTime;
+                final long delayedCurrentEndTime = incrDelayMs != null ? 
getCurrentEpochTime() - incrDelayMs : getCurrentEpochTime();
+                currentEndTime = String.valueOf(delayedCurrentEndTime);
 
-        final String cursor = state.get(path);
-        if (cursor != null) {
-            uriBuilder.addQueryParameter(CURSOR_PARAMETER, cursor);
+                stateMap.put(startIncrementalKey, currentStartTime);
+                stateMap.put(endIncrementalKey, currentEndTime);
+            }
+
+            final ArrayNode filters = OBJECT_MAPPER.createArrayNode();
+
+            if (currentStartTime != null) {
+                final ObjectNode greaterThanFilterNode = 
OBJECT_MAPPER.createObjectNode();
+                greaterThanFilterNode.put("propertyName", 
hubspotSpecificIncrementalFieldName);
+                greaterThanFilterNode.put("operator", "GT");
+                greaterThanFilterNode.put("value", currentStartTime);
+                filters.add(greaterThanFilterNode);
+            }
+
+            final ObjectNode lessThanFilterNode = 
OBJECT_MAPPER.createObjectNode();
+            lessThanFilterNode.put("propertyName", 
hubspotSpecificIncrementalFieldName);
+            lessThanFilterNode.put("operator", "LT");
+            lessThanFilterNode.put("value", currentEndTime);
+            filters.add(lessThanFilterNode);
+
+            root.set("filters", filters);
         }
-        return uriBuilder.build();
+        return root.toString();
+    }
+
+    long getCurrentEpochTime() {
+        return Instant.now().toEpochMilli();
     }
 
     private StateMap getStateMap(final ProcessContext context) {

Review Comment:
   Please use `ProcessSession.getState()` and `setState()` instead of 
`ProcessContext.getStateManager().getState()` and `setState()` because the 
former is transactional and is only committed if the session commits 
successfully (along with the FlowFiles sent out).
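
   To make the point concrete, here is a toy model in plain Java. It is not the 
NiFi API: `ToySession`, `stageState`, `commit`, `rollback` and the `paging_next` 
key are hypothetical names used only for illustration. State staged in the 
session reaches the backing store only on commit, so a failed round leaves the 
previously stored cursor untouched.

   ```java
   import java.util.HashMap;
   import java.util.Map;

   final class TransactionalStateSketch {

       /** Hypothetical stand-in for a session that buffers state until commit. */
       static final class ToySession {
           private final Map<String, String> store; // plays the role of the persisted state
           private Map<String, String> staged;      // pending update, not visible until commit

           ToySession(final Map<String, String> store) {
               this.store = store;
           }

           void stageState(final Map<String, String> newState) {
               staged = new HashMap<>(newState);
           }

           void commit() {
               if (staged != null) {
                   store.clear();
                   store.putAll(staged);
                   staged = null;
               }
           }

           void rollback() {
               staged = null;
           }
       }

       /** Runs one round and returns the cursor visible in the store afterwards. */
       static String runOnce(final Map<String, String> store, final String newCursor, final boolean fail) {
           final ToySession session = new ToySession(store);
           final Map<String, String> update = new HashMap<>(store);
           update.put("paging_next", newCursor); // hypothetical state key
           session.stageState(update);
           if (fail) {
               session.rollback(); // nothing staged reaches the store
           } else {
               session.commit();   // state becomes visible together with the output
           }
           return store.get("paging_next");
       }

       public static void main(final String[] args) {
           final Map<String, String> store = new HashMap<>();
           store.put("paging_next", "10");
           System.out.println(runOnce(store, "20", true));  // prints 10: failed round, old cursor kept
           System.out.println(runOnce(store, "30", false)); // prints 30: committed round
       }
   }
   ```

   With the state manager on the context, the write corresponds to mutating 
`store` directly, so a cursor could be persisted even though the FlowFile that 
belongs to it was never sent out.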



##########
nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/main/java/org/apache/nifi/processors/hubspot/GetHubSpot.java:
##########
@@ -187,61 +243,122 @@ private String getResponseBodyAsString(final 
ProcessContext context, final HttpR
         }
     }
 
-    private OutputStreamCallback parseHttpResponse(ProcessContext context, 
String endpoint, StateMap state, HttpResponseEntity response, AtomicInteger 
objectCountHolder) {
+    private OutputStreamCallback parseHttpResponse(final ProcessContext 
context, final HttpResponseEntity response, final AtomicInteger total,
+                                                   final Map<String, String> 
stateMap) {
         return out -> {
             try (final JsonParser jsonParser = 
JSON_FACTORY.createParser(response.body());
                  final JsonGenerator jsonGenerator = 
JSON_FACTORY.createGenerator(out, JsonEncoding.UTF8)) {
+                boolean isCursorAvailable = false;
+                final String objectType = 
context.getProperty(OBJECT_TYPE).getValue();
+                final String cursorKey = String.format(CURSOR_KEY_PATTERN, 
objectType);
                 while (jsonParser.nextToken() != null) {
+                    if (jsonParser.getCurrentToken() == JsonToken.FIELD_NAME 
&& jsonParser.getCurrentName()
+                            .equals("total")) {
+                        jsonParser.nextToken();
+                        total.set(jsonParser.getIntValue());
+                    }
                     if (jsonParser.getCurrentToken() == JsonToken.FIELD_NAME 
&& jsonParser.getCurrentName()
                             .equals("results")) {
                         jsonParser.nextToken();
                         jsonGenerator.copyCurrentStructure(jsonParser);
-                        objectCountHolder.incrementAndGet();
                     }
                     final String fieldName = jsonParser.getCurrentName();
-                    if (CURSOR_PARAMETER.equals(fieldName)) {
+                    if (PAGING_CURSOR.equals(fieldName)) {
+                        isCursorAvailable = true;
                         jsonParser.nextToken();
-                        Map<String, String> newStateMap = new 
HashMap<>(state.toMap());
-                        newStateMap.put(endpoint, jsonParser.getText());
-                        updateState(context, newStateMap);
+                        stateMap.put(cursorKey, jsonParser.getText());
                         break;
                     }
                 }
+                if (!isCursorAvailable) {
+                    stateMap.put(cursorKey, NO_PAGING);
+                }
             }
         };
     }
 
-    HttpUriBuilder getBaseUri(final ProcessContext context) {
+    URI getBaseUri(final ProcessContext context) {
         final String path = context.getProperty(OBJECT_TYPE).getValue();
         return webClientServiceProvider.getHttpUriBuilder()
                 .scheme(HTTPS)
                 .host(API_BASE_URI)
-                .encodedPath(path);
+                .encodedPath(path + "/search")
+                .build();
     }
 
-    private HttpResponseEntity getHttpResponseEntity(final String accessToken, 
final URI uri) {
+    private HttpResponseEntity getHttpResponseEntity(final String accessToken, 
final URI uri, final String filters) {
+        final JsonInputStreamConverter converter = new 
JsonInputStreamConverter(filters);
         return webClientServiceProvider.getWebClientService()
-                .get()
+                .post()
                 .uri(uri)
                 .header("Authorization", "Bearer " + accessToken)
+                .header("Content-Type", "application/json")
+                .body(converter.getInputStream(), 
OptionalLong.of(converter.getByteSize()))
                 .retrieve();
     }
 
-    private URI createUri(final ProcessContext context, final StateMap state) {
-        final String path = context.getProperty(OBJECT_TYPE).getValue();
-        final HttpUriBuilder uriBuilder = getBaseUri(context);
+    String createIncrementalFilters(final ProcessContext context, final 
Map<String, String> stateMap) {
+        final String limit = context.getProperty(RESULT_LIMIT).getValue();
+        final String objectType = context.getProperty(OBJECT_TYPE).getValue();
+        final HubSpotObjectType hubSpotObjectType = 
objectTypeLookupMap.get(objectType);
+        final Long incrDelayMs = 
context.getProperty(INCREMENTAL_DELAY).asTimePeriod(TimeUnit.MILLISECONDS);
+        final String startIncrementalKey = String.format("start: %s", 
objectType);
+        final String endIncrementalKey = String.format("end: %s", objectType);
+        final String cursorKey = String.format(CURSOR_KEY_PATTERN, objectType);
+
+        final ObjectNode root = OBJECT_MAPPER.createObjectNode();
+        root.put("limit", limit);
 
-        final boolean isLimitSet = 
context.getProperty(RESULT_LIMIT).evaluateAttributeExpressions().isSet();
-        if (isLimitSet) {
-            final String limit = context.getProperty(RESULT_LIMIT).getValue();
-            uriBuilder.addQueryParameter(LIMIT_PARAMETER, limit);
+        final String cursor = stateMap.get(cursorKey);
+        if (cursor != null && !NO_PAGING.equals(cursor)) {
+            root.put(PAGING_CURSOR, stateMap.get(cursorKey));
         }
+        final boolean isIncremental = 
context.getProperty(IS_INCREMENTAL).asBoolean();
+        if (isIncremental) {
+
+            final String hubspotSpecificIncrementalFieldName = 
hubSpotObjectType.getLastModifiedDateType().getValue();
+            final String lastStartTime = stateMap.get(startIncrementalKey);
+            final String lastEndTime = 
stateMap.getOrDefault(endIncrementalKey, 
context.getProperty(INITIAL_INCREMENTAL_FILTER).getValue());
+
+            String currentStartTime;
+            String currentEndTime;
+
+            if (cursor != null && !NO_PAGING.equals(cursor)) {
+                currentStartTime = lastStartTime;
+                // lastEndTime can be null if incremental loading was turned 
off beforehand
+                currentEndTime = lastEndTime != null ? lastEndTime : 
String.valueOf(getCurrentEpochTime());
+            } else {
+                currentStartTime = lastEndTime;
+                final long delayedCurrentEndTime = incrDelayMs != null ? 
getCurrentEpochTime() - incrDelayMs : getCurrentEpochTime();
+                currentEndTime = String.valueOf(delayedCurrentEndTime);
 
-        final String cursor = state.get(path);
-        if (cursor != null) {
-            uriBuilder.addQueryParameter(CURSOR_PARAMETER, cursor);
+                stateMap.put(startIncrementalKey, currentStartTime);
+                stateMap.put(endIncrementalKey, currentEndTime);
+            }
+
+            final ArrayNode filters = OBJECT_MAPPER.createArrayNode();
+
+            if (currentStartTime != null) {
+                final ObjectNode greaterThanFilterNode = 
OBJECT_MAPPER.createObjectNode();
+                greaterThanFilterNode.put("propertyName", 
hubspotSpecificIncrementalFieldName);
+                greaterThanFilterNode.put("operator", "GT");

Review Comment:
   One end of the time window must be inclusive in order not to lose objects 
whose timestamp matches the window boundaries.
   ```suggestion
                   greaterThanFilterNode.put("operator", "GTE");
   ```
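
   A minimal sketch of the boundary problem, in plain Java with made-up 
timestamps (`WindowBoundarySketch`, `inWindow` and `countFetched` are 
hypothetical helpers, not part of the PR). Two consecutive windows share the 
boundary `2000`; with `GT` + `LT` an object modified exactly at a boundary is 
never fetched, while `GTE` + `LT` fetches every object exactly once.

   ```java
   import java.util.Arrays;
   import java.util.List;

   final class WindowBoundarySketch {

       /** Mirrors the two filters: lower bound GT or GTE, upper bound always LT. */
       static boolean inWindow(final long ts, final long start, final long end, final boolean inclusiveStart) {
           final boolean afterStart = inclusiveStart ? ts >= start : ts > start;
           return afterStart && ts < end;
       }

       /** Counts how many (timestamp, window) matches occur over consecutive windows. */
       static long countFetched(final List<Long> timestamps, final long[] boundaries, final boolean inclusiveStart) {
           long fetched = 0;
           for (final long ts : timestamps) {
               for (int i = 0; i + 1 < boundaries.length; i++) {
                   if (inWindow(ts, boundaries[i], boundaries[i + 1], inclusiveStart)) {
                       fetched++;
                   }
               }
           }
           return fetched;
       }

       public static void main(final String[] args) {
           final List<Long> modified = Arrays.asList(1_000L, 2_000L, 2_500L);
           final long[] boundaries = {1_000L, 2_000L, 3_000L};
           System.out.println(countFetched(modified, boundaries, false)); // GT + LT: prints 1
           System.out.println(countFetched(modified, boundaries, true));  // GTE + LT: prints 3
       }
   }
   ```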



##########
nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/main/java/org/apache/nifi/processors/hubspot/GetHubSpot.java:
##########
@@ -154,17 +207,20 @@ public void onTrigger(final ProcessContext context, final 
ProcessSession session
         final String accessToken = 
context.getProperty(ACCESS_TOKEN).getValue();
         final String endpoint = context.getProperty(OBJECT_TYPE).getValue();
 
-        final StateMap state = getStateMap(context);
-        final URI uri = createUri(context, state);
+        final URI uri = getBaseUri(context);
 
-        final HttpResponseEntity response = getHttpResponseEntity(accessToken, 
uri);
-        final AtomicInteger objectCountHolder = new AtomicInteger();
+        final AtomicInteger total = new AtomicInteger(-1);
+        final StateMap state = getStateMap(context);
+        final Map<String, String> stateMap = new HashMap<>(state.toMap());
+        final String filters = createIncrementalFilters(context, stateMap);
+        final HttpResponseEntity response = getHttpResponseEntity(accessToken, 
uri, filters);
 
         if (response.statusCode() == HttpResponseStatus.OK.getCode()) {
             FlowFile flowFile = session.create();
-            flowFile = session.write(flowFile, parseHttpResponse(context, 
endpoint, state, response, objectCountHolder));
-            if (objectCountHolder.get() > 0) {
+            flowFile = session.write(flowFile, parseHttpResponse(context, 
response, total, stateMap));
+            if (total.get() > 0) {
                 session.transfer(flowFile, REL_SUCCESS);
+                updateState(context, stateMap);

Review Comment:
   I would update the state with the new start/end times even when no new 
objects have been found in this round in order to make the progress visible in 
the state variables.
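
   Roughly what I have in mind, as a plain-Java sketch (`runRound` and the 
`start` / `end` keys are hypothetical stand-ins; in `onTrigger` this would 
amount to moving the `updateState(context, stateMap)` call out of the 
`if (total.get() > 0)` branch):

   ```java
   import java.util.HashMap;
   import java.util.Map;

   final class AlwaysPersistWindowSketch {

       /**
        * Simulates one round: the window advances to `now` and is persisted
        * whether or not any objects were found. Returns true if a FlowFile
        * would be transferred.
        */
       static boolean runRound(final Map<String, String> persisted, final long now, final int total) {
           final Map<String, String> stateMap = new HashMap<>(persisted);
           stateMap.put("start", persisted.get("end")); // null on the very first round
           stateMap.put("end", String.valueOf(now));

           final boolean transferFlowFile = total > 0;

           // Persist in both cases so the progress is visible in the state.
           persisted.clear();
           persisted.putAll(stateMap);
           return transferFlowFile;
       }

       public static void main(final String[] args) {
           final Map<String, String> persisted = new HashMap<>();
           System.out.println(runRound(persisted, 1_000L, 0)); // prints false
           System.out.println(persisted.get("end"));           // prints 1000
           System.out.println(runRound(persisted, 2_000L, 5)); // prints true
           System.out.println(persisted.get("start"));         // prints 1000
       }
   }
   ```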



##########
nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/main/java/org/apache/nifi/processors/hubspot/GetHubSpot.java:
##########
@@ -187,61 +243,122 @@ private String getResponseBodyAsString(final 
ProcessContext context, final HttpR
         }
     }
 
-    private OutputStreamCallback parseHttpResponse(ProcessContext context, 
String endpoint, StateMap state, HttpResponseEntity response, AtomicInteger 
objectCountHolder) {
+    private OutputStreamCallback parseHttpResponse(final ProcessContext 
context, final HttpResponseEntity response, final AtomicInteger total,
+                                                   final Map<String, String> 
stateMap) {
         return out -> {
             try (final JsonParser jsonParser = 
JSON_FACTORY.createParser(response.body());
                  final JsonGenerator jsonGenerator = 
JSON_FACTORY.createGenerator(out, JsonEncoding.UTF8)) {
+                boolean isCursorAvailable = false;
+                final String objectType = 
context.getProperty(OBJECT_TYPE).getValue();
+                final String cursorKey = String.format(CURSOR_KEY_PATTERN, 
objectType);
                 while (jsonParser.nextToken() != null) {
+                    if (jsonParser.getCurrentToken() == JsonToken.FIELD_NAME 
&& jsonParser.getCurrentName()
+                            .equals("total")) {
+                        jsonParser.nextToken();
+                        total.set(jsonParser.getIntValue());
+                    }
                     if (jsonParser.getCurrentToken() == JsonToken.FIELD_NAME 
&& jsonParser.getCurrentName()
                             .equals("results")) {
                         jsonParser.nextToken();
                         jsonGenerator.copyCurrentStructure(jsonParser);
-                        objectCountHolder.incrementAndGet();
                     }
                     final String fieldName = jsonParser.getCurrentName();
-                    if (CURSOR_PARAMETER.equals(fieldName)) {
+                    if (PAGING_CURSOR.equals(fieldName)) {
+                        isCursorAvailable = true;
                         jsonParser.nextToken();
-                        Map<String, String> newStateMap = new 
HashMap<>(state.toMap());
-                        newStateMap.put(endpoint, jsonParser.getText());
-                        updateState(context, newStateMap);
+                        stateMap.put(cursorKey, jsonParser.getText());
                         break;
                     }
                 }
+                if (!isCursorAvailable) {
+                    stateMap.put(cursorKey, NO_PAGING);
+                }
             }
         };
     }
 
-    HttpUriBuilder getBaseUri(final ProcessContext context) {
+    URI getBaseUri(final ProcessContext context) {
         final String path = context.getProperty(OBJECT_TYPE).getValue();
         return webClientServiceProvider.getHttpUriBuilder()
                 .scheme(HTTPS)
                 .host(API_BASE_URI)
-                .encodedPath(path);
+                .encodedPath(path + "/search")
+                .build();
     }
 
-    private HttpResponseEntity getHttpResponseEntity(final String accessToken, 
final URI uri) {
+    private HttpResponseEntity getHttpResponseEntity(final String accessToken, 
final URI uri, final String filters) {
+        final JsonInputStreamConverter converter = new 
JsonInputStreamConverter(filters);
         return webClientServiceProvider.getWebClientService()
-                .get()
+                .post()
                 .uri(uri)
                 .header("Authorization", "Bearer " + accessToken)
+                .header("Content-Type", "application/json")
+                .body(converter.getInputStream(), 
OptionalLong.of(converter.getByteSize()))
                 .retrieve();
     }
 
-    private URI createUri(final ProcessContext context, final StateMap state) {
-        final String path = context.getProperty(OBJECT_TYPE).getValue();
-        final HttpUriBuilder uriBuilder = getBaseUri(context);
+    String createIncrementalFilters(final ProcessContext context, final 
Map<String, String> stateMap) {
+        final String limit = context.getProperty(RESULT_LIMIT).getValue();
+        final String objectType = context.getProperty(OBJECT_TYPE).getValue();
+        final HubSpotObjectType hubSpotObjectType = 
objectTypeLookupMap.get(objectType);
+        final Long incrDelayMs = 
context.getProperty(INCREMENTAL_DELAY).asTimePeriod(TimeUnit.MILLISECONDS);
+        final String startIncrementalKey = String.format("start: %s", 
objectType);
+        final String endIncrementalKey = String.format("end: %s", objectType);
+        final String cursorKey = String.format(CURSOR_KEY_PATTERN, objectType);
+
+        final ObjectNode root = OBJECT_MAPPER.createObjectNode();
+        root.put("limit", limit);
 
-        final boolean isLimitSet = 
context.getProperty(RESULT_LIMIT).evaluateAttributeExpressions().isSet();
-        if (isLimitSet) {
-            final String limit = context.getProperty(RESULT_LIMIT).getValue();
-            uriBuilder.addQueryParameter(LIMIT_PARAMETER, limit);
+        final String cursor = stateMap.get(cursorKey);
+        if (cursor != null && !NO_PAGING.equals(cursor)) {
+            root.put(PAGING_CURSOR, stateMap.get(cursorKey));
         }
+        final boolean isIncremental = 
context.getProperty(IS_INCREMENTAL).asBoolean();
+        if (isIncremental) {
+
+            final String hubspotSpecificIncrementalFieldName = 
hubSpotObjectType.getLastModifiedDateType().getValue();
+            final String lastStartTime = stateMap.get(startIncrementalKey);
+            final String lastEndTime = 
stateMap.getOrDefault(endIncrementalKey, 
context.getProperty(INITIAL_INCREMENTAL_FILTER).getValue());
+
+            String currentStartTime;
+            String currentEndTime;

Review Comment:
   These could be `final` too.
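
   Java accepts a `final` local without an initializer as long as every branch 
assigns it exactly once. A small sketch (the `window` method and its 
parameters are hypothetical and only mirror the shape of the `if` / `else` 
above):

   ```java
   final class FinalLocalsSketch {

       /** Returns {start, end}; both locals are final and assigned once per branch. */
       static String[] window(final boolean paging, final String lastStart, final String lastEnd, final long now) {
           final String currentStartTime;
           final String currentEndTime;

           if (paging) {
               currentStartTime = lastStart;
               currentEndTime = lastEnd != null ? lastEnd : String.valueOf(now);
           } else {
               currentStartTime = lastEnd;
               currentEndTime = String.valueOf(now);
           }
           return new String[] {currentStartTime, currentEndTime};
       }

       public static void main(final String[] args) {
           final String[] w = window(false, "100", "200", 300L);
           System.out.println(w[0] + " " + w[1]); // prints 200 300
       }
   }
   ```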



##########
nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/main/java/org/apache/nifi/processors/hubspot/GetHubSpot.java:
##########
@@ -187,61 +243,122 @@ private String getResponseBodyAsString(final 
ProcessContext context, final HttpR
         }
     }
 
-    private OutputStreamCallback parseHttpResponse(ProcessContext context, 
String endpoint, StateMap state, HttpResponseEntity response, AtomicInteger 
objectCountHolder) {
+    private OutputStreamCallback parseHttpResponse(final ProcessContext 
context, final HttpResponseEntity response, final AtomicInteger total,
+                                                   final Map<String, String> 
stateMap) {
         return out -> {
             try (final JsonParser jsonParser = 
JSON_FACTORY.createParser(response.body());
                  final JsonGenerator jsonGenerator = 
JSON_FACTORY.createGenerator(out, JsonEncoding.UTF8)) {
+                boolean isCursorAvailable = false;
+                final String objectType = 
context.getProperty(OBJECT_TYPE).getValue();
+                final String cursorKey = String.format(CURSOR_KEY_PATTERN, 
objectType);
                 while (jsonParser.nextToken() != null) {
+                    if (jsonParser.getCurrentToken() == JsonToken.FIELD_NAME 
&& jsonParser.getCurrentName()
+                            .equals("total")) {
+                        jsonParser.nextToken();
+                        total.set(jsonParser.getIntValue());
+                    }
                     if (jsonParser.getCurrentToken() == JsonToken.FIELD_NAME 
&& jsonParser.getCurrentName()
                             .equals("results")) {
                         jsonParser.nextToken();
                         jsonGenerator.copyCurrentStructure(jsonParser);
-                        objectCountHolder.incrementAndGet();
                     }
                     final String fieldName = jsonParser.getCurrentName();
-                    if (CURSOR_PARAMETER.equals(fieldName)) {
+                    if (PAGING_CURSOR.equals(fieldName)) {
+                        isCursorAvailable = true;
                         jsonParser.nextToken();
-                        Map<String, String> newStateMap = new 
HashMap<>(state.toMap());
-                        newStateMap.put(endpoint, jsonParser.getText());
-                        updateState(context, newStateMap);
+                        stateMap.put(cursorKey, jsonParser.getText());
                         break;
                     }
                 }
+                if (!isCursorAvailable) {
+                    stateMap.put(cursorKey, NO_PAGING);
+                }
             }
         };
     }
 
-    HttpUriBuilder getBaseUri(final ProcessContext context) {
+    URI getBaseUri(final ProcessContext context) {
         final String path = context.getProperty(OBJECT_TYPE).getValue();
         return webClientServiceProvider.getHttpUriBuilder()
                 .scheme(HTTPS)
                 .host(API_BASE_URI)
-                .encodedPath(path);
+                .encodedPath(path + "/search")
+                .build();
     }
 
-    private HttpResponseEntity getHttpResponseEntity(final String accessToken, 
final URI uri) {
+    private HttpResponseEntity getHttpResponseEntity(final String accessToken, 
final URI uri, final String filters) {
+        final JsonInputStreamConverter converter = new 
JsonInputStreamConverter(filters);
         return webClientServiceProvider.getWebClientService()
-                .get()
+                .post()
                 .uri(uri)
                 .header("Authorization", "Bearer " + accessToken)
+                .header("Content-Type", "application/json")
+                .body(converter.getInputStream(), 
OptionalLong.of(converter.getByteSize()))
                 .retrieve();
     }
 
-    private URI createUri(final ProcessContext context, final StateMap state) {
-        final String path = context.getProperty(OBJECT_TYPE).getValue();
-        final HttpUriBuilder uriBuilder = getBaseUri(context);
+    String createIncrementalFilters(final ProcessContext context, final 
Map<String, String> stateMap) {

Review Comment:
   ```suggestion
       private String createIncrementalFilters(final ProcessContext context, 
final Map<String, String> stateMap) {
   ```



##########
nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/main/java/org/apache/nifi/processors/hubspot/GetHubSpot.java:
##########
@@ -187,61 +243,122 @@ private String getResponseBodyAsString(final 
ProcessContext context, final HttpR
         }
     }
 
-    private OutputStreamCallback parseHttpResponse(ProcessContext context, 
String endpoint, StateMap state, HttpResponseEntity response, AtomicInteger 
objectCountHolder) {
+    private OutputStreamCallback parseHttpResponse(final ProcessContext 
context, final HttpResponseEntity response, final AtomicInteger total,
+                                                   final Map<String, String> 
stateMap) {
         return out -> {
             try (final JsonParser jsonParser = 
JSON_FACTORY.createParser(response.body());
                  final JsonGenerator jsonGenerator = 
JSON_FACTORY.createGenerator(out, JsonEncoding.UTF8)) {
+                boolean isCursorAvailable = false;
+                final String objectType = 
context.getProperty(OBJECT_TYPE).getValue();
+                final String cursorKey = String.format(CURSOR_KEY_PATTERN, 
objectType);
                 while (jsonParser.nextToken() != null) {
+                    if (jsonParser.getCurrentToken() == JsonToken.FIELD_NAME 
&& jsonParser.getCurrentName()
+                            .equals("total")) {
+                        jsonParser.nextToken();
+                        total.set(jsonParser.getIntValue());
+                    }
                     if (jsonParser.getCurrentToken() == JsonToken.FIELD_NAME 
&& jsonParser.getCurrentName()
                             .equals("results")) {
                         jsonParser.nextToken();
                         jsonGenerator.copyCurrentStructure(jsonParser);
-                        objectCountHolder.incrementAndGet();
                     }
                     final String fieldName = jsonParser.getCurrentName();
-                    if (CURSOR_PARAMETER.equals(fieldName)) {
+                    if (PAGING_CURSOR.equals(fieldName)) {
+                        isCursorAvailable = true;
                         jsonParser.nextToken();
-                        Map<String, String> newStateMap = new 
HashMap<>(state.toMap());
-                        newStateMap.put(endpoint, jsonParser.getText());
-                        updateState(context, newStateMap);
+                        stateMap.put(cursorKey, jsonParser.getText());
                         break;
                     }
                 }
+                if (!isCursorAvailable) {
+                    stateMap.put(cursorKey, NO_PAGING);
+                }
             }
         };
     }
 
-    HttpUriBuilder getBaseUri(final ProcessContext context) {
+    URI getBaseUri(final ProcessContext context) {
         final String path = context.getProperty(OBJECT_TYPE).getValue();
         return webClientServiceProvider.getHttpUriBuilder()
                 .scheme(HTTPS)
                 .host(API_BASE_URI)
-                .encodedPath(path);
+                .encodedPath(path + "/search")
+                .build();
     }
 
-    private HttpResponseEntity getHttpResponseEntity(final String accessToken, 
final URI uri) {
+    private HttpResponseEntity getHttpResponseEntity(final String accessToken, 
final URI uri, final String filters) {
+        final JsonInputStreamConverter converter = new 
JsonInputStreamConverter(filters);
         return webClientServiceProvider.getWebClientService()
-                .get()
+                .post()
                 .uri(uri)
                 .header("Authorization", "Bearer " + accessToken)
+                .header("Content-Type", "application/json")
+                .body(converter.getInputStream(), 
OptionalLong.of(converter.getByteSize()))
                 .retrieve();
     }
 
-    private URI createUri(final ProcessContext context, final StateMap state) {
-        final String path = context.getProperty(OBJECT_TYPE).getValue();
-        final HttpUriBuilder uriBuilder = getBaseUri(context);
+    String createIncrementalFilters(final ProcessContext context, final 
Map<String, String> stateMap) {
+        final String limit = context.getProperty(RESULT_LIMIT).getValue();
+        final String objectType = context.getProperty(OBJECT_TYPE).getValue();
+        final HubSpotObjectType hubSpotObjectType = 
objectTypeLookupMap.get(objectType);
+        final Long incrDelayMs = 
context.getProperty(INCREMENTAL_DELAY).asTimePeriod(TimeUnit.MILLISECONDS);
+        final String startIncrementalKey = String.format("start: %s", 
objectType);
+        final String endIncrementalKey = String.format("end: %s", objectType);

Review Comment:
   Is it necessary to include the `objectType` in the state keys? The processor 
handles only one object type at a time.
   
   I'd suggest using more descriptive names:
   - query_time_window_start
   - query_time_window_end
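
   For example, something along these lines (only the two key strings come 
from the suggestion above; the class, constant and method names are 
hypothetical):

   ```java
   import java.util.HashMap;
   import java.util.Map;

   final class StateKeysSketch {

       // Fixed keys, no object type suffix.
       static final String TIME_WINDOW_START_KEY = "query_time_window_start";
       static final String TIME_WINDOW_END_KEY = "query_time_window_end";

       /** Returns a copy of the state with the query time window stored under the fixed keys. */
       static Map<String, String> withWindow(final Map<String, String> state, final long start, final long end) {
           final Map<String, String> updated = new HashMap<>(state);
           updated.put(TIME_WINDOW_START_KEY, String.valueOf(start));
           updated.put(TIME_WINDOW_END_KEY, String.valueOf(end));
           return updated;
       }

       public static void main(final String[] args) {
           final Map<String, String> state = withWindow(new HashMap<>(), 1_000L, 2_000L);
           System.out.println(state.get(TIME_WINDOW_START_KEY)); // prints 1000
           System.out.println(state.get(TIME_WINDOW_END_KEY));   // prints 2000
       }
   }
   ```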



##########
nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/main/java/org/apache/nifi/processors/hubspot/IncrementalFieldType.java:
##########
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.hubspot;
+
+public enum IncrementalFieldType {
+    LAST_MODIFIED_DATE("lastmodifieddate"),
+    HS_LAST_MODIFIED_DATE("hs_lastmodifieddate");
+
+    final String value;
+
+    IncrementalFieldType(String value) {
+        this.value = value;
+    }
+
+    String getValue() {
+        return value;
+    }

Review Comment:
   ```suggestion
       public String getValue() {
           return value;
       }
   
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
