Github user ijokarumawak commented on a diff in the pull request: https://github.com/apache/nifi/pull/2199#discussion_r145696508 --- Diff: nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/GetSolr.java --- @@ -170,159 +210,213 @@ protected void init(final ProcessorInitializationContext context) { return this.descriptors; } + final static Set<String> propertyNamesForActivatingClearState = new HashSet<String>(); + static { + propertyNamesForActivatingClearState.add(SOLR_TYPE.getName()); + propertyNamesForActivatingClearState.add(SOLR_LOCATION.getName()); + propertyNamesForActivatingClearState.add(COLLECTION.getName()); + propertyNamesForActivatingClearState.add(SOLR_QUERY.getName()); + propertyNamesForActivatingClearState.add(DATE_FIELD.getName()); + propertyNamesForActivatingClearState.add(RETURN_FIELDS.getName()); + } + @Override public void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) { - lastEndDatedRef.set(UNINITIALIZED_LAST_END_DATE_VALUE); + if (propertyNamesForActivatingClearState.contains(descriptor.getName())) + clearState.set(true); } - @OnStopped - public void onStopped() { - writeLastEndDate(); - } + @OnScheduled + public void clearState(final ProcessContext context) throws IOException { + if (clearState.getAndSet(false)) { + context.getStateManager().clear(Scope.CLUSTER); + final Map<String,String> newStateMap = new HashMap<String,String>(); - @OnRemoved - public void onRemoved() { - final File lastEndDateCache = new File(FILE_PREFIX + getIdentifier()); - if (lastEndDateCache.exists()) { - lastEndDateCache.delete(); - } - } + newStateMap.put(STATE_MANAGER_CURSOR_MARK, "*"); - @Override - public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { - final ComponentLog logger = getLogger(); - readLastEndDate(); - - final SimpleDateFormat sdf = new SimpleDateFormat(LAST_END_DATE_PATTERN, Locale.US); - 
sdf.setTimeZone(TimeZone.getTimeZone("GMT")); - final String currDate = sdf.format(new Date()); - - final boolean initialized = !UNINITIALIZED_LAST_END_DATE_VALUE.equals(lastEndDatedRef.get()); - - final String query = context.getProperty(SOLR_QUERY).getValue(); - final SolrQuery solrQuery = new SolrQuery(query); - solrQuery.setRows(context.getProperty(BATCH_SIZE).asInteger()); - - // if initialized then apply a filter to restrict results from the last end time til now - if (initialized) { - StringBuilder filterQuery = new StringBuilder(); - filterQuery.append(context.getProperty(DATE_FIELD).getValue()) - .append(":{").append(lastEndDatedRef.get()).append(" TO ") - .append(currDate).append("]"); - solrQuery.addFilterQuery(filterQuery.toString()); - logger.info("Applying filter query {}", new Object[]{filterQuery.toString()}); - } + final String initialDate = context.getProperty(DATE_FILTER).getValue(); + if (StringUtils.isBlank(initialDate)) + newStateMap.put(STATE_MANAGER_FILTER, "*"); + else + newStateMap.put(STATE_MANAGER_FILTER, initialDate); - final String returnFields = context.getProperty(RETURN_FIELDS).getValue(); - if (returnFields != null && !returnFields.trim().isEmpty()) { - for (String returnField : returnFields.trim().split("[,]")) { - solrQuery.addField(returnField.trim()); - } + context.getStateManager().setState(newStateMap, Scope.CLUSTER); + + id_field = null; } + } - final String fullSortClause = context.getProperty(SORT_CLAUSE).getValue(); - if (fullSortClause != null && !fullSortClause.trim().isEmpty()) { - for (String sortClause : fullSortClause.split("[,]")) { - String[] sortParts = sortClause.trim().split("[ ]"); - solrQuery.addSort(sortParts[0], SolrQuery.ORDER.valueOf(sortParts[1])); - } + @Override + protected final Collection<ValidationResult> additionalCustomValidation(ValidationContext context) { + final Collection<ValidationResult> problems = new ArrayList<>(); + + if 
(context.getProperty(RETURN_TYPE).evaluateAttributeExpressions().getValue().equals(MODE_REC.getValue()) + && !context.getProperty(RECORD_WRITER).isSet()) { + problems.add(new ValidationResult.Builder() + .explanation("for parsing records a record writer has to be configured") --- End diff -- `for parsing records a ...` might be `for writing records a ...`? Parsing is done when reading record-formatted data.
---