Github user ijokarumawak commented on a diff in the pull request: https://github.com/apache/nifi/pull/2199#discussion_r145697961 --- Diff: nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/GetSolr.java --- @@ -170,159 +210,213 @@ protected void init(final ProcessorInitializationContext context) { return this.descriptors; } + final static Set<String> propertyNamesForActivatingClearState = new HashSet<String>(); + static { + propertyNamesForActivatingClearState.add(SOLR_TYPE.getName()); + propertyNamesForActivatingClearState.add(SOLR_LOCATION.getName()); + propertyNamesForActivatingClearState.add(COLLECTION.getName()); + propertyNamesForActivatingClearState.add(SOLR_QUERY.getName()); + propertyNamesForActivatingClearState.add(DATE_FIELD.getName()); + propertyNamesForActivatingClearState.add(RETURN_FIELDS.getName()); + } + @Override public void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) { - lastEndDatedRef.set(UNINITIALIZED_LAST_END_DATE_VALUE); + if (propertyNamesForActivatingClearState.contains(descriptor.getName())) + clearState.set(true); } - @OnStopped - public void onStopped() { - writeLastEndDate(); - } + @OnScheduled + public void clearState(final ProcessContext context) throws IOException { + if (clearState.getAndSet(false)) { + context.getStateManager().clear(Scope.CLUSTER); + final Map<String,String> newStateMap = new HashMap<String,String>(); - @OnRemoved - public void onRemoved() { - final File lastEndDateCache = new File(FILE_PREFIX + getIdentifier()); - if (lastEndDateCache.exists()) { - lastEndDateCache.delete(); - } - } + newStateMap.put(STATE_MANAGER_CURSOR_MARK, "*"); - @Override - public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { - final ComponentLog logger = getLogger(); - readLastEndDate(); - - final SimpleDateFormat sdf = new SimpleDateFormat(LAST_END_DATE_PATTERN, Locale.US); - 
sdf.setTimeZone(TimeZone.getTimeZone("GMT")); - final String currDate = sdf.format(new Date()); - - final boolean initialized = !UNINITIALIZED_LAST_END_DATE_VALUE.equals(lastEndDatedRef.get()); - - final String query = context.getProperty(SOLR_QUERY).getValue(); - final SolrQuery solrQuery = new SolrQuery(query); - solrQuery.setRows(context.getProperty(BATCH_SIZE).asInteger()); - - // if initialized then apply a filter to restrict results from the last end time til now - if (initialized) { - StringBuilder filterQuery = new StringBuilder(); - filterQuery.append(context.getProperty(DATE_FIELD).getValue()) - .append(":{").append(lastEndDatedRef.get()).append(" TO ") - .append(currDate).append("]"); - solrQuery.addFilterQuery(filterQuery.toString()); - logger.info("Applying filter query {}", new Object[]{filterQuery.toString()}); - } + final String initialDate = context.getProperty(DATE_FILTER).getValue(); + if (StringUtils.isBlank(initialDate)) + newStateMap.put(STATE_MANAGER_FILTER, "*"); + else + newStateMap.put(STATE_MANAGER_FILTER, initialDate); - final String returnFields = context.getProperty(RETURN_FIELDS).getValue(); - if (returnFields != null && !returnFields.trim().isEmpty()) { - for (String returnField : returnFields.trim().split("[,]")) { - solrQuery.addField(returnField.trim()); - } + context.getStateManager().setState(newStateMap, Scope.CLUSTER); + + id_field = null; } + } - final String fullSortClause = context.getProperty(SORT_CLAUSE).getValue(); - if (fullSortClause != null && !fullSortClause.trim().isEmpty()) { - for (String sortClause : fullSortClause.split("[,]")) { - String[] sortParts = sortClause.trim().split("[ ]"); - solrQuery.addSort(sortParts[0], SolrQuery.ORDER.valueOf(sortParts[1])); - } + @Override + protected final Collection<ValidationResult> additionalCustomValidation(ValidationContext context) { + final Collection<ValidationResult> problems = new ArrayList<>(); + + if 
(context.getProperty(RETURN_TYPE).evaluateAttributeExpressions().getValue().equals(MODE_REC.getValue()) + && !context.getProperty(RECORD_WRITER).isSet()) { + problems.add(new ValidationResult.Builder() + .explanation("for parsing records a record writer has to be configured") + .valid(false) + .subject("Record writer check") + .build()); } + return problems; + } + private String getFieldNameOfUniqueKey() { + final SolrQuery solrQuery = new SolrQuery(); try { + solrQuery.setRequestHandler("/schema/uniquekey"); final QueryRequest req = new QueryRequest(solrQuery); if (isBasicAuthEnabled()) { req.setBasicAuthCredentials(getUsername(), getPassword()); } - // run the initial query and send out the first page of results - final StopWatch stopWatch = new StopWatch(true); - QueryResponse response = req.process(getSolrClient()); - stopWatch.stop(); - - long duration = stopWatch.getDuration(TimeUnit.MILLISECONDS); + return(req.process(getSolrClient()).getResponse().get("uniqueKey").toString()); + } catch (SolrServerException | IOException e) { + getLogger().error("Solr query to retrieve uniqueKey-field failed due to {}", new Object[]{solrQuery.toString(), e}, e); + throw new ProcessException(e); + } + } - final SolrDocumentList documentList = response.getResults(); - logger.info("Retrieved {} results from Solr for {} in {} ms", - new Object[] {documentList.getNumFound(), query, duration}); + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { - if (documentList != null && documentList.getNumFound() > 0) { - FlowFile flowFile = session.create(); - flowFile = session.write(flowFile, new QueryResponseOutputStreamCallback(response)); - flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), "application/xml"); - session.transfer(flowFile, REL_SUCCESS); + final ComponentLog logger = getLogger(); + final AtomicBoolean continuePaging = new AtomicBoolean(true); + final SolrQuery solrQuery = new 
SolrQuery(); - StringBuilder transitUri = new StringBuilder("solr://"); - transitUri.append(getSolrLocation()); - if (SOLR_TYPE_CLOUD.equals(context.getProperty(SOLR_TYPE).getValue())) { - transitUri.append("/").append(context.getProperty(COLLECTION).getValue()); - } + try { + if (id_field == null) { + id_field = getFieldNameOfUniqueKey(); + } - session.getProvenanceReporter().receive(flowFile, transitUri.toString(), duration); + final String dateField = context.getProperty(DATE_FIELD).getValue(); - // if initialized then page through the results and send out each page - if (initialized) { - int endRow = response.getResults().size(); - long totalResults = response.getResults().getNumFound(); + solrQuery.setQuery("*:*"); + final String query = context.getProperty(SOLR_QUERY).getValue(); + if (!StringUtils.isBlank(query) && !query.equals("*:*")) { + solrQuery.addFilterQuery(query); + } + final StringBuilder automatedFilterQuery = (new StringBuilder()) + .append(dateField) + .append(":[") + .append(context.getStateManager().getState(Scope.CLUSTER).get(STATE_MANAGER_FILTER)) + .append(" TO *]"); --- End diff -- When I re-ran the GetSolr processor after clearing its state, I got the following error; we should probably handle the situation where the state does not yet contain STATE_MANAGER_FILTER here: ``` 2017-10-19 22:22:54,827 ERROR [Timer-Driven Process Thread-4] org.apache.nifi.processors.solr.GetSolr GetSolr[id=34a73c1d-015f-10 00-6121-dfeaf41fe595] GetSolr[id=34a73c1d-015f-1000-6121-dfeaf41fe595] failed to process due to org.apache.solr.client.solrj.impl .HttpSolrClient$RemoteSolrException: Error from server at http://192.168.99.1:8983/solr/techproducts: Invalid Date String:'null'; rolling back session: {} org.apache.solr.client.solrj.impl.HttpSolrClient$RemoteSolrException: Error from server at http://192.168.99.1:8983/solr/techprod ucts: Invalid Date String:'null' at org.apache.solr.client.solrj.impl.HttpSolrClient.executeMethod(HttpSolrClient.java:592) at 
org.apache.solr.client.solrj.impl.HttpSolrClient.request(HttpSolrClient.java:261) at org.apache.solr.client.solrj.impl.HttpSolrClient.request(HttpSolrClient.java:250) at org.apache.solr.client.solrj.impl.LBHttpSolrClient.doRequest(LBHttpSolrClient.java:403) at org.apache.solr.client.solrj.impl.LBHttpSolrClient.request(LBHttpSolrClient.java:355) at org.apache.solr.client.solrj.impl.CloudSolrClient.sendRequest(CloudSolrClient.java:1291) at org.apache.solr.client.solrj.impl.CloudSolrClient.requestWithRetryOnStaleState(CloudSolrClient.java:1061) at org.apache.solr.client.solrj.impl.CloudSolrClient.request(CloudSolrClient.java:997) at org.apache.solr.client.solrj.SolrRequest.process(SolrRequest.java:149) at org.apache.solr.client.solrj.SolrRequest.process(SolrRequest.java:166) at org.apache.nifi.processors.solr.GetSolr.onTrigger(GetSolr.java:336) at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27) at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1119) at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:147) at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47) at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:128) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) ```
---