turcsanyip commented on code in PR #6379: URL: https://github.com/apache/nifi/pull/6379#discussion_r979460593
########## nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/main/java/org/apache/nifi/processors/hubspot/GetHubSpot.java: ########## @@ -187,78 +257,158 @@ private String getResponseBodyAsString(final ProcessContext context, final HttpR } } - private OutputStreamCallback parseHttpResponse(ProcessContext context, String endpoint, StateMap state, HttpResponseEntity response, AtomicInteger objectCountHolder) { + private OutputStreamCallback parseHttpResponse(final ProcessContext context, final HttpResponseEntity response, final AtomicInteger total, + final Map<String, String> stateMap) { return out -> { try (final JsonParser jsonParser = JSON_FACTORY.createParser(response.body()); final JsonGenerator jsonGenerator = JSON_FACTORY.createGenerator(out, JsonEncoding.UTF8)) { + boolean isCursorAvailable = false; + final String objectType = context.getProperty(OBJECT_TYPE).getValue(); + final String cursorKey = String.format(CURSOR_KEY_PATTERN, objectType); Review Comment: Similar to the `time_window_*` state attributes, please remove the object type suffix here too. 
########## nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/main/java/org/apache/nifi/processors/hubspot/GetHubSpot.java: ########## @@ -187,78 +257,158 @@ private String getResponseBodyAsString(final ProcessContext context, final HttpR } } - private OutputStreamCallback parseHttpResponse(ProcessContext context, String endpoint, StateMap state, HttpResponseEntity response, AtomicInteger objectCountHolder) { + private OutputStreamCallback parseHttpResponse(final ProcessContext context, final HttpResponseEntity response, final AtomicInteger total, + final Map<String, String> stateMap) { return out -> { try (final JsonParser jsonParser = JSON_FACTORY.createParser(response.body()); final JsonGenerator jsonGenerator = JSON_FACTORY.createGenerator(out, JsonEncoding.UTF8)) { + boolean isCursorAvailable = false; + final String objectType = context.getProperty(OBJECT_TYPE).getValue(); + final String cursorKey = String.format(CURSOR_KEY_PATTERN, objectType); while (jsonParser.nextToken() != null) { + if (jsonParser.getCurrentToken() == JsonToken.FIELD_NAME && jsonParser.getCurrentName() + .equals("total")) { + jsonParser.nextToken(); + total.set(jsonParser.getIntValue()); + } if (jsonParser.getCurrentToken() == JsonToken.FIELD_NAME && jsonParser.getCurrentName() .equals("results")) { jsonParser.nextToken(); jsonGenerator.copyCurrentStructure(jsonParser); - objectCountHolder.incrementAndGet(); } final String fieldName = jsonParser.getCurrentName(); - if (CURSOR_PARAMETER.equals(fieldName)) { + if (PAGING_CURSOR.equals(fieldName)) { + isCursorAvailable = true; jsonParser.nextToken(); - Map<String, String> newStateMap = new HashMap<>(state.toMap()); - newStateMap.put(endpoint, jsonParser.getText()); - updateState(context, newStateMap); + stateMap.put(cursorKey, jsonParser.getText()); break; } } + if (!isCursorAvailable) { + stateMap.put(cursorKey, NO_PAGING); + } } }; } - HttpUriBuilder getBaseUri(final ProcessContext context) { + URI getBaseUri(final 
ProcessContext context) { final String path = context.getProperty(OBJECT_TYPE).getValue(); return webClientServiceProvider.getHttpUriBuilder() .scheme(HTTPS) .host(API_BASE_URI) - .encodedPath(path); + .encodedPath(path + "/search") + .build(); } - private HttpResponseEntity getHttpResponseEntity(final String accessToken, final URI uri) { - return webClientServiceProvider.getWebClientService() - .get() - .uri(uri) - .header("Authorization", "Bearer " + accessToken) - .retrieve(); + private HttpResponseEntity getHttpResponseEntity(final String accessToken, final URI uri, final String filters) { + final InputStream inputStream = IOUtils.toInputStream(filters, StandardCharsets.UTF_8); + try { + return webClientServiceProvider.getWebClientService() + .post() + .uri(uri) + .header("Authorization", "Bearer " + accessToken) + .header("Content-Type", "application/json") + .body(inputStream, OptionalLong.of(inputStream.available())) + .retrieve(); + } catch (IOException e) { + throw new ProcessException("Could not transform incremental filters to input stream", e); + } } - private URI createUri(final ProcessContext context, final StateMap state) { - final String path = context.getProperty(OBJECT_TYPE).getValue(); - final HttpUriBuilder uriBuilder = getBaseUri(context); + private String createIncrementalFilters(final ProcessContext context, final Map<String, String> stateMap) { + final String limit = context.getProperty(RESULT_LIMIT).getValue(); + final String objectType = context.getProperty(OBJECT_TYPE).getValue(); + final HubSpotObjectType hubSpotObjectType = OBJECT_TYPE_LOOKUP_MAP.get(objectType); + final Long incrDelayMs = context.getProperty(INCREMENTAL_DELAY).asTimePeriod(TimeUnit.MILLISECONDS); + final String cursorKey = String.format(CURSOR_KEY_PATTERN, objectType); - final boolean isLimitSet = context.getProperty(RESULT_LIMIT).evaluateAttributeExpressions().isSet(); - if (isLimitSet) { - final String limit = context.getProperty(RESULT_LIMIT).getValue(); - 
uriBuilder.addQueryParameter(LIMIT_PARAMETER, limit); + final ObjectNode root = OBJECT_MAPPER.createObjectNode(); + root.put("limit", limit); + + final String cursor = stateMap.get(cursorKey); + if (cursor != null && !NO_PAGING.equals(cursor)) { + root.put(PAGING_CURSOR, stateMap.get(cursorKey)); } + final boolean isIncremental = context.getProperty(IS_INCREMENTAL).asBoolean(); + if (isIncremental) { + final String initialStartTimeValue = context.getProperty(INCREMENTAL_INITIAL_START_TIME).getValue(); + String initialStartTimeEpoch = getInitialStartTimeEpoch(initialStartTimeValue); + final String hubspotSpecificIncrementalFieldName = hubSpotObjectType.getLastModifiedDateType().getValue(); + final String lastStartTime = stateMap.get(START_INCREMENTAL_KEY); + final String lastEndTime = stateMap.getOrDefault(END_INCREMENTAL_KEY, initialStartTimeEpoch); + + final String currentStartTime; + final String currentEndTime; + + if (cursor != null && !NO_PAGING.equals(cursor)) { + currentStartTime = lastStartTime; + // lastEndTime can be null if incremental loading was turned off beforehand + currentEndTime = lastEndTime != null ? lastEndTime : String.valueOf(getCurrentEpochTime()); + } else { + currentStartTime = lastEndTime; + final long delayedCurrentEndTime = incrDelayMs != null ? 
getCurrentEpochTime() - incrDelayMs : getCurrentEpochTime(); + currentEndTime = String.valueOf(delayedCurrentEndTime); + + stateMap.put(START_INCREMENTAL_KEY, currentStartTime); + stateMap.put(END_INCREMENTAL_KEY, currentEndTime); + } + + final ArrayNode filters = OBJECT_MAPPER.createArrayNode(); - final String cursor = state.get(path); - if (cursor != null) { - uriBuilder.addQueryParameter(CURSOR_PARAMETER, cursor); + if (currentStartTime != null) { + final ObjectNode greaterThanFilterNode = OBJECT_MAPPER.createObjectNode(); + greaterThanFilterNode.put("propertyName", hubspotSpecificIncrementalFieldName); + greaterThanFilterNode.put("operator", "GTE"); + greaterThanFilterNode.put("value", currentStartTime); + filters.add(greaterThanFilterNode); + } + + final ObjectNode lessThanFilterNode = OBJECT_MAPPER.createObjectNode(); + lessThanFilterNode.put("propertyName", hubspotSpecificIncrementalFieldName); + lessThanFilterNode.put("operator", "LT"); + lessThanFilterNode.put("value", currentEndTime); + filters.add(lessThanFilterNode); + + root.set("filters", filters); } - return uriBuilder.build(); + return root.toString(); } - private StateMap getStateMap(final ProcessContext context) { + private String getInitialStartTimeEpoch(String initialStartTimeValue) { + if (initialStartTimeValue != null) { + final TemporalAccessor initialDateTime = DateTimeFormatter.ISO_DATE_TIME.parse(initialStartTimeValue); + return String.valueOf(Instant.from(initialDateTime).toEpochMilli()); + } + return null; + } + + long getCurrentEpochTime() { + return Instant.now().toEpochMilli(); + } + + private Map<String, String> getStateMap(final ProcessSession session) { final StateMap stateMap; try { - stateMap = context.getStateManager().getState(Scope.CLUSTER); + stateMap = session.getState(Scope.CLUSTER); } catch (IOException e) { throw new ProcessException("State retrieval failed", e); } - return stateMap; + return new HashMap<>(stateMap.toMap()); } - private void updateState(ProcessContext 
context, Map<String, String> newState) { + private void updateState(ProcessSession session, Map<String, String> newState) { try { - context.getStateManager().setState(newState, Scope.CLUSTER); + session.setState(newState, Scope.CLUSTER); } catch (IOException e) { throw new ProcessException("Page cursor update failed", e); } } + + private void clearState(ProcessSession session) { + try { + session.clearState(Scope.CLUSTER); Review Comment: Not sure why, but it does not remove previous items from the state. `ProcessContext.getStateManager().clear()` might be a better option for clearing. ########## nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/main/java/org/apache/nifi/processors/hubspot/GetHubSpot.java: ########## @@ -187,78 +257,158 @@ private String getResponseBodyAsString(final ProcessContext context, final HttpR } } - private OutputStreamCallback parseHttpResponse(ProcessContext context, String endpoint, StateMap state, HttpResponseEntity response, AtomicInteger objectCountHolder) { + private OutputStreamCallback parseHttpResponse(final ProcessContext context, final HttpResponseEntity response, final AtomicInteger total, + final Map<String, String> stateMap) { return out -> { try (final JsonParser jsonParser = JSON_FACTORY.createParser(response.body()); final JsonGenerator jsonGenerator = JSON_FACTORY.createGenerator(out, JsonEncoding.UTF8)) { + boolean isCursorAvailable = false; + final String objectType = context.getProperty(OBJECT_TYPE).getValue(); + final String cursorKey = String.format(CURSOR_KEY_PATTERN, objectType); while (jsonParser.nextToken() != null) { + if (jsonParser.getCurrentToken() == JsonToken.FIELD_NAME && jsonParser.getCurrentName() + .equals("total")) { + jsonParser.nextToken(); + total.set(jsonParser.getIntValue()); + } if (jsonParser.getCurrentToken() == JsonToken.FIELD_NAME && jsonParser.getCurrentName() .equals("results")) { jsonParser.nextToken(); jsonGenerator.copyCurrentStructure(jsonParser); - 
objectCountHolder.incrementAndGet(); } final String fieldName = jsonParser.getCurrentName(); - if (CURSOR_PARAMETER.equals(fieldName)) { + if (PAGING_CURSOR.equals(fieldName)) { + isCursorAvailable = true; jsonParser.nextToken(); - Map<String, String> newStateMap = new HashMap<>(state.toMap()); - newStateMap.put(endpoint, jsonParser.getText()); - updateState(context, newStateMap); + stateMap.put(cursorKey, jsonParser.getText()); break; } } + if (!isCursorAvailable) { + stateMap.put(cursorKey, NO_PAGING); + } } }; } - HttpUriBuilder getBaseUri(final ProcessContext context) { + URI getBaseUri(final ProcessContext context) { final String path = context.getProperty(OBJECT_TYPE).getValue(); return webClientServiceProvider.getHttpUriBuilder() .scheme(HTTPS) .host(API_BASE_URI) - .encodedPath(path); + .encodedPath(path + "/search") + .build(); } - private HttpResponseEntity getHttpResponseEntity(final String accessToken, final URI uri) { - return webClientServiceProvider.getWebClientService() - .get() - .uri(uri) - .header("Authorization", "Bearer " + accessToken) - .retrieve(); + private HttpResponseEntity getHttpResponseEntity(final String accessToken, final URI uri, final String filters) { + final InputStream inputStream = IOUtils.toInputStream(filters, StandardCharsets.UTF_8); + try { + return webClientServiceProvider.getWebClientService() + .post() + .uri(uri) + .header("Authorization", "Bearer " + accessToken) + .header("Content-Type", "application/json") + .body(inputStream, OptionalLong.of(inputStream.available())) + .retrieve(); + } catch (IOException e) { + throw new ProcessException("Could not transform incremental filters to input stream", e); + } } - private URI createUri(final ProcessContext context, final StateMap state) { - final String path = context.getProperty(OBJECT_TYPE).getValue(); - final HttpUriBuilder uriBuilder = getBaseUri(context); + private String createIncrementalFilters(final ProcessContext context, final Map<String, String> stateMap) { + 
final String limit = context.getProperty(RESULT_LIMIT).getValue(); + final String objectType = context.getProperty(OBJECT_TYPE).getValue(); + final HubSpotObjectType hubSpotObjectType = OBJECT_TYPE_LOOKUP_MAP.get(objectType); + final Long incrDelayMs = context.getProperty(INCREMENTAL_DELAY).asTimePeriod(TimeUnit.MILLISECONDS); + final String cursorKey = String.format(CURSOR_KEY_PATTERN, objectType); - final boolean isLimitSet = context.getProperty(RESULT_LIMIT).evaluateAttributeExpressions().isSet(); - if (isLimitSet) { - final String limit = context.getProperty(RESULT_LIMIT).getValue(); - uriBuilder.addQueryParameter(LIMIT_PARAMETER, limit); + final ObjectNode root = OBJECT_MAPPER.createObjectNode(); + root.put("limit", limit); + + final String cursor = stateMap.get(cursorKey); + if (cursor != null && !NO_PAGING.equals(cursor)) { + root.put(PAGING_CURSOR, stateMap.get(cursorKey)); } + final boolean isIncremental = context.getProperty(IS_INCREMENTAL).asBoolean(); + if (isIncremental) { + final String initialStartTimeValue = context.getProperty(INCREMENTAL_INITIAL_START_TIME).getValue(); + String initialStartTimeEpoch = getInitialStartTimeEpoch(initialStartTimeValue); + final String hubspotSpecificIncrementalFieldName = hubSpotObjectType.getLastModifiedDateType().getValue(); + final String lastStartTime = stateMap.get(START_INCREMENTAL_KEY); + final String lastEndTime = stateMap.getOrDefault(END_INCREMENTAL_KEY, initialStartTimeEpoch); + + final String currentStartTime; + final String currentEndTime; + + if (cursor != null && !NO_PAGING.equals(cursor)) { + currentStartTime = lastStartTime; + // lastEndTime can be null if incremental loading was turned off beforehand + currentEndTime = lastEndTime != null ? lastEndTime : String.valueOf(getCurrentEpochTime()); Review Comment: I think it is better to avoid this situation and the state should also be reset when the user changes `Incremental Loading` (similar to `Object Type`). 
In that case `currentEndTime = lastEndTime` would be enough because `lastEndTime` must have a value when the paging cursor is set. Otherwise `Incremental Delay` should be applied here too. ########## nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/main/java/org/apache/nifi/processors/hubspot/GetHubSpot.java: ########## @@ -99,7 +106,43 @@ public class GetHubSpot extends AbstractProcessor { .description("The maximum number of results to request for each invocation of the Processor") .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) .required(false) - .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .addValidator(StandardValidators.createLongValidator(1, 100, true)) + .build(); + + static final PropertyDescriptor IS_INCREMENTAL = new PropertyDescriptor.Builder() + .name("is-incremental") + .displayName("Incremental Loading") + .description("The processor can incrementally load the queried objects so that each object is queried exactly once." + + " For each query, the processor queries objects within a time window where the objects were modified between" + + " the previous run time and the current time (optionally adjusted by the Incremental Delay property).") + .required(true) + .allowableValues("true", "false") + .defaultValue("false") + .build(); + + static final PropertyDescriptor INCREMENTAL_DELAY = new PropertyDescriptor.Builder() + .name("incremental-delay") + .displayName("Incremental Delay") + .description(("The ending timestamp of the time window will be adjusted earlier by the amount configured in this property." + + " For example, with a property value of 10 seconds, an ending timestamp of 12:30:45 would be changed to 12:30:35." 
+ + " Set this property to avoid missing objects when the clock of your local machines and HubSpot servers' clock are not in sync.")) + .required(true) + .defaultValue("3 sec") + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) + .dependsOn(IS_INCREMENTAL, "true") + .build(); + + static final PropertyDescriptor INCREMENTAL_INITIAL_START_TIME = new PropertyDescriptor.Builder() + .name("incremental-initial-start-time") + .displayName("Incremental Initial Start Time") + .description("This property specifies the start time as Epoch Time that the processor applies when running the first request." + + " The expected format is an ISO-like date-time with the offset and zone if available, such as '2011-12-03T10:15:30'," + + " '2011-12-03T10:15:30+01:00' or '2011-12-03T10:15:30+01:00[Europe/Paris]'") + .required(false) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .addValidator(StandardValidators.POSITIVE_LONG_VALIDATOR) Review Comment: It is not possible to enter datetime due to the `Long` validator. `ISO8601_INSTANT_VALIDATOR` could be used. It accepts only UTC time (e.g. `2011-12-03T10:15:30Z`), so the examples in the description need to be adjusted in that case. 
########## nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/main/java/org/apache/nifi/processors/hubspot/GetHubSpot.java: ########## @@ -187,78 +257,158 @@ private String getResponseBodyAsString(final ProcessContext context, final HttpR } } - private OutputStreamCallback parseHttpResponse(ProcessContext context, String endpoint, StateMap state, HttpResponseEntity response, AtomicInteger objectCountHolder) { + private OutputStreamCallback parseHttpResponse(final ProcessContext context, final HttpResponseEntity response, final AtomicInteger total, + final Map<String, String> stateMap) { return out -> { try (final JsonParser jsonParser = JSON_FACTORY.createParser(response.body()); final JsonGenerator jsonGenerator = JSON_FACTORY.createGenerator(out, JsonEncoding.UTF8)) { + boolean isCursorAvailable = false; + final String objectType = context.getProperty(OBJECT_TYPE).getValue(); + final String cursorKey = String.format(CURSOR_KEY_PATTERN, objectType); while (jsonParser.nextToken() != null) { + if (jsonParser.getCurrentToken() == JsonToken.FIELD_NAME && jsonParser.getCurrentName() + .equals("total")) { + jsonParser.nextToken(); + total.set(jsonParser.getIntValue()); + } if (jsonParser.getCurrentToken() == JsonToken.FIELD_NAME && jsonParser.getCurrentName() .equals("results")) { jsonParser.nextToken(); jsonGenerator.copyCurrentStructure(jsonParser); - objectCountHolder.incrementAndGet(); } final String fieldName = jsonParser.getCurrentName(); - if (CURSOR_PARAMETER.equals(fieldName)) { + if (PAGING_CURSOR.equals(fieldName)) { + isCursorAvailable = true; jsonParser.nextToken(); - Map<String, String> newStateMap = new HashMap<>(state.toMap()); - newStateMap.put(endpoint, jsonParser.getText()); - updateState(context, newStateMap); + stateMap.put(cursorKey, jsonParser.getText()); break; } } + if (!isCursorAvailable) { + stateMap.put(cursorKey, NO_PAGING); + } } }; } - HttpUriBuilder getBaseUri(final ProcessContext context) { + URI getBaseUri(final 
ProcessContext context) { final String path = context.getProperty(OBJECT_TYPE).getValue(); return webClientServiceProvider.getHttpUriBuilder() .scheme(HTTPS) .host(API_BASE_URI) - .encodedPath(path); + .encodedPath(path + "/search") + .build(); } - private HttpResponseEntity getHttpResponseEntity(final String accessToken, final URI uri) { - return webClientServiceProvider.getWebClientService() - .get() - .uri(uri) - .header("Authorization", "Bearer " + accessToken) - .retrieve(); + private HttpResponseEntity getHttpResponseEntity(final String accessToken, final URI uri, final String filters) { + final InputStream inputStream = IOUtils.toInputStream(filters, StandardCharsets.UTF_8); + try { + return webClientServiceProvider.getWebClientService() + .post() + .uri(uri) + .header("Authorization", "Bearer " + accessToken) + .header("Content-Type", "application/json") + .body(inputStream, OptionalLong.of(inputStream.available())) + .retrieve(); + } catch (IOException e) { + throw new ProcessException("Could not transform incremental filters to input stream", e); + } } - private URI createUri(final ProcessContext context, final StateMap state) { - final String path = context.getProperty(OBJECT_TYPE).getValue(); - final HttpUriBuilder uriBuilder = getBaseUri(context); + private String createIncrementalFilters(final ProcessContext context, final Map<String, String> stateMap) { + final String limit = context.getProperty(RESULT_LIMIT).getValue(); + final String objectType = context.getProperty(OBJECT_TYPE).getValue(); + final HubSpotObjectType hubSpotObjectType = OBJECT_TYPE_LOOKUP_MAP.get(objectType); + final Long incrDelayMs = context.getProperty(INCREMENTAL_DELAY).asTimePeriod(TimeUnit.MILLISECONDS); + final String cursorKey = String.format(CURSOR_KEY_PATTERN, objectType); - final boolean isLimitSet = context.getProperty(RESULT_LIMIT).evaluateAttributeExpressions().isSet(); - if (isLimitSet) { - final String limit = context.getProperty(RESULT_LIMIT).getValue(); - 
uriBuilder.addQueryParameter(LIMIT_PARAMETER, limit); + final ObjectNode root = OBJECT_MAPPER.createObjectNode(); + root.put("limit", limit); + + final String cursor = stateMap.get(cursorKey); + if (cursor != null && !NO_PAGING.equals(cursor)) { + root.put(PAGING_CURSOR, stateMap.get(cursorKey)); } + final boolean isIncremental = context.getProperty(IS_INCREMENTAL).asBoolean(); + if (isIncremental) { + final String initialStartTimeValue = context.getProperty(INCREMENTAL_INITIAL_START_TIME).getValue(); + String initialStartTimeEpoch = getInitialStartTimeEpoch(initialStartTimeValue); + final String hubspotSpecificIncrementalFieldName = hubSpotObjectType.getLastModifiedDateType().getValue(); + final String lastStartTime = stateMap.get(START_INCREMENTAL_KEY); + final String lastEndTime = stateMap.getOrDefault(END_INCREMENTAL_KEY, initialStartTimeEpoch); Review Comment: It is quite hard to understand why `lastEndTime` is initialized with `initialStartTimeEpoch`. It would be better to assign the End Time value from the state only (which can be null), and use the Initial Start Time when `currentStartTime` is calculated in line 348: ``` currentStartTime = lastEndTime != null ? lastEndTime : getInitialStartTimeEpoch(initialStartTimeValue) ``` In this case the epoch is calculated only when needed (once). 
########## nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/main/java/org/apache/nifi/processors/hubspot/GetHubSpot.java: ########## @@ -99,7 +106,43 @@ public class GetHubSpot extends AbstractProcessor { .description("The maximum number of results to request for each invocation of the Processor") .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) .required(false) - .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .addValidator(StandardValidators.createLongValidator(1, 100, true)) + .build(); + + static final PropertyDescriptor IS_INCREMENTAL = new PropertyDescriptor.Builder() + .name("is-incremental") + .displayName("Incremental Loading") + .description("The processor can incrementally load the queried objects so that each object is queried exactly once." + + " For each query, the processor queries objects within a time window where the objects were modified between" + + " the previous run time and the current time (optionally adjusted by the Incremental Delay property).") + .required(true) + .allowableValues("true", "false") + .defaultValue("false") + .build(); + + static final PropertyDescriptor INCREMENTAL_DELAY = new PropertyDescriptor.Builder() + .name("incremental-delay") + .displayName("Incremental Delay") + .description(("The ending timestamp of the time window will be adjusted earlier by the amount configured in this property." + + " For example, with a property value of 10 seconds, an ending timestamp of 12:30:45 would be changed to 12:30:35." 
+ + " Set this property to avoid missing objects when the clock of your local machines and HubSpot servers' clock are not in sync.")) + .required(true) + .defaultValue("3 sec") + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) + .dependsOn(IS_INCREMENTAL, "true") + .build(); + + static final PropertyDescriptor INCREMENTAL_INITIAL_START_TIME = new PropertyDescriptor.Builder() + .name("incremental-initial-start-time") + .displayName("Incremental Initial Start Time") + .description("This property specifies the start time as Epoch Time that the processor applies when running the first request." + Review Comment: ```suggestion .description("This property specifies the start time that the processor applies when running the first request." + ``` ########## nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/main/java/org/apache/nifi/processors/hubspot/GetHubSpot.java: ########## @@ -187,78 +257,158 @@ private String getResponseBodyAsString(final ProcessContext context, final HttpR } } - private OutputStreamCallback parseHttpResponse(ProcessContext context, String endpoint, StateMap state, HttpResponseEntity response, AtomicInteger objectCountHolder) { + private OutputStreamCallback parseHttpResponse(final ProcessContext context, final HttpResponseEntity response, final AtomicInteger total, + final Map<String, String> stateMap) { return out -> { try (final JsonParser jsonParser = JSON_FACTORY.createParser(response.body()); final JsonGenerator jsonGenerator = JSON_FACTORY.createGenerator(out, JsonEncoding.UTF8)) { + boolean isCursorAvailable = false; + final String objectType = context.getProperty(OBJECT_TYPE).getValue(); + final String cursorKey = String.format(CURSOR_KEY_PATTERN, objectType); while (jsonParser.nextToken() != null) { + if (jsonParser.getCurrentToken() == JsonToken.FIELD_NAME && jsonParser.getCurrentName() + .equals("total")) { + jsonParser.nextToken(); + 
total.set(jsonParser.getIntValue()); + } if (jsonParser.getCurrentToken() == JsonToken.FIELD_NAME && jsonParser.getCurrentName() .equals("results")) { jsonParser.nextToken(); jsonGenerator.copyCurrentStructure(jsonParser); - objectCountHolder.incrementAndGet(); } final String fieldName = jsonParser.getCurrentName(); - if (CURSOR_PARAMETER.equals(fieldName)) { + if (PAGING_CURSOR.equals(fieldName)) { + isCursorAvailable = true; jsonParser.nextToken(); - Map<String, String> newStateMap = new HashMap<>(state.toMap()); - newStateMap.put(endpoint, jsonParser.getText()); - updateState(context, newStateMap); + stateMap.put(cursorKey, jsonParser.getText()); break; } } + if (!isCursorAvailable) { + stateMap.put(cursorKey, NO_PAGING); + } } }; } - HttpUriBuilder getBaseUri(final ProcessContext context) { + URI getBaseUri(final ProcessContext context) { final String path = context.getProperty(OBJECT_TYPE).getValue(); return webClientServiceProvider.getHttpUriBuilder() .scheme(HTTPS) .host(API_BASE_URI) - .encodedPath(path); + .encodedPath(path + "/search") + .build(); } - private HttpResponseEntity getHttpResponseEntity(final String accessToken, final URI uri) { - return webClientServiceProvider.getWebClientService() - .get() - .uri(uri) - .header("Authorization", "Bearer " + accessToken) - .retrieve(); + private HttpResponseEntity getHttpResponseEntity(final String accessToken, final URI uri, final String filters) { + final InputStream inputStream = IOUtils.toInputStream(filters, StandardCharsets.UTF_8); + try { + return webClientServiceProvider.getWebClientService() + .post() + .uri(uri) + .header("Authorization", "Bearer " + accessToken) + .header("Content-Type", "application/json") + .body(inputStream, OptionalLong.of(inputStream.available())) + .retrieve(); + } catch (IOException e) { + throw new ProcessException("Could not transform incremental filters to input stream", e); + } } - private URI createUri(final ProcessContext context, final StateMap state) { - final 
String path = context.getProperty(OBJECT_TYPE).getValue(); - final HttpUriBuilder uriBuilder = getBaseUri(context); + private String createIncrementalFilters(final ProcessContext context, final Map<String, String> stateMap) { + final String limit = context.getProperty(RESULT_LIMIT).getValue(); + final String objectType = context.getProperty(OBJECT_TYPE).getValue(); + final HubSpotObjectType hubSpotObjectType = OBJECT_TYPE_LOOKUP_MAP.get(objectType); + final Long incrDelayMs = context.getProperty(INCREMENTAL_DELAY).asTimePeriod(TimeUnit.MILLISECONDS); + final String cursorKey = String.format(CURSOR_KEY_PATTERN, objectType); - final boolean isLimitSet = context.getProperty(RESULT_LIMIT).evaluateAttributeExpressions().isSet(); - if (isLimitSet) { - final String limit = context.getProperty(RESULT_LIMIT).getValue(); - uriBuilder.addQueryParameter(LIMIT_PARAMETER, limit); + final ObjectNode root = OBJECT_MAPPER.createObjectNode(); + root.put("limit", limit); + + final String cursor = stateMap.get(cursorKey); + if (cursor != null && !NO_PAGING.equals(cursor)) { + root.put(PAGING_CURSOR, stateMap.get(cursorKey)); } Review Comment: ```suggestion final String cursor = stateMap.get(cursorKey); if (cursor != null && !NO_PAGING.equals(cursor)) { root.put(PAGING_CURSOR, cursor); } ``` ########## nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/main/java/org/apache/nifi/processors/hubspot/GetHubSpot.java: ########## @@ -47,31 +48,37 @@ import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.web.client.api.HttpResponseEntity; import org.apache.nifi.web.client.api.HttpResponseStatus; -import org.apache.nifi.web.client.api.HttpUriBuilder; import org.apache.nifi.web.client.provider.api.WebClientServiceProvider; import java.io.IOException; +import java.io.InputStream; import java.io.UncheckedIOException; import java.net.URI; import java.nio.charset.StandardCharsets; +import java.time.Instant; +import java.time.format.DateTimeFormatter; +import 
java.time.temporal.TemporalAccessor; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.OptionalLong; import java.util.Set; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Function; +import java.util.stream.Collectors; @PrimaryNodeOnly @TriggerSerially -@TriggerWhenEmpty @InputRequirement(Requirement.INPUT_FORBIDDEN) @Tags({"hubspot"}) @CapabilityDescription("Retrieves JSON data from a private HubSpot application." - + " Configuring the Result Limit property enables incremental retrieval of results. When this property is set the processor will" - + " retrieve new records. This processor is intended to be run on the Primary Node only.") -@Stateful(scopes = Scope.CLUSTER, description = "When the 'Limit' attribute is set, the paging cursor is saved after executing a request." - + " Only the objects after the paging cursor will be retrieved. The maximum number of retrieved objects is the 'Limit' attribute.") + + " This processor is intended to be run on the Primary Node only.") +@Stateful(scopes = Scope.CLUSTER, description = "In case of incremental loading the processor run time is stored in the state." + + " When the 'Limit' attribute is set, the paging cursor is saved after executing a request. Only the objects after the paging" + + " cursor will be retrieved. The maximum number of retrieved objects can be set in the 'Limit' property.") Review Comment: ```suggestion @Stateful(scopes = Scope.CLUSTER, description = "In case of incremental loading, the start and end timestamps of the last query time window are stored in the state." + " When the 'Result Limit' attribute is set, the paging cursor is saved after executing a request. Only the objects after the paging" + " cursor will be retrieved. 
The maximum number of retrieved objects can be set in the 'Result Limit' property.") ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org