Lehel44 commented on code in PR #6379:
URL: https://github.com/apache/nifi/pull/6379#discussion_r978444386


##########
nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/main/java/org/apache/nifi/processors/hubspot/GetHubSpot.java:
##########
@@ -187,61 +243,122 @@ private String getResponseBodyAsString(final 
ProcessContext context, final HttpR
         }
     }
 
-    private OutputStreamCallback parseHttpResponse(ProcessContext context, 
String endpoint, StateMap state, HttpResponseEntity response, AtomicInteger 
objectCountHolder) {
+    private OutputStreamCallback parseHttpResponse(final ProcessContext 
context, final HttpResponseEntity response, final AtomicInteger total,
+                                                   final Map<String, String> 
stateMap) {
         return out -> {
             try (final JsonParser jsonParser = 
JSON_FACTORY.createParser(response.body());
                  final JsonGenerator jsonGenerator = 
JSON_FACTORY.createGenerator(out, JsonEncoding.UTF8)) {
+                boolean isCursorAvailable = false;
+                final String objectType = 
context.getProperty(OBJECT_TYPE).getValue();
+                final String cursorKey = String.format(CURSOR_KEY_PATTERN, 
objectType);
                 while (jsonParser.nextToken() != null) {
+                    if (jsonParser.getCurrentToken() == JsonToken.FIELD_NAME 
&& jsonParser.getCurrentName()
+                            .equals("total")) {
+                        jsonParser.nextToken();
+                        total.set(jsonParser.getIntValue());
+                    }
                     if (jsonParser.getCurrentToken() == JsonToken.FIELD_NAME 
&& jsonParser.getCurrentName()
                             .equals("results")) {
                         jsonParser.nextToken();
                         jsonGenerator.copyCurrentStructure(jsonParser);
-                        objectCountHolder.incrementAndGet();
                     }
                     final String fieldName = jsonParser.getCurrentName();
-                    if (CURSOR_PARAMETER.equals(fieldName)) {
+                    if (PAGING_CURSOR.equals(fieldName)) {
+                        isCursorAvailable = true;
                         jsonParser.nextToken();
-                        Map<String, String> newStateMap = new 
HashMap<>(state.toMap());
-                        newStateMap.put(endpoint, jsonParser.getText());
-                        updateState(context, newStateMap);
+                        stateMap.put(cursorKey, jsonParser.getText());
                         break;
                     }
                 }
+                if (!isCursorAvailable) {
+                    stateMap.put(cursorKey, NO_PAGING);
+                }
             }
         };
     }
 
-    HttpUriBuilder getBaseUri(final ProcessContext context) {
+    URI getBaseUri(final ProcessContext context) {
         final String path = context.getProperty(OBJECT_TYPE).getValue();
         return webClientServiceProvider.getHttpUriBuilder()
                 .scheme(HTTPS)
                 .host(API_BASE_URI)
-                .encodedPath(path);
+                .encodedPath(path + "/search")
+                .build();
     }
 
-    private HttpResponseEntity getHttpResponseEntity(final String accessToken, 
final URI uri) {
+    private HttpResponseEntity getHttpResponseEntity(final String accessToken, 
final URI uri, final String filters) {
+        final JsonInputStreamConverter converter = new 
JsonInputStreamConverter(filters);
         return webClientServiceProvider.getWebClientService()
-                .get()
+                .post()
                 .uri(uri)
                 .header("Authorization", "Bearer " + accessToken)
+                .header("Content-Type", "application/json")
+                .body(converter.getInputStream(), 
OptionalLong.of(converter.getByteSize()))
                 .retrieve();
     }
 
-    private URI createUri(final ProcessContext context, final StateMap state) {
-        final String path = context.getProperty(OBJECT_TYPE).getValue();
-        final HttpUriBuilder uriBuilder = getBaseUri(context);
+    String createIncrementalFilters(final ProcessContext context, final 
Map<String, String> stateMap) {
+        final String limit = context.getProperty(RESULT_LIMIT).getValue();
+        final String objectType = context.getProperty(OBJECT_TYPE).getValue();
+        final HubSpotObjectType hubSpotObjectType = 
objectTypeLookupMap.get(objectType);
+        final Long incrDelayMs = 
context.getProperty(INCREMENTAL_DELAY).asTimePeriod(TimeUnit.MILLISECONDS);
+        final String startIncrementalKey = String.format("start: %s", 
objectType);
+        final String endIncrementalKey = String.format("end: %s", objectType);

Review Comment:
   I think it's necessary, because the user can query another object type and 
then return to the previous one and overwrite the state.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to