[ https://issues.apache.org/jira/browse/NIFI-3248?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16214896#comment-16214896 ]

ASF GitHub Bot commented on NIFI-3248:
--------------------------------------

Github user ijokarumawak commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2199#discussion_r146215638
  
    --- Diff: nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/GetSolr.java ---
    @@ -170,159 +210,213 @@ protected void init(final ProcessorInitializationContext context) {
             return this.descriptors;
         }
     
    +    final static Set<String> propertyNamesForActivatingClearState = new HashSet<String>();
    +    static {
    +        propertyNamesForActivatingClearState.add(SOLR_TYPE.getName());
    +        propertyNamesForActivatingClearState.add(SOLR_LOCATION.getName());
    +        propertyNamesForActivatingClearState.add(COLLECTION.getName());
    +        propertyNamesForActivatingClearState.add(SOLR_QUERY.getName());
    +        propertyNamesForActivatingClearState.add(DATE_FIELD.getName());
    +        propertyNamesForActivatingClearState.add(RETURN_FIELDS.getName());
    +    }
    +
         @Override
         public void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) {
    -        lastEndDatedRef.set(UNINITIALIZED_LAST_END_DATE_VALUE);
    +        if (propertyNamesForActivatingClearState.contains(descriptor.getName()))
    +            clearState.set(true);
         }
     
    -    @OnStopped
    -    public void onStopped() {
    -        writeLastEndDate();
    -    }
    +    @OnScheduled
    +    public void clearState(final ProcessContext context) throws IOException {
    +        if (clearState.getAndSet(false)) {
    +            context.getStateManager().clear(Scope.CLUSTER);
    +            final Map<String,String> newStateMap = new HashMap<String,String>();
     
    -    @OnRemoved
    -    public void onRemoved() {
    -        final File lastEndDateCache = new File(FILE_PREFIX + getIdentifier());
    -        if (lastEndDateCache.exists()) {
    -            lastEndDateCache.delete();
    -        }
    -    }
    +            newStateMap.put(STATE_MANAGER_CURSOR_MARK, "*");
     
    -    @Override
    -    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
    -        final ComponentLog logger = getLogger();
    -        readLastEndDate();
    -
    -        final SimpleDateFormat sdf = new SimpleDateFormat(LAST_END_DATE_PATTERN, Locale.US);
    -        sdf.setTimeZone(TimeZone.getTimeZone("GMT"));
    -        final String currDate = sdf.format(new Date());
    -
    -        final boolean initialized = !UNINITIALIZED_LAST_END_DATE_VALUE.equals(lastEndDatedRef.get());
    -
    -        final String query = context.getProperty(SOLR_QUERY).getValue();
    -        final SolrQuery solrQuery = new SolrQuery(query);
    -        solrQuery.setRows(context.getProperty(BATCH_SIZE).asInteger());
    -
    -        // if initialized then apply a filter to restrict results from the last end time til now
    -        if (initialized) {
    -            StringBuilder filterQuery = new StringBuilder();
    -            filterQuery.append(context.getProperty(DATE_FIELD).getValue())
    -                    .append(":{").append(lastEndDatedRef.get()).append(" TO ")
    -                    .append(currDate).append("]");
    -            solrQuery.addFilterQuery(filterQuery.toString());
    -            logger.info("Applying filter query {}", new Object[]{filterQuery.toString()});
    -        }
    +            final String initialDate = context.getProperty(DATE_FILTER).getValue();
    +            if (StringUtils.isBlank(initialDate))
    +                newStateMap.put(STATE_MANAGER_FILTER, "*");
    +            else
    +                newStateMap.put(STATE_MANAGER_FILTER, initialDate);
     
    -        final String returnFields = context.getProperty(RETURN_FIELDS).getValue();
    -        if (returnFields != null && !returnFields.trim().isEmpty()) {
    -            for (String returnField : returnFields.trim().split("[,]")) {
    -                solrQuery.addField(returnField.trim());
    -            }
    +            context.getStateManager().setState(newStateMap, Scope.CLUSTER);
    +
    +            id_field = null;
             }
    +    }
     
    -        final String fullSortClause = context.getProperty(SORT_CLAUSE).getValue();
    -        if (fullSortClause != null && !fullSortClause.trim().isEmpty()) {
    -            for (String sortClause : fullSortClause.split("[,]")) {
    -                String[] sortParts = sortClause.trim().split("[ ]");
    -                solrQuery.addSort(sortParts[0], SolrQuery.ORDER.valueOf(sortParts[1]));
    -            }
    +    @Override
    +    protected final Collection<ValidationResult> additionalCustomValidation(ValidationContext context) {
    +        final Collection<ValidationResult> problems = new ArrayList<>();
    +
    +        if (context.getProperty(RETURN_TYPE).evaluateAttributeExpressions().getValue().equals(MODE_REC.getValue())
    +                && !context.getProperty(RECORD_WRITER).isSet()) {
    +            problems.add(new ValidationResult.Builder()
    +                    .explanation("for parsing records a record writer has to be configured")
    +                    .valid(false)
    +                    .subject("Record writer check")
    +                    .build());
             }
    +        return problems;
    +    }
     
    +    private String getFieldNameOfUniqueKey() {
    +        final SolrQuery solrQuery = new SolrQuery();
             try {
    +            solrQuery.setRequestHandler("/schema/uniquekey");
                 final QueryRequest req = new QueryRequest(solrQuery);
                 if (isBasicAuthEnabled()) {
                     req.setBasicAuthCredentials(getUsername(), getPassword());
                 }
     
    -            // run the initial query and send out the first page of results
    -            final StopWatch stopWatch = new StopWatch(true);
    -            QueryResponse response = req.process(getSolrClient());
    -            stopWatch.stop();
    -
    -            long duration = stopWatch.getDuration(TimeUnit.MILLISECONDS);
    +            return(req.process(getSolrClient()).getResponse().get("uniqueKey").toString());
    +        } catch (SolrServerException | IOException e) {
    +            getLogger().error("Solr query to retrieve uniqueKey-field failed due to {}", new Object[]{solrQuery.toString(), e}, e);
    +            throw new ProcessException(e);
    +        }
    +    }
     
    -            final SolrDocumentList documentList = response.getResults();
    -            logger.info("Retrieved {} results from Solr for {} in {} ms",
    -                    new Object[] {documentList.getNumFound(), query, duration});
    +    @Override
    +    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
     
    -            if (documentList != null && documentList.getNumFound() > 0) {
    -                FlowFile flowFile = session.create();
    -                flowFile = session.write(flowFile, new QueryResponseOutputStreamCallback(response));
    -                flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), "application/xml");
    -                session.transfer(flowFile, REL_SUCCESS);
    +        final ComponentLog logger = getLogger();
    +        final AtomicBoolean continuePaging = new AtomicBoolean(true);
    +        final SolrQuery solrQuery = new SolrQuery();
     
    -                StringBuilder transitUri = new StringBuilder("solr://");
    -                transitUri.append(getSolrLocation());
    -                if (SOLR_TYPE_CLOUD.equals(context.getProperty(SOLR_TYPE).getValue())) {
    -                    transitUri.append("/").append(context.getProperty(COLLECTION).getValue());
    -                }
    +        try {
    +            if (id_field == null) {
    +                id_field = getFieldNameOfUniqueKey();
    +            }
     
    -                session.getProvenanceReporter().receive(flowFile, transitUri.toString(), duration);
    +            final String dateField = context.getProperty(DATE_FIELD).getValue();
     
    -                // if initialized then page through the results and send out each page
    -                if (initialized) {
    -                    int endRow = response.getResults().size();
    -                    long totalResults = response.getResults().getNumFound();
    +            solrQuery.setQuery("*:*");
    +            final String query = context.getProperty(SOLR_QUERY).getValue();
    +            if (!StringUtils.isBlank(query) && !query.equals("*:*")) {
    +                solrQuery.addFilterQuery(query);
    +            }
    +            final StringBuilder automatedFilterQuery = (new StringBuilder())
    +                    .append(dateField)
    +                    .append(":[")
    +                    .append(context.getStateManager().getState(Scope.CLUSTER).get(STATE_MANAGER_FILTER))
    +                    .append(" TO *]");
    +            solrQuery.addFilterQuery(automatedFilterQuery.toString());
    +
    +            final List<String> fieldList = new ArrayList<String>();
    +            final String returnFields = context.getProperty(RETURN_FIELDS).getValue();
    +            if (!StringUtils.isBlank(returnFields)) {
    +                fieldList.addAll(Arrays.asList(returnFields.trim().split("[,]")));
    +                if (!fieldList.contains(dateField)) {
    +                    fieldList.add(dateField);
    +                    dateFieldNotInSpecifiedFieldsList.set(true);
    +                }
    +                for (String returnField : fieldList) {
    +                    solrQuery.addField(returnField.trim());
    +                }
    +            }
     
    -                    while (endRow < totalResults) {
    -                        solrQuery.setStart(endRow);
    +            solrQuery.setParam(CursorMarkParams.CURSOR_MARK_PARAM, context.getStateManager().getState(Scope.CLUSTER).get(STATE_MANAGER_CURSOR_MARK));
    +            solrQuery.setRows(context.getProperty(BATCH_SIZE).asInteger());
     
    -                        stopWatch.start();
    -                        response = getSolrClient().query(solrQuery);
    -                        stopWatch.stop();
    +            final StringBuilder sortClause = (new StringBuilder())
    +                    .append(dateField)
    +                    .append(" asc,")
    +                    .append(id_field)
    +                    .append(" asc");
    +            solrQuery.setParam("sort", sortClause.toString());
     
    -                        duration = stopWatch.getDuration(TimeUnit.MILLISECONDS);
    -                        logger.info("Retrieved results for {} in {} ms", new Object[]{query, duration});
    +            while (continuePaging.get()) {
    +                final QueryRequest req = new QueryRequest(solrQuery);
    +                if (isBasicAuthEnabled()) {
    +                    req.setBasicAuthCredentials(getUsername(), getPassword());
    +                }
     
    -                        flowFile = session.create();
    +                logger.debug(solrQuery.toQueryString());
    +                final QueryResponse response = req.process(getSolrClient());
    +                final SolrDocumentList documentList = response.getResults();
    +
    +                if (response.getResults().size() > 0) {
    +                    final SolrDocument lastSolrDocument = documentList.get(response.getResults().size()-1);
    +                    final String latestDateValue = df.format(lastSolrDocument.get(dateField));
    +
    +                    solrQuery.setParam(CursorMarkParams.CURSOR_MARK_PARAM, response.getNextCursorMark());
    +                    final Map<String,String> updateStateManager = new HashMap<String,String>();
    +                    updateStateManager.putAll(context.getStateManager().getState(Scope.CLUSTER).toMap());
    +                    updateStateManager.put(STATE_MANAGER_CURSOR_MARK, response.getNextCursorMark());
    +                    updateStateManager.put(STATE_MANAGER_FILTER, latestDateValue);
    +                    context.getStateManager().setState(updateStateManager, Scope.CLUSTER);
    +
    +                    FlowFile flowFile = session.create();
    +                    flowFile = session.putAttribute(flowFile, "solrQuery", solrQuery.toString());
    +
    +                    if (context.getProperty(RETURN_TYPE).getValue().equals(MODE_XML.getValue())){
    +                        if (dateFieldNotInSpecifiedFieldsList.get()) {
    +                            for (SolrDocument doc : response.getResults()) {
    +                                doc.removeFields(dateField);
    +                            }
    +                        }
                             flowFile = session.write(flowFile, new QueryResponseOutputStreamCallback(response));
    --- End diff --
    
    Yes, that's correct, but it's necessary.
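
The replacement onTrigger in the diff above pages through results with Solr cursor marks rather than start offsets. A minimal standalone SolrJ sketch of that technique (the Solr URL, sort fields, and batch size here are illustrative assumptions, not values from the PR):

{code}
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.SolrQuery;
import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.common.params.CursorMarkParams;

public class CursorPagingSketch {
    public static void main(String[] args) throws Exception {
        final SolrClient client = new HttpSolrClient.Builder("http://localhost:8983/solr/collection1").build();
        final SolrQuery query = new SolrQuery("*:*");
        query.setRows(100);
        // Cursor paging requires a deterministic sort that ends on the uniqueKey field ("id" here).
        query.setSort("created", SolrQuery.ORDER.asc);
        query.addSort("id", SolrQuery.ORDER.asc);

        String cursorMark = CursorMarkParams.CURSOR_MARK_START; // "*"
        boolean done = false;
        while (!done) {
            query.set(CursorMarkParams.CURSOR_MARK_PARAM, cursorMark);
            final QueryResponse response = client.query(query);
            // process response.getResults() here ...
            final String nextCursorMark = response.getNextCursorMark();
            // When the cursor stops advancing, every matching document has been seen.
            done = cursorMark.equals(nextCursorMark);
            cursorMark = nextCursorMark;
        }
        client.close();
    }
}
{code}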


> GetSolr can miss recently updated documents
> -------------------------------------------
>
>                 Key: NIFI-3248
>                 URL: https://issues.apache.org/jira/browse/NIFI-3248
>             Project: Apache NiFi
>          Issue Type: Bug
>          Components: Extensions
>    Affects Versions: 1.0.0, 0.5.0, 0.6.0, 0.5.1, 0.7.0, 0.6.1, 1.1.0, 0.7.1, 1.0.1
>            Reporter: Koji Kawamura
>            Assignee: Johannes Peter
>         Attachments: nifi-flow.png, query-result-with-curly-bracket.png, query-result-with-square-bracket.png
>
>
> GetSolr holds the last query timestamp so that it only fetches documents that have been added or updated since the last query.
> However, GetSolr misses some of those updated documents, and once a document's date field value becomes older than the last query timestamp, the document can no longer be retrieved by GetSolr.
> This JIRA tracks the investigation of this behavior and the discussion around it.
> Here are the things that can cause this behavior:
> |#|Short description|Should we address it?|
> |1|Timestamp range filter, curly or square bracket?|No|
> |2|Timezone difference between update and query|Additional docs might be helpful|
> |3|Lag comes from the NearRealTime nature of Solr|Should be documented at least; add 'commit lag-time'?|
> h2. 1. Timestamp range filter, curly or square bracket?
> At first glance, mixing curly and square brackets looked strange ([source code|https://github.com/apache/nifi/blob/support/nifi-0.5.x/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/GetSolr.java#L202]), but the difference has a meaning.
> In a range query, the square bracket is inclusive and the curly bracket is exclusive. If both ends were inclusive and a document had a timestamp exactly on the boundary, it could be returned in two consecutive executions, and we only want it in one.
> This is intentional and should stay as it is.
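> For illustration, the filter GetSolr builds looks like the following (field name and timestamps are made up); the opening curly bracket excludes the lower bound and the closing square bracket includes the upper bound, so a document sitting exactly on the previous end time is returned only once:
> {code}
> fq=created:{2016-12-27T06:00:00.000Z TO 2016-12-27T06:10:00.000Z]
> {code}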
> h2. 2. Timezone difference between update and query
> Solr treats date fields as [UTC representation|https://cwiki.apache.org/confluence/display/solr/Working+with+Dates]. If the date field String value of an updated document represents a time without a timezone, and NiFi runs in an environment whose timezone is not UTC, GetSolr can't perform the date range query users expect.
> Let's say NiFi is running with JST (UTC+9). A process added a document to Solr at 15:00 JST, but the date field has no timezone, so Solr indexed it as 15:00 UTC. GetSolr then performs a range query at 15:10 JST, targeting documents updated from 15:00 to 15:10 JST. Since GetSolr formats dates in UTC, the range becomes 6:00 to 6:10 UTC, and the updated document won't match the date range filter.
> To avoid this, updated documents must carry a proper timezone in the date field's string representation.
> If one uses the NiFi Expression Language to set the current timestamp on that date field, the following expression can be used:
> {code}
> ${now():format("yyyy-MM-dd'T'HH:mm:ss.SSSZ")}
> {code}
> It will produce a result like:
> {code}
> 2016-12-27T15:30:04.895+0900
> {code}
> Then it will be indexed in Solr with UTC and will be queried by GetSolr as 
> expected.
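> To see the two representations side by side, here is a minimal Java sketch (the GMT formatting mirrors what GetSolr does internally; the printed values are illustrative):
> {code}
> import java.text.SimpleDateFormat;
> import java.util.Date;
> import java.util.Locale;
> import java.util.TimeZone;
> public class DateFormatSketch {
>     public static void main(String[] args) {
>         final Date now = new Date();
>         // Without a timezone suffix, Solr assumes the value is already UTC.
>         final SimpleDateFormat noZone = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS", Locale.US);
>         System.out.println(noZone.format(now)); // e.g. 2016-12-27T15:30:04.895 (local wall clock)
>         // GetSolr formats its range boundaries in GMT, so documents must carry zone info to line up.
>         final SimpleDateFormat utc = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'", Locale.US);
>         utc.setTimeZone(TimeZone.getTimeZone("GMT"));
>         System.out.println(utc.format(now)); // e.g. 2016-12-27T06:30:04.895Z for 15:30 JST
>     }
> }
> {code}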
> h2. 3. Lag comes from the NearRealTime nature of Solr
> Solr provides Near Real Time search capability, meaning recently updated documents become searchable in near real time, but not in real time. This latency can be controlled either on the client side, by specifying the "commitWithin" parameter on the update request, or on the Solr server side, via "autoCommit" and "autoSoftCommit" in [solrconfig.xml|https://cwiki.apache.org/confluence/display/solr/UpdateHandlers+in+SolrConfig#UpdateHandlersinSolrConfig-Commits].
> Since committing and updating the index can be costly, it's recommended to set this interval as long as the maximum tolerable latency allows.
> However, this can be problematic for GetSolr. For instance, as shown in the simple NiFi flow below, GetSolr can miss updated documents:
> {code}
> t1: GetSolr queried
> t2: GenerateFlowFile set date = t2
> t3: PutSolrContentStream stored new doc
> t4: GetSolr queried again, from t1 to t4, but the new doc hasn't been indexed
> t5: Solr completed index
> t6: GetSolr queried again, from t4 to t6, the doc didn't match query
> {code}
> This behavior should at least be documented.
> Plus, it would be helpful to add a new configuration property to GetSolr that specifies a commit lag-time, so that GetSolr targets an older timestamp range when querying documents (see the sketch after the following timeline):
> {code}
> // with commit lag-time
> t1: GetSolr queried
> t2: GenerateFlowFile set date = t2
> t3: PutSolrContentStream stored new doc
> t4: GetSolr queried again, from (t1 - lag) to (t4 - lag), but the new doc hasn't been indexed
> t5: Solr completed index
> t6: GetSolr queried again, from (t4 - lag) to (t6 - lag), the doc can match query
> {code}
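> A minimal Java sketch of how such a lag could shift the query window (the property name and the 10-second value are hypothetical):
> {code}
> import java.util.Date;
> import java.util.concurrent.TimeUnit;
> public class CommitLagSketch {
>     public static void main(String[] args) {
>         // Hypothetical 'Commit Lag-Time' property value.
>         final long lagMillis = TimeUnit.SECONDS.toMillis(10);
>         // Shift the upper bound of the date range into the past so documents
>         // still waiting for a Solr commit are picked up by a later run.
>         final Date upperBound = new Date(System.currentTimeMillis() - lagMillis);
>         System.out.println("Query documents with date <= " + upperBound);
>     }
> }
> {code}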



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
