Lehel44 commented on code in PR #6379:
URL: https://github.com/apache/nifi/pull/6379#discussion_r979481249


##########
nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/main/java/org/apache/nifi/processors/hubspot/GetHubSpot.java:
##########
@@ -187,78 +257,158 @@ private String getResponseBodyAsString(final ProcessContext context, final HttpR
         }
     }
 
-    private OutputStreamCallback parseHttpResponse(ProcessContext context, 
String endpoint, StateMap state, HttpResponseEntity response, AtomicInteger 
objectCountHolder) {
+    private OutputStreamCallback parseHttpResponse(final ProcessContext 
context, final HttpResponseEntity response, final AtomicInteger total,
+                                                   final Map<String, String> 
stateMap) {
         return out -> {
             try (final JsonParser jsonParser = 
JSON_FACTORY.createParser(response.body());
                  final JsonGenerator jsonGenerator = 
JSON_FACTORY.createGenerator(out, JsonEncoding.UTF8)) {
+                boolean isCursorAvailable = false;
+                final String objectType = 
context.getProperty(OBJECT_TYPE).getValue();
+                final String cursorKey = String.format(CURSOR_KEY_PATTERN, 
objectType);
                 while (jsonParser.nextToken() != null) {
+                    if (jsonParser.getCurrentToken() == JsonToken.FIELD_NAME 
&& jsonParser.getCurrentName()
+                            .equals("total")) {
+                        jsonParser.nextToken();
+                        total.set(jsonParser.getIntValue());
+                    }
                     if (jsonParser.getCurrentToken() == JsonToken.FIELD_NAME 
&& jsonParser.getCurrentName()
                             .equals("results")) {
                         jsonParser.nextToken();
                         jsonGenerator.copyCurrentStructure(jsonParser);
-                        objectCountHolder.incrementAndGet();
                     }
                     final String fieldName = jsonParser.getCurrentName();
-                    if (CURSOR_PARAMETER.equals(fieldName)) {
+                    if (PAGING_CURSOR.equals(fieldName)) {
+                        isCursorAvailable = true;
                         jsonParser.nextToken();
-                        Map<String, String> newStateMap = new 
HashMap<>(state.toMap());
-                        newStateMap.put(endpoint, jsonParser.getText());
-                        updateState(context, newStateMap);
+                        stateMap.put(cursorKey, jsonParser.getText());
                         break;
                     }
                 }
+                if (!isCursorAvailable) {
+                    stateMap.put(cursorKey, NO_PAGING);
+                }
             }
         };
     }
 
-    HttpUriBuilder getBaseUri(final ProcessContext context) {
+    URI getBaseUri(final ProcessContext context) {
         final String path = context.getProperty(OBJECT_TYPE).getValue();
         return webClientServiceProvider.getHttpUriBuilder()
                 .scheme(HTTPS)
                 .host(API_BASE_URI)
-                .encodedPath(path);
+                .encodedPath(path + "/search")
+                .build();
     }
 
-    private HttpResponseEntity getHttpResponseEntity(final String accessToken, 
final URI uri) {
-        return webClientServiceProvider.getWebClientService()
-                .get()
-                .uri(uri)
-                .header("Authorization", "Bearer " + accessToken)
-                .retrieve();
+    private HttpResponseEntity getHttpResponseEntity(final String accessToken, 
final URI uri, final String filters) {
+        final InputStream inputStream = IOUtils.toInputStream(filters, 
StandardCharsets.UTF_8);
+        try {
+            return webClientServiceProvider.getWebClientService()
+                    .post()
+                    .uri(uri)
+                    .header("Authorization", "Bearer " + accessToken)
+                    .header("Content-Type", "application/json")
+                    .body(inputStream, 
OptionalLong.of(inputStream.available()))
+                    .retrieve();
+        } catch (IOException e) {
+            throw new ProcessException("Could not transform incremental 
filters to input stream", e);
+        }
     }
 
-    private URI createUri(final ProcessContext context, final StateMap state) {
-        final String path = context.getProperty(OBJECT_TYPE).getValue();
-        final HttpUriBuilder uriBuilder = getBaseUri(context);
+    private String createIncrementalFilters(final ProcessContext context, 
final Map<String, String> stateMap) {
+        final String limit = context.getProperty(RESULT_LIMIT).getValue();
+        final String objectType = context.getProperty(OBJECT_TYPE).getValue();
+        final HubSpotObjectType hubSpotObjectType = 
OBJECT_TYPE_LOOKUP_MAP.get(objectType);
+        final Long incrDelayMs = 
context.getProperty(INCREMENTAL_DELAY).asTimePeriod(TimeUnit.MILLISECONDS);
+        final String cursorKey = String.format(CURSOR_KEY_PATTERN, objectType);
 
-        final boolean isLimitSet = 
context.getProperty(RESULT_LIMIT).evaluateAttributeExpressions().isSet();
-        if (isLimitSet) {
-            final String limit = context.getProperty(RESULT_LIMIT).getValue();
-            uriBuilder.addQueryParameter(LIMIT_PARAMETER, limit);
+        final ObjectNode root = OBJECT_MAPPER.createObjectNode();
+        root.put("limit", limit);
+
+        final String cursor = stateMap.get(cursorKey);
+        if (cursor != null && !NO_PAGING.equals(cursor)) {
+            root.put(PAGING_CURSOR, stateMap.get(cursorKey));
         }
+        final boolean isIncremental = 
context.getProperty(IS_INCREMENTAL).asBoolean();
+        if (isIncremental) {
+            final String initialStartTimeValue = 
context.getProperty(INCREMENTAL_INITIAL_START_TIME).getValue();
+            String initialStartTimeEpoch = 
getInitialStartTimeEpoch(initialStartTimeValue);
+            final String hubspotSpecificIncrementalFieldName = 
hubSpotObjectType.getLastModifiedDateType().getValue();
+            final String lastStartTime = stateMap.get(START_INCREMENTAL_KEY);
+            final String lastEndTime = 
stateMap.getOrDefault(END_INCREMENTAL_KEY, initialStartTimeEpoch);

Review Comment:
   Very good idea, thank you!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

Reply via email to