Github user ijokarumawak commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2199#discussion_r145696508
  
    --- Diff: nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/GetSolr.java ---
    @@ -170,159 +210,213 @@ protected void init(final ProcessorInitializationContext context) {
             return this.descriptors;
         }
     
    +    final static Set<String> propertyNamesForActivatingClearState = new HashSet<String>();
    +    static {
    +        propertyNamesForActivatingClearState.add(SOLR_TYPE.getName());
    +        propertyNamesForActivatingClearState.add(SOLR_LOCATION.getName());
    +        propertyNamesForActivatingClearState.add(COLLECTION.getName());
    +        propertyNamesForActivatingClearState.add(SOLR_QUERY.getName());
    +        propertyNamesForActivatingClearState.add(DATE_FIELD.getName());
    +        propertyNamesForActivatingClearState.add(RETURN_FIELDS.getName());
    +    }
    +
         @Override
         public void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) {
    -        lastEndDatedRef.set(UNINITIALIZED_LAST_END_DATE_VALUE);
    +        if (propertyNamesForActivatingClearState.contains(descriptor.getName()))
    +            clearState.set(true);
         }
     
    -    @OnStopped
    -    public void onStopped() {
    -        writeLastEndDate();
    -    }
    +    @OnScheduled
    +    public void clearState(final ProcessContext context) throws IOException {
    +        if (clearState.getAndSet(false)) {
    +            context.getStateManager().clear(Scope.CLUSTER);
    +            final Map<String,String> newStateMap = new HashMap<String,String>();
     
    -    @OnRemoved
    -    public void onRemoved() {
    -        final File lastEndDateCache = new File(FILE_PREFIX + getIdentifier());
    -        if (lastEndDateCache.exists()) {
    -            lastEndDateCache.delete();
    -        }
    -    }
    +            newStateMap.put(STATE_MANAGER_CURSOR_MARK, "*");
     
    -    @Override
    -    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
    -        final ComponentLog logger = getLogger();
    -        readLastEndDate();
    -
    -        final SimpleDateFormat sdf = new SimpleDateFormat(LAST_END_DATE_PATTERN, Locale.US);
    -        sdf.setTimeZone(TimeZone.getTimeZone("GMT"));
    -        final String currDate = sdf.format(new Date());
    -
    -        final boolean initialized = !UNINITIALIZED_LAST_END_DATE_VALUE.equals(lastEndDatedRef.get());
    -
    -        final String query = context.getProperty(SOLR_QUERY).getValue();
    -        final SolrQuery solrQuery = new SolrQuery(query);
    -        solrQuery.setRows(context.getProperty(BATCH_SIZE).asInteger());
    -
    -        // if initialized then apply a filter to restrict results from the last end time til now
    -        if (initialized) {
    -            StringBuilder filterQuery = new StringBuilder();
    -            filterQuery.append(context.getProperty(DATE_FIELD).getValue())
    -                    .append(":{").append(lastEndDatedRef.get()).append(" TO ")
    -                    .append(currDate).append("]");
    -            solrQuery.addFilterQuery(filterQuery.toString());
    -            logger.info("Applying filter query {}", new Object[]{filterQuery.toString()});
    -        }
    +            final String initialDate = context.getProperty(DATE_FILTER).getValue();
    +            if (StringUtils.isBlank(initialDate))
    +                newStateMap.put(STATE_MANAGER_FILTER, "*");
    +            else
    +                newStateMap.put(STATE_MANAGER_FILTER, initialDate);
     
    -        final String returnFields = context.getProperty(RETURN_FIELDS).getValue();
    -        if (returnFields != null && !returnFields.trim().isEmpty()) {
    -            for (String returnField : returnFields.trim().split("[,]")) {
    -                solrQuery.addField(returnField.trim());
    -            }
    +            context.getStateManager().setState(newStateMap, Scope.CLUSTER);
    +
    +            id_field = null;
             }
    +    }
     
    -        final String fullSortClause = context.getProperty(SORT_CLAUSE).getValue();
    -        if (fullSortClause != null && !fullSortClause.trim().isEmpty()) {
    -            for (String sortClause : fullSortClause.split("[,]")) {
    -                String[] sortParts = sortClause.trim().split("[ ]");
    -                solrQuery.addSort(sortParts[0], SolrQuery.ORDER.valueOf(sortParts[1]));
    -            }
    +    @Override
    +    protected final Collection<ValidationResult> additionalCustomValidation(ValidationContext context) {
    +        final Collection<ValidationResult> problems = new ArrayList<>();
    +
    +        if (context.getProperty(RETURN_TYPE).evaluateAttributeExpressions().getValue().equals(MODE_REC.getValue())
    +                && !context.getProperty(RECORD_WRITER).isSet()) {
    +            problems.add(new ValidationResult.Builder()
    +                    .explanation("for parsing records a record writer has to be configured")
    --- End diff ---
    
    `for parsing records a ...` might be `for writing records a ...`? Parsing is done when reading record-formatted data, not when writing it.
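    
    For reference, a minimal sketch of the suggested wording, completing the `ValidationResult.Builder` chain from the diff (the `subject(...)`, `valid(false)` and `build()` calls here are assumptions, since the diff is cut off before the builder is closed):
    
        // hypothetical completion of the builder chain shown in the diff;
        // only the explanation text changes, from "parsing" to "writing"
        problems.add(new ValidationResult.Builder()
                .subject(RECORD_WRITER.getDisplayName())
                .explanation("for writing records a record writer has to be configured")
                .valid(false)
                .build());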

