[ https://issues.apache.org/jira/browse/NIFI-3248?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16211171#comment-16211171 ]
ASF GitHub Bot commented on NIFI-3248:
--------------------------------------

Github user JohannesDaniel commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2199#discussion_r145727845

    --- Diff: nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/GetSolr.java ---
    @@ -170,159 +210,213 @@ protected void init(final ProcessorInitializationContext context) {
             return this.descriptors;
         }
     
    +    final static Set<String> propertyNamesForActivatingClearState = new HashSet<String>();
    +    static {
    +        propertyNamesForActivatingClearState.add(SOLR_TYPE.getName());
    +        propertyNamesForActivatingClearState.add(SOLR_LOCATION.getName());
    +        propertyNamesForActivatingClearState.add(COLLECTION.getName());
    +        propertyNamesForActivatingClearState.add(SOLR_QUERY.getName());
    +        propertyNamesForActivatingClearState.add(DATE_FIELD.getName());
    +        propertyNamesForActivatingClearState.add(RETURN_FIELDS.getName());
    +    }
    +
         @Override
         public void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) {
    -        lastEndDatedRef.set(UNINITIALIZED_LAST_END_DATE_VALUE);
    +        if (propertyNamesForActivatingClearState.contains(descriptor.getName()))
    +            clearState.set(true);
         }
     
    -    @OnStopped
    -    public void onStopped() {
    -        writeLastEndDate();
    -    }
    +    @OnScheduled
    +    public void clearState(final ProcessContext context) throws IOException {
    +        if (clearState.getAndSet(false)) {
    +            context.getStateManager().clear(Scope.CLUSTER);
    +            final Map<String,String> newStateMap = new HashMap<String,String>();
     
    -    @OnRemoved
    -    public void onRemoved() {
    -        final File lastEndDateCache = new File(FILE_PREFIX + getIdentifier());
    -        if (lastEndDateCache.exists()) {
    -            lastEndDateCache.delete();
    -        }
    -    }
    +            newStateMap.put(STATE_MANAGER_CURSOR_MARK, "*");
     
    -    @Override
    -    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
    -        final ComponentLog logger = getLogger();
    -        readLastEndDate();
    -
    -        final SimpleDateFormat sdf = new SimpleDateFormat(LAST_END_DATE_PATTERN, Locale.US);
    -        sdf.setTimeZone(TimeZone.getTimeZone("GMT"));
    -        final String currDate = sdf.format(new Date());
    -
    -        final boolean initialized = !UNINITIALIZED_LAST_END_DATE_VALUE.equals(lastEndDatedRef.get());
    -
    -        final String query = context.getProperty(SOLR_QUERY).getValue();
    -        final SolrQuery solrQuery = new SolrQuery(query);
    -        solrQuery.setRows(context.getProperty(BATCH_SIZE).asInteger());
    -
    -        // if initialized then apply a filter to restrict results from the last end time til now
    -        if (initialized) {
    -            StringBuilder filterQuery = new StringBuilder();
    -            filterQuery.append(context.getProperty(DATE_FIELD).getValue())
    -                    .append(":{").append(lastEndDatedRef.get()).append(" TO ")
    -                    .append(currDate).append("]");
    -            solrQuery.addFilterQuery(filterQuery.toString());
    -            logger.info("Applying filter query {}", new Object[]{filterQuery.toString()});
    -        }
    +            final String initialDate = context.getProperty(DATE_FILTER).getValue();
    +            if (StringUtils.isBlank(initialDate))
    +                newStateMap.put(STATE_MANAGER_FILTER, "*");
    +            else
    +                newStateMap.put(STATE_MANAGER_FILTER, initialDate);
     
    -        final String returnFields = context.getProperty(RETURN_FIELDS).getValue();
    -        if (returnFields != null && !returnFields.trim().isEmpty()) {
    -            for (String returnField : returnFields.trim().split("[,]")) {
    -                solrQuery.addField(returnField.trim());
    -            }
    +            context.getStateManager().setState(newStateMap, Scope.CLUSTER);
    +
    +            id_field = null;
             }
    +    }
     
    -        final String fullSortClause = context.getProperty(SORT_CLAUSE).getValue();
    -        if (fullSortClause != null && !fullSortClause.trim().isEmpty()) {
    -            for (String sortClause : fullSortClause.split("[,]")) {
    -                String[] sortParts = sortClause.trim().split("[ ]");
    -                solrQuery.addSort(sortParts[0], SolrQuery.ORDER.valueOf(sortParts[1]));
    -            }
    +    @Override
    +    protected final Collection<ValidationResult> additionalCustomValidation(ValidationContext context) {
    +        final Collection<ValidationResult> problems = new ArrayList<>();
    +
    +        if (context.getProperty(RETURN_TYPE).evaluateAttributeExpressions().getValue().equals(MODE_REC.getValue())
    +                && !context.getProperty(RECORD_WRITER).isSet()) {
    +            problems.add(new ValidationResult.Builder()
    +                    .explanation("for parsing records a record writer has to be configured")
    +                    .valid(false)
    +                    .subject("Record writer check")
    +                    .build());
             }
    +        return problems;
    +    }
     
    +    private String getFieldNameOfUniqueKey() {
    +        final SolrQuery solrQuery = new SolrQuery();
             try {
    +            solrQuery.setRequestHandler("/schema/uniquekey");
                 final QueryRequest req = new QueryRequest(solrQuery);
                 if (isBasicAuthEnabled()) {
                     req.setBasicAuthCredentials(getUsername(), getPassword());
                 }
     
    -            // run the initial query and send out the first page of results
    -            final StopWatch stopWatch = new StopWatch(true);
    -            QueryResponse response = req.process(getSolrClient());
    -            stopWatch.stop();
    -
    -            long duration = stopWatch.getDuration(TimeUnit.MILLISECONDS);
    +            return(req.process(getSolrClient()).getResponse().get("uniqueKey").toString());
    +        } catch (SolrServerException | IOException e) {
    +            getLogger().error("Solr query to retrieve uniqueKey-field failed due to {}", new Object[]{solrQuery.toString(), e}, e);
    +            throw new ProcessException(e);
    +        }
    +    }
     
    -            final SolrDocumentList documentList = response.getResults();
    -            logger.info("Retrieved {} results from Solr for {} in {} ms",
    -                    new Object[] {documentList.getNumFound(), query, duration});
    +    @Override
    +    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
     
    -            if (documentList != null && documentList.getNumFound() > 0) {
    -                FlowFile flowFile = session.create();
    -                flowFile = session.write(flowFile, new QueryResponseOutputStreamCallback(response));
    -                flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), "application/xml");
    -                session.transfer(flowFile, REL_SUCCESS);
    +        final ComponentLog logger = getLogger();
    +        final AtomicBoolean continuePaging = new AtomicBoolean(true);
    +        final SolrQuery solrQuery = new SolrQuery();
     
    -                StringBuilder transitUri = new StringBuilder("solr://");
    -                transitUri.append(getSolrLocation());
    -                if (SOLR_TYPE_CLOUD.equals(context.getProperty(SOLR_TYPE).getValue())) {
    -                    transitUri.append("/").append(context.getProperty(COLLECTION).getValue());
    -                }
    +        try {
    +            if (id_field == null) {
    +                id_field = getFieldNameOfUniqueKey();
    +            }
     
    -                session.getProvenanceReporter().receive(flowFile, transitUri.toString(), duration);
    +            final String dateField = context.getProperty(DATE_FIELD).getValue();
     
    -                // if initialized then page through the results and send out each page
    -                if (initialized) {
    -                    int endRow = response.getResults().size();
    -                    long totalResults = response.getResults().getNumFound();
    +            solrQuery.setQuery("*:*");
    +            final String query = context.getProperty(SOLR_QUERY).getValue();
    +            if (!StringUtils.isBlank(query) && !query.equals("*:*")) {
    +                solrQuery.addFilterQuery(query);
    +            }
    +            final StringBuilder automatedFilterQuery = (new StringBuilder())
    +                    .append(dateField)
    +                    .append(":[")
    +                    .append(context.getStateManager().getState(Scope.CLUSTER).get(STATE_MANAGER_FILTER))
    +                    .append(" TO *]");
    +            solrQuery.addFilterQuery(automatedFilterQuery.toString());
    +
    +            final List<String> fieldList = new ArrayList<String>();
    +            final String returnFields = context.getProperty(RETURN_FIELDS).getValue();
    +            if (!StringUtils.isBlank(returnFields)) {
    +                fieldList.addAll(Arrays.asList(returnFields.trim().split("[,]")));
    +                if (!fieldList.contains(dateField)) {
    +                    fieldList.add(dateField);
    +                    dateFieldNotInSpecifiedFieldsList.set(true);
    +                }
    +                for (String returnField : fieldList) {
    +                    solrQuery.addField(returnField.trim());
    +                }
    +            }
     
    -                    while (endRow < totalResults) {
    -                        solrQuery.setStart(endRow);
    +            solrQuery.setParam(CursorMarkParams.CURSOR_MARK_PARAM, context.getStateManager().getState(Scope.CLUSTER).get(STATE_MANAGER_CURSOR_MARK));
    +            solrQuery.setRows(context.getProperty(BATCH_SIZE).asInteger());
     
    -                        stopWatch.start();
    -                        response = getSolrClient().query(solrQuery);
    -                        stopWatch.stop();
    +            final StringBuilder sortClause = (new StringBuilder())
    +                    .append(dateField)
    +                    .append(" asc,")
    +                    .append(id_field)
    +                    .append(" asc");
    +            solrQuery.setParam("sort", sortClause.toString());
     
    -                        duration = stopWatch.getDuration(TimeUnit.MILLISECONDS);
    -                        logger.info("Retrieved results for {} in {} ms", new Object[]{query, duration});
    +            while (continuePaging.get()) {
    +                final QueryRequest req = new QueryRequest(solrQuery);
    +                if (isBasicAuthEnabled()) {
    +                    req.setBasicAuthCredentials(getUsername(), getPassword());
    +                }
     
    -                        flowFile = session.create();
    +                logger.debug(solrQuery.toQueryString());
    +                final QueryResponse response = req.process(getSolrClient());
    +                final SolrDocumentList documentList = response.getResults();
    +
    +                if (response.getResults().size() > 0) {
    +                    final SolrDocument lastSolrDocument = documentList.get(response.getResults().size()-1);
    +                    final String latestDateValue = df.format(lastSolrDocument.get(dateField));
    +
    +                    solrQuery.setParam(CursorMarkParams.CURSOR_MARK_PARAM, response.getNextCursorMark());
    +                    final Map<String,String> updateStateManager = new HashMap<String,String>();
    +                    updateStateManager.putAll(context.getStateManager().getState(Scope.CLUSTER).toMap());
    +                    updateStateManager.put(STATE_MANAGER_CURSOR_MARK, response.getNextCursorMark());
    +                    updateStateManager.put(STATE_MANAGER_FILTER, latestDateValue);
    +                    context.getStateManager().setState(updateStateManager, Scope.CLUSTER);
    +
    +                    FlowFile flowFile = session.create();
    +                    flowFile = session.putAttribute(flowFile, "solrQuery", solrQuery.toString());
    +
    +                    if (context.getProperty(RETURN_TYPE).getValue().equals(MODE_XML.getValue())){
    +                        if (dateFieldNotInSpecifiedFieldsList.get()) {
    +                            for (SolrDocument doc : response.getResults()) {
    +                                doc.removeFields(dateField);
    +                            }
    +                        }
                             flowFile = session.write(flowFile, new QueryResponseOutputStreamCallback(response));
    --- End diff --
    
    Sure. But all users will have to change their workflows after updating NiFi, right?
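For readers skimming the diff: the reworked processor drops start/rows paging in favor of Solr's cursorMark and persists both the cursor and the latest date value in NiFi's cluster state between runs. Below is a minimal, self-contained sketch of the cursorMark pattern with SolrJ; the Solr URL and the `timestamp`/`id` field names are illustrative assumptions, not values taken from the PR:

{code}
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.SolrQuery;
import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.common.SolrDocument;
import org.apache.solr.common.params.CursorMarkParams;

public class CursorMarkPagingSketch {
    public static void main(String[] args) throws Exception {
        // Hypothetical endpoint; point this at a real core/collection.
        try (SolrClient solr = new HttpSolrClient.Builder("http://localhost:8983/solr/example").build()) {
            final SolrQuery query = new SolrQuery("*:*");
            query.setRows(100);
            // cursorMark requires a deterministic total ordering, so the sort
            // must end with the collection's uniqueKey field.
            query.setSort("timestamp", SolrQuery.ORDER.asc);
            query.addSort("id", SolrQuery.ORDER.asc);

            String cursorMark = CursorMarkParams.CURSOR_MARK_START; // "*"
            boolean done = false;
            while (!done) {
                query.set(CursorMarkParams.CURSOR_MARK_PARAM, cursorMark);
                final QueryResponse response = solr.query(query);
                for (SolrDocument doc : response.getResults()) {
                    System.out.println(doc.getFieldValue("id"));
                }
                final String nextCursorMark = response.getNextCursorMark();
                // Solr returns the same cursor once the result set is exhausted.
                done = cursorMark.equals(nextCursorMark);
                cursorMark = nextCursorMark;
            }
        }
    }
}
{code}

That total-ordering requirement is why the PR resolves the uniqueKey field via the /schema/uniquekey handler and appends it to the sort clause.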
> GetSolr can miss recently updated documents
> -------------------------------------------
>
>                 Key: NIFI-3248
>                 URL: https://issues.apache.org/jira/browse/NIFI-3248
>             Project: Apache NiFi
>          Issue Type: Bug
>          Components: Extensions
>    Affects Versions: 1.0.0, 0.5.0, 0.6.0, 0.5.1, 0.7.0, 0.6.1, 1.1.0, 0.7.1, 1.0.1
>            Reporter: Koji Kawamura
>            Assignee: Johannes Peter
>         Attachments: nifi-flow.png, query-result-with-curly-bracket.png, query-result-with-square-bracket.png
>
> GetSolr holds the last query timestamp so that it only fetches documents that have been added or updated since the last query.
> However, GetSolr misses some of those updated documents, and once a document's date field value becomes older than the last query timestamp, the document can no longer be retrieved by GetSolr at all.
> This JIRA is for tracking the process of investigating this behavior, and the discussion around it.
> Here are the things that can cause this behavior:
> |#|Short description|Should we address it?|
> |1|Timestamp range filter, curly or square bracket?|No|
> |2|Timezone difference between update and query|Additional docs might be helpful|
> |3|Lag comes from the Near Real Time nature of Solr|Should be documented at least; add a 'commit lag-time'?|
> h2. 1. Timestamp range filter, curly or square bracket?
> At first glance, the mixed use of curly and square brackets looked strange ([source code|https://github.com/apache/nifi/blob/support/nifi-0.5.x/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/GetSolr.java#L202]), but the difference has a meaning.
> In a range query, a square bracket is inclusive and a curly bracket is exclusive. If both ends were inclusive and a document had a timestamp exactly on the boundary, it could be returned by two consecutive executions, and we only want it in one.
> This is intentional, and it should stay as it is.
> h2. 2. Timezone difference between update and query
> Solr treats date fields as a [UTC representation|https://cwiki.apache.org/confluence/display/solr/Working+with+Dates]. If the date field String value of an updated document represents a time without a timezone, and NiFi is running in an environment whose timezone is not UTC, GetSolr cannot perform date range queries as users expect.
> Let's say NiFi is running with JST (UTC+9). A process added a document to Solr at 15:00 JST, but the date field has no timezone, so Solr indexed it as 15:00 UTC. Then GetSolr performs a range query at 15:10 JST, targeting any documents updated from 15:00 to 15:10 JST. GetSolr formats dates using UTC, i.e. 6:00 to 6:10 UTC. The updated document won't match the date range filter.
> To avoid this, updated documents must carry a proper timezone in the date field's string representation.
> If one uses NiFi expression language to set the current timestamp in that date field, the following expression can be used:
> {code}
> ${now():format("yyyy-MM-dd'T'HH:mm:ss.SSSZ")}
> {code}
> It will produce a result like:
> {code}
> 2016-12-27T15:30:04.895+0900
> {code}
> Then it will be indexed in Solr as UTC and will be queried by GetSolr as expected.
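The same pitfall applies when the timestamp is produced by custom Java code rather than by expression language. A minimal sketch of formatting explicitly in UTC so the JVM's default timezone (e.g. JST) cannot shift the indexed value; this mirrors the expression above and is an illustration, not GetSolr code:

{code}
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Locale;
import java.util.TimeZone;

public class UtcTimestampSketch {
    public static void main(String[] args) {
        // Solr stores date fields as UTC, so format the value in UTC explicitly
        // instead of relying on the JVM's default timezone.
        final SimpleDateFormat utc = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'", Locale.US);
        utc.setTimeZone(TimeZone.getTimeZone("UTC"));
        System.out.println(utc.format(new Date())); // e.g. 2016-12-27T06:30:04.895Z
    }
}
{code}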
> h2. 3. Lag comes from the Near Real Time nature of Solr
> Solr provides Near Real Time search capability, meaning recently updated documents can be queried in near real time, but not in real time. This latency can be controlled either on the client side, by specifying the "commitWithin" parameter on the update request, or on the Solr server side, via "autoCommit" and "autoSoftCommit" in [solrconfig.xml|https://cwiki.apache.org/confluence/display/solr/UpdateHandlers+in+SolrConfig#UpdateHandlersinSolrConfig-Commits]. Since committing and updating the index can be costly, it is recommended to set this interval as long as the maximum tolerable latency allows.
> However, this can be problematic with GetSolr. For instance, as shown in the simple NiFi flow below, GetSolr can miss updated documents:
> {code}
> t1: GetSolr queried
> t2: GenerateFlowFile set date = t2
> t3: PutSolrContentStream stored new doc
> t4: GetSolr queried again, from t1 to t4, but the new doc hasn't been indexed
> t5: Solr completed index
> t6: GetSolr queried again, from t4 to t6, the doc didn't match query
> {code}
> This behavior should be documented at the very least.
> Plus, it would be helpful to add a new configuration property to GetSolr to specify a commit lag-time, so that GetSolr targets an older timestamp range when querying documents:
> {code}
> // with commit lag-time
> t1: GetSolr queried
> t2: GenerateFlowFile set date = t2
> t3: PutSolrContentStream stored new doc
> t4: GetSolr queried again, from (t1 - lag) to (t4 - lag), but the new doc hasn't been indexed
> t5: Solr completed index
> t6: GetSolr queried again, from (t4 - lag) to (t6 - lag), the doc can match query
> {code}
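To make the proposal concrete, here is a sketch of how such a commit lag-time could be applied when building the date range filter. The ten-second lag, the `timestamp` field name, and the hard-coded last-end-date are assumptions for illustration; this is not an implemented GetSolr property:

{code}
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Locale;
import java.util.TimeZone;

public class CommitLagSketch {
    public static void main(String[] args) {
        final long lagMillis = 10_000L; // assumed 'commit lag-time' of 10 seconds

        final SimpleDateFormat utc = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'", Locale.US);
        utc.setTimeZone(TimeZone.getTimeZone("UTC"));

        // Query only up to (now - lag), so a document committed late still falls
        // into a later window instead of being skipped forever.
        final String lastEndDate = "2016-12-27T06:00:00.000Z"; // would come from processor state
        final String upperBound = utc.format(new Date(System.currentTimeMillis() - lagMillis));

        // Exclusive lower bound, inclusive upper bound, as discussed in section 1.
        final String filterQuery = "timestamp:{" + lastEndDate + " TO " + upperBound + "]";
        System.out.println(filterQuery);
    }
}
{code}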