[GitHub] nifi pull request #2199: NIFI-3248: Improvement of GetSolr Processor
Github user asfgit closed the pull request at: https://github.com/apache/nifi/pull/2199 ---
[GitHub] nifi pull request #2199: NIFI-3248: Improvement of GetSolr Processor
Github user ijokarumawak commented on a diff in the pull request: https://github.com/apache/nifi/pull/2199#discussion_r146215638 --- Diff: nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/GetSolr.java --- @@ -170,159 +210,213 @@ protected void init(final ProcessorInitializationContext context) { return this.descriptors; } +final static Set propertyNamesForActivatingClearState = new HashSet(); +static { +propertyNamesForActivatingClearState.add(SOLR_TYPE.getName()); +propertyNamesForActivatingClearState.add(SOLR_LOCATION.getName()); +propertyNamesForActivatingClearState.add(COLLECTION.getName()); +propertyNamesForActivatingClearState.add(SOLR_QUERY.getName()); +propertyNamesForActivatingClearState.add(DATE_FIELD.getName()); +propertyNamesForActivatingClearState.add(RETURN_FIELDS.getName()); +} + @Override public void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) { -lastEndDatedRef.set(UNINITIALIZED_LAST_END_DATE_VALUE); +if (propertyNamesForActivatingClearState.contains(descriptor.getName())) +clearState.set(true); } -@OnStopped -public void onStopped() { -writeLastEndDate(); -} +@OnScheduled +public void clearState(final ProcessContext context) throws IOException { +if (clearState.getAndSet(false)) { +context.getStateManager().clear(Scope.CLUSTER); +final Map newStateMap = new HashMap(); -@OnRemoved -public void onRemoved() { -final File lastEndDateCache = new File(FILE_PREFIX + getIdentifier()); -if (lastEndDateCache.exists()) { -lastEndDateCache.delete(); -} -} +newStateMap.put(STATE_MANAGER_CURSOR_MARK, "*"); -@Override -public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { -final ComponentLog logger = getLogger(); -readLastEndDate(); - -final SimpleDateFormat sdf = new SimpleDateFormat(LAST_END_DATE_PATTERN, Locale.US); -sdf.setTimeZone(TimeZone.getTimeZone("GMT")); -final String currDate = sdf.format(new Date()); - -final boolean 
initialized = !UNINITIALIZED_LAST_END_DATE_VALUE.equals(lastEndDatedRef.get()); - -final String query = context.getProperty(SOLR_QUERY).getValue(); -final SolrQuery solrQuery = new SolrQuery(query); -solrQuery.setRows(context.getProperty(BATCH_SIZE).asInteger()); - -// if initialized then apply a filter to restrict results from the last end time til now -if (initialized) { -StringBuilder filterQuery = new StringBuilder(); -filterQuery.append(context.getProperty(DATE_FIELD).getValue()) -.append(":{").append(lastEndDatedRef.get()).append(" TO ") -.append(currDate).append("]"); -solrQuery.addFilterQuery(filterQuery.toString()); -logger.info("Applying filter query {}", new Object[]{filterQuery.toString()}); -} +final String initialDate = context.getProperty(DATE_FILTER).getValue(); +if (StringUtils.isBlank(initialDate)) +newStateMap.put(STATE_MANAGER_FILTER, "*"); +else +newStateMap.put(STATE_MANAGER_FILTER, initialDate); -final String returnFields = context.getProperty(RETURN_FIELDS).getValue(); -if (returnFields != null && !returnFields.trim().isEmpty()) { -for (String returnField : returnFields.trim().split("[,]")) { -solrQuery.addField(returnField.trim()); -} +context.getStateManager().setState(newStateMap, Scope.CLUSTER); + +id_field = null; } +} -final String fullSortClause = context.getProperty(SORT_CLAUSE).getValue(); -if (fullSortClause != null && !fullSortClause.trim().isEmpty()) { -for (String sortClause : fullSortClause.split("[,]")) { -String[] sortParts = sortClause.trim().split("[ ]"); -solrQuery.addSort(sortParts[0], SolrQuery.ORDER.valueOf(sortParts[1])); -} +@Override +protected final Collection additionalCustomValidation(ValidationContext context) { +final Collection problems = new ArrayList<>(); + +if (context.getProperty(RETURN_TYPE).evaluateAttributeExpressions().getValue().equals(MODE_REC.getValue()) +&& !context.getProperty(RECOR
[GitHub] nifi pull request #2199: NIFI-3248: Improvement of GetSolr Processor
Github user ijokarumawak commented on a diff in the pull request: https://github.com/apache/nifi/pull/2199#discussion_r146214902 --- Diff: nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/GetSolr.java --- @@ -66,42 +79,72 @@ import org.apache.solr.common.SolrDocument; import org.apache.solr.common.SolrDocumentList; import org.apache.solr.common.SolrInputDocument; +import org.apache.solr.common.params.CursorMarkParams; -@Tags({"Apache", "Solr", "Get", "Pull"}) +@Tags({"Apache", "Solr", "Get", "Pull", "Records"}) @InputRequirement(Requirement.INPUT_FORBIDDEN) -@CapabilityDescription("Queries Solr and outputs the results as a FlowFile") +@CapabilityDescription("Queries Solr and outputs the results as a FlowFile in the format of XML or using a Record Writer") +@Stateful(scopes = {Scope.CLUSTER}, description = "Stores latest date of Date Field so that the same data will not be fetched multiple times.") public class GetSolr extends SolrProcessor { -public static final PropertyDescriptor SOLR_QUERY = new PropertyDescriptor -.Builder().name("Solr Query") -.description("A query to execute against Solr") +public static final String STATE_MANAGER_FILTER = "stateManager_filter"; +public static final String STATE_MANAGER_CURSOR_MARK = "stateManager_cursorMark"; +public static final AllowableValue MODE_XML = new AllowableValue("XML"); +public static final AllowableValue MODE_REC = new AllowableValue("Records"); --- End diff -- Thanks for your detailed considerations. I agree it's not an easy task. So that your informative comments are not lost for future work, I've created another JIRA: https://issues.apache.org/jira/browse/NIFI-4514 ---
[GitHub] nifi pull request #2199: NIFI-3248: Improvement of GetSolr Processor
Github user JohannesDaniel commented on a diff in the pull request: https://github.com/apache/nifi/pull/2199#discussion_r145946147 --- Diff: nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/GetSolr.java --- @@ -66,42 +79,72 @@ import org.apache.solr.common.SolrDocument; import org.apache.solr.common.SolrDocumentList; import org.apache.solr.common.SolrInputDocument; +import org.apache.solr.common.params.CursorMarkParams; -@Tags({"Apache", "Solr", "Get", "Pull"}) +@Tags({"Apache", "Solr", "Get", "Pull", "Records"}) @InputRequirement(Requirement.INPUT_FORBIDDEN) -@CapabilityDescription("Queries Solr and outputs the results as a FlowFile") +@CapabilityDescription("Queries Solr and outputs the results as a FlowFile in the format of XML or using a Record Writer") +@Stateful(scopes = {Scope.CLUSTER}, description = "Stores latest date of Date Field so that the same data will not be fetched multiple times.") public class GetSolr extends SolrProcessor { -public static final PropertyDescriptor SOLR_QUERY = new PropertyDescriptor -.Builder().name("Solr Query") -.description("A query to execute against Solr") +public static final String STATE_MANAGER_FILTER = "stateManager_filter"; +public static final String STATE_MANAGER_CURSOR_MARK = "stateManager_cursorMark"; +public static final AllowableValue MODE_XML = new AllowableValue("XML"); +public static final AllowableValue MODE_REC = new AllowableValue("Records"); --- End diff -- Hmm, and dynamic fields could become a problem... I think this is not possible. ---
[GitHub] nifi pull request #2199: NIFI-3248: Improvement of GetSolr Processor
Github user JohannesDaniel commented on a diff in the pull request: https://github.com/apache/nifi/pull/2199#discussion_r145727845 --- Diff: nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/GetSolr.java --- @@ -170,159 +210,213 @@ protected void init(final ProcessorInitializationContext context) { return this.descriptors; } +final static Set propertyNamesForActivatingClearState = new HashSet(); +static { +propertyNamesForActivatingClearState.add(SOLR_TYPE.getName()); +propertyNamesForActivatingClearState.add(SOLR_LOCATION.getName()); +propertyNamesForActivatingClearState.add(COLLECTION.getName()); +propertyNamesForActivatingClearState.add(SOLR_QUERY.getName()); +propertyNamesForActivatingClearState.add(DATE_FIELD.getName()); +propertyNamesForActivatingClearState.add(RETURN_FIELDS.getName()); +} + @Override public void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) { -lastEndDatedRef.set(UNINITIALIZED_LAST_END_DATE_VALUE); +if (propertyNamesForActivatingClearState.contains(descriptor.getName())) +clearState.set(true); } -@OnStopped -public void onStopped() { -writeLastEndDate(); -} +@OnScheduled +public void clearState(final ProcessContext context) throws IOException { +if (clearState.getAndSet(false)) { +context.getStateManager().clear(Scope.CLUSTER); +final Map newStateMap = new HashMap(); -@OnRemoved -public void onRemoved() { -final File lastEndDateCache = new File(FILE_PREFIX + getIdentifier()); -if (lastEndDateCache.exists()) { -lastEndDateCache.delete(); -} -} +newStateMap.put(STATE_MANAGER_CURSOR_MARK, "*"); -@Override -public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { -final ComponentLog logger = getLogger(); -readLastEndDate(); - -final SimpleDateFormat sdf = new SimpleDateFormat(LAST_END_DATE_PATTERN, Locale.US); -sdf.setTimeZone(TimeZone.getTimeZone("GMT")); -final String currDate = sdf.format(new Date()); - -final 
boolean initialized = !UNINITIALIZED_LAST_END_DATE_VALUE.equals(lastEndDatedRef.get()); - -final String query = context.getProperty(SOLR_QUERY).getValue(); -final SolrQuery solrQuery = new SolrQuery(query); -solrQuery.setRows(context.getProperty(BATCH_SIZE).asInteger()); - -// if initialized then apply a filter to restrict results from the last end time til now -if (initialized) { -StringBuilder filterQuery = new StringBuilder(); -filterQuery.append(context.getProperty(DATE_FIELD).getValue()) -.append(":{").append(lastEndDatedRef.get()).append(" TO ") -.append(currDate).append("]"); -solrQuery.addFilterQuery(filterQuery.toString()); -logger.info("Applying filter query {}", new Object[]{filterQuery.toString()}); -} +final String initialDate = context.getProperty(DATE_FILTER).getValue(); +if (StringUtils.isBlank(initialDate)) +newStateMap.put(STATE_MANAGER_FILTER, "*"); +else +newStateMap.put(STATE_MANAGER_FILTER, initialDate); -final String returnFields = context.getProperty(RETURN_FIELDS).getValue(); -if (returnFields != null && !returnFields.trim().isEmpty()) { -for (String returnField : returnFields.trim().split("[,]")) { -solrQuery.addField(returnField.trim()); -} +context.getStateManager().setState(newStateMap, Scope.CLUSTER); + +id_field = null; } +} -final String fullSortClause = context.getProperty(SORT_CLAUSE).getValue(); -if (fullSortClause != null && !fullSortClause.trim().isEmpty()) { -for (String sortClause : fullSortClause.split("[,]")) { -String[] sortParts = sortClause.trim().split("[ ]"); -solrQuery.addSort(sortParts[0], SolrQuery.ORDER.valueOf(sortParts[1])); -} +@Override +protected final Collection additionalCustomValidation(ValidationContext context) { +final Collection problems = new ArrayList<>(); + +if (context.getProperty(RETURN_TYPE).evaluateAttributeExpressions().getValue().equals(MODE_REC.getValue()) +&& !context.getProperty(REC
[GitHub] nifi pull request #2199: NIFI-3248: Improvement of GetSolr Processor
Github user JohannesDaniel commented on a diff in the pull request: https://github.com/apache/nifi/pull/2199#discussion_r145721674 --- Diff: nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/GetSolr.java --- @@ -66,42 +79,72 @@ import org.apache.solr.common.SolrDocument; import org.apache.solr.common.SolrDocumentList; import org.apache.solr.common.SolrInputDocument; +import org.apache.solr.common.params.CursorMarkParams; -@Tags({"Apache", "Solr", "Get", "Pull"}) +@Tags({"Apache", "Solr", "Get", "Pull", "Records"}) @InputRequirement(Requirement.INPUT_FORBIDDEN) -@CapabilityDescription("Queries Solr and outputs the results as a FlowFile") +@CapabilityDescription("Queries Solr and outputs the results as a FlowFile in the format of XML or using a Record Writer") +@Stateful(scopes = {Scope.CLUSTER}, description = "Stores latest date of Date Field so that the same data will not be fetched multiple times.") public class GetSolr extends SolrProcessor { -public static final PropertyDescriptor SOLR_QUERY = new PropertyDescriptor -.Builder().name("Solr Query") -.description("A query to execute against Solr") +public static final String STATE_MANAGER_FILTER = "stateManager_filter"; +public static final String STATE_MANAGER_CURSOR_MARK = "stateManager_cursorMark"; +public static final AllowableValue MODE_XML = new AllowableValue("XML"); +public static final AllowableValue MODE_REC = new AllowableValue("Records"); + +public static final PropertyDescriptor RETURN_TYPE = new PropertyDescriptor +.Builder().name("Return Type") +.displayName("Return Type") --- End diff -- Most of the properties were already available in the prior GetSolr processor. I expected this to be critical for backwards compatibility. For the new properties I chose the same naming pattern. ---
[GitHub] nifi pull request #2199: NIFI-3248: Improvement of GetSolr Processor
Github user JohannesDaniel commented on a diff in the pull request: https://github.com/apache/nifi/pull/2199#discussion_r145720938 --- Diff: nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/GetSolr.java --- @@ -66,42 +79,72 @@ import org.apache.solr.common.SolrDocument; import org.apache.solr.common.SolrDocumentList; import org.apache.solr.common.SolrInputDocument; +import org.apache.solr.common.params.CursorMarkParams; -@Tags({"Apache", "Solr", "Get", "Pull"}) +@Tags({"Apache", "Solr", "Get", "Pull", "Records"}) @InputRequirement(Requirement.INPUT_FORBIDDEN) -@CapabilityDescription("Queries Solr and outputs the results as a FlowFile") +@CapabilityDescription("Queries Solr and outputs the results as a FlowFile in the format of XML or using a Record Writer") +@Stateful(scopes = {Scope.CLUSTER}, description = "Stores latest date of Date Field so that the same data will not be fetched multiple times.") public class GetSolr extends SolrProcessor { -public static final PropertyDescriptor SOLR_QUERY = new PropertyDescriptor -.Builder().name("Solr Query") -.description("A query to execute against Solr") +public static final String STATE_MANAGER_FILTER = "stateManager_filter"; +public static final String STATE_MANAGER_CURSOR_MARK = "stateManager_cursorMark"; +public static final AllowableValue MODE_XML = new AllowableValue("XML"); +public static final AllowableValue MODE_REC = new AllowableValue("Records"); --- End diff -- Additionally, this requires parsing the response JSON, as response parsing for the Schema API is not really implemented in SolrJ. ---
[GitHub] nifi pull request #2199: NIFI-3248: Improvement of GetSolr Processor
Github user JohannesDaniel commented on a diff in the pull request: https://github.com/apache/nifi/pull/2199#discussion_r145719121 --- Diff: nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/GetSolr.java --- @@ -66,42 +79,72 @@ import org.apache.solr.common.SolrDocument; import org.apache.solr.common.SolrDocumentList; import org.apache.solr.common.SolrInputDocument; +import org.apache.solr.common.params.CursorMarkParams; -@Tags({"Apache", "Solr", "Get", "Pull"}) +@Tags({"Apache", "Solr", "Get", "Pull", "Records"}) @InputRequirement(Requirement.INPUT_FORBIDDEN) -@CapabilityDescription("Queries Solr and outputs the results as a FlowFile") +@CapabilityDescription("Queries Solr and outputs the results as a FlowFile in the format of XML or using a Record Writer") +@Stateful(scopes = {Scope.CLUSTER}, description = "Stores latest date of Date Field so that the same data will not be fetched multiple times.") public class GetSolr extends SolrProcessor { -public static final PropertyDescriptor SOLR_QUERY = new PropertyDescriptor -.Builder().name("Solr Query") -.description("A query to execute against Solr") +public static final String STATE_MANAGER_FILTER = "stateManager_filter"; +public static final String STATE_MANAGER_CURSOR_MARK = "stateManager_cursorMark"; +public static final AllowableValue MODE_XML = new AllowableValue("XML"); +public static final AllowableValue MODE_REC = new AllowableValue("Records"); --- End diff -- The difficulty with this is that Solr provides various different field types for different kinds of data. For instance, an integer could be derived from an Int, TrieInt (version < 7.0) or Pint (version >= 7.0) field. This requires a comprehensive fieldtype-datatype mapping. ---
[GitHub] nifi pull request #2199: NIFI-3248: Improvement of GetSolr Processor
Github user JohannesDaniel commented on a diff in the pull request: https://github.com/apache/nifi/pull/2199#discussion_r145712892 --- Diff: nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/GetSolr.java --- @@ -66,42 +79,72 @@ import org.apache.solr.common.SolrDocument; import org.apache.solr.common.SolrDocumentList; import org.apache.solr.common.SolrInputDocument; +import org.apache.solr.common.params.CursorMarkParams; -@Tags({"Apache", "Solr", "Get", "Pull"}) +@Tags({"Apache", "Solr", "Get", "Pull", "Records"}) @InputRequirement(Requirement.INPUT_FORBIDDEN) -@CapabilityDescription("Queries Solr and outputs the results as a FlowFile") +@CapabilityDescription("Queries Solr and outputs the results as a FlowFile in the format of XML or using a Record Writer") +@Stateful(scopes = {Scope.CLUSTER}, description = "Stores latest date of Date Field so that the same data will not be fetched multiple times.") public class GetSolr extends SolrProcessor { -public static final PropertyDescriptor SOLR_QUERY = new PropertyDescriptor -.Builder().name("Solr Query") -.description("A query to execute against Solr") +public static final String STATE_MANAGER_FILTER = "stateManager_filter"; +public static final String STATE_MANAGER_CURSOR_MARK = "stateManager_cursorMark"; +public static final AllowableValue MODE_XML = new AllowableValue("XML"); +public static final AllowableValue MODE_REC = new AllowableValue("Records"); --- End diff -- In principle, yes, by using the Schema API. But I don't expect this to be too easy. I suggest that we create a separate ticket for this, as it will require some deeper consideration. ---
[GitHub] nifi pull request #2199: NIFI-3248: Improvement of GetSolr Processor
Github user ijokarumawak commented on a diff in the pull request: https://github.com/apache/nifi/pull/2199#discussion_r145688724 --- Diff: nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/GetSolr.java --- @@ -66,42 +79,72 @@ import org.apache.solr.common.SolrDocument; import org.apache.solr.common.SolrDocumentList; import org.apache.solr.common.SolrInputDocument; +import org.apache.solr.common.params.CursorMarkParams; -@Tags({"Apache", "Solr", "Get", "Pull"}) +@Tags({"Apache", "Solr", "Get", "Pull", "Records"}) @InputRequirement(Requirement.INPUT_FORBIDDEN) -@CapabilityDescription("Queries Solr and outputs the results as a FlowFile") +@CapabilityDescription("Queries Solr and outputs the results as a FlowFile in the format of XML or using a Record Writer") +@Stateful(scopes = {Scope.CLUSTER}, description = "Stores latest date of Date Field so that the same data will not be fetched multiple times.") public class GetSolr extends SolrProcessor { -public static final PropertyDescriptor SOLR_QUERY = new PropertyDescriptor -.Builder().name("Solr Query") -.description("A query to execute against Solr") +public static final String STATE_MANAGER_FILTER = "stateManager_filter"; +public static final String STATE_MANAGER_CURSOR_MARK = "stateManager_cursorMark"; +public static final AllowableValue MODE_XML = new AllowableValue("XML"); +public static final AllowableValue MODE_REC = new AllowableValue("Records"); + +public static final PropertyDescriptor RETURN_TYPE = new PropertyDescriptor +.Builder().name("Return Type") +.displayName("Return Type") +.description("Write Solr documents to FlowFiles as XML or using a Record Writer") .required(true) -.addValidator(StandardValidators.NON_EMPTY_VALIDATOR) +.allowableValues(MODE_XML, MODE_REC) +.defaultValue(MODE_REC.getValue()) --- End diff -- The default value should be MODE_XML so that the processor behaves as it did before. ---
[GitHub] nifi pull request #2199: NIFI-3248: Improvement of GetSolr Processor
Github user ijokarumawak commented on a diff in the pull request: https://github.com/apache/nifi/pull/2199#discussion_r145696081 --- Diff: nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/GetSolr.java --- @@ -170,159 +210,213 @@ protected void init(final ProcessorInitializationContext context) { return this.descriptors; } +final static Set propertyNamesForActivatingClearState = new HashSet(); +static { +propertyNamesForActivatingClearState.add(SOLR_TYPE.getName()); +propertyNamesForActivatingClearState.add(SOLR_LOCATION.getName()); +propertyNamesForActivatingClearState.add(COLLECTION.getName()); +propertyNamesForActivatingClearState.add(SOLR_QUERY.getName()); +propertyNamesForActivatingClearState.add(DATE_FIELD.getName()); +propertyNamesForActivatingClearState.add(RETURN_FIELDS.getName()); +} + @Override public void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) { -lastEndDatedRef.set(UNINITIALIZED_LAST_END_DATE_VALUE); +if (propertyNamesForActivatingClearState.contains(descriptor.getName())) +clearState.set(true); } -@OnStopped -public void onStopped() { -writeLastEndDate(); -} +@OnScheduled +public void clearState(final ProcessContext context) throws IOException { +if (clearState.getAndSet(false)) { +context.getStateManager().clear(Scope.CLUSTER); +final Map newStateMap = new HashMap(); -@OnRemoved -public void onRemoved() { -final File lastEndDateCache = new File(FILE_PREFIX + getIdentifier()); -if (lastEndDateCache.exists()) { -lastEndDateCache.delete(); -} -} +newStateMap.put(STATE_MANAGER_CURSOR_MARK, "*"); -@Override -public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { -final ComponentLog logger = getLogger(); -readLastEndDate(); - -final SimpleDateFormat sdf = new SimpleDateFormat(LAST_END_DATE_PATTERN, Locale.US); -sdf.setTimeZone(TimeZone.getTimeZone("GMT")); -final String currDate = sdf.format(new Date()); - -final boolean 
initialized = !UNINITIALIZED_LAST_END_DATE_VALUE.equals(lastEndDatedRef.get()); - -final String query = context.getProperty(SOLR_QUERY).getValue(); -final SolrQuery solrQuery = new SolrQuery(query); -solrQuery.setRows(context.getProperty(BATCH_SIZE).asInteger()); - -// if initialized then apply a filter to restrict results from the last end time til now -if (initialized) { -StringBuilder filterQuery = new StringBuilder(); -filterQuery.append(context.getProperty(DATE_FIELD).getValue()) -.append(":{").append(lastEndDatedRef.get()).append(" TO ") -.append(currDate).append("]"); -solrQuery.addFilterQuery(filterQuery.toString()); -logger.info("Applying filter query {}", new Object[]{filterQuery.toString()}); -} +final String initialDate = context.getProperty(DATE_FILTER).getValue(); +if (StringUtils.isBlank(initialDate)) +newStateMap.put(STATE_MANAGER_FILTER, "*"); +else +newStateMap.put(STATE_MANAGER_FILTER, initialDate); -final String returnFields = context.getProperty(RETURN_FIELDS).getValue(); -if (returnFields != null && !returnFields.trim().isEmpty()) { -for (String returnField : returnFields.trim().split("[,]")) { -solrQuery.addField(returnField.trim()); -} +context.getStateManager().setState(newStateMap, Scope.CLUSTER); + +id_field = null; } +} -final String fullSortClause = context.getProperty(SORT_CLAUSE).getValue(); -if (fullSortClause != null && !fullSortClause.trim().isEmpty()) { -for (String sortClause : fullSortClause.split("[,]")) { -String[] sortParts = sortClause.trim().split("[ ]"); -solrQuery.addSort(sortParts[0], SolrQuery.ORDER.valueOf(sortParts[1])); -} +@Override +protected final Collection additionalCustomValidation(ValidationContext context) { +final Collection problems = new ArrayList<>(); + +if (context.getProperty(RETURN_TYPE).evaluateAttributeExpressions().getValue().equals(MODE_REC.getValue()) +&& !context.getProperty(RECOR
[GitHub] nifi pull request #2199: NIFI-3248: Improvement of GetSolr Processor
Github user ijokarumawak commented on a diff in the pull request: https://github.com/apache/nifi/pull/2199#discussion_r145698373 --- Diff: nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/GetSolr.java --- @@ -170,159 +210,213 @@ protected void init(final ProcessorInitializationContext context) { return this.descriptors; } +final static Set propertyNamesForActivatingClearState = new HashSet(); +static { +propertyNamesForActivatingClearState.add(SOLR_TYPE.getName()); +propertyNamesForActivatingClearState.add(SOLR_LOCATION.getName()); +propertyNamesForActivatingClearState.add(COLLECTION.getName()); +propertyNamesForActivatingClearState.add(SOLR_QUERY.getName()); +propertyNamesForActivatingClearState.add(DATE_FIELD.getName()); +propertyNamesForActivatingClearState.add(RETURN_FIELDS.getName()); +} + @Override public void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) { -lastEndDatedRef.set(UNINITIALIZED_LAST_END_DATE_VALUE); +if (propertyNamesForActivatingClearState.contains(descriptor.getName())) +clearState.set(true); } -@OnStopped -public void onStopped() { -writeLastEndDate(); -} +@OnScheduled +public void clearState(final ProcessContext context) throws IOException { +if (clearState.getAndSet(false)) { +context.getStateManager().clear(Scope.CLUSTER); +final Map newStateMap = new HashMap(); -@OnRemoved -public void onRemoved() { -final File lastEndDateCache = new File(FILE_PREFIX + getIdentifier()); -if (lastEndDateCache.exists()) { -lastEndDateCache.delete(); -} -} +newStateMap.put(STATE_MANAGER_CURSOR_MARK, "*"); -@Override -public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { -final ComponentLog logger = getLogger(); -readLastEndDate(); - -final SimpleDateFormat sdf = new SimpleDateFormat(LAST_END_DATE_PATTERN, Locale.US); -sdf.setTimeZone(TimeZone.getTimeZone("GMT")); -final String currDate = sdf.format(new Date()); - -final boolean 
initialized = !UNINITIALIZED_LAST_END_DATE_VALUE.equals(lastEndDatedRef.get()); - -final String query = context.getProperty(SOLR_QUERY).getValue(); -final SolrQuery solrQuery = new SolrQuery(query); -solrQuery.setRows(context.getProperty(BATCH_SIZE).asInteger()); - -// if initialized then apply a filter to restrict results from the last end time til now -if (initialized) { -StringBuilder filterQuery = new StringBuilder(); -filterQuery.append(context.getProperty(DATE_FIELD).getValue()) -.append(":{").append(lastEndDatedRef.get()).append(" TO ") -.append(currDate).append("]"); -solrQuery.addFilterQuery(filterQuery.toString()); -logger.info("Applying filter query {}", new Object[]{filterQuery.toString()}); -} +final String initialDate = context.getProperty(DATE_FILTER).getValue(); +if (StringUtils.isBlank(initialDate)) +newStateMap.put(STATE_MANAGER_FILTER, "*"); +else +newStateMap.put(STATE_MANAGER_FILTER, initialDate); -final String returnFields = context.getProperty(RETURN_FIELDS).getValue(); -if (returnFields != null && !returnFields.trim().isEmpty()) { -for (String returnField : returnFields.trim().split("[,]")) { -solrQuery.addField(returnField.trim()); -} +context.getStateManager().setState(newStateMap, Scope.CLUSTER); + +id_field = null; } +} -final String fullSortClause = context.getProperty(SORT_CLAUSE).getValue(); -if (fullSortClause != null && !fullSortClause.trim().isEmpty()) { -for (String sortClause : fullSortClause.split("[,]")) { -String[] sortParts = sortClause.trim().split("[ ]"); -solrQuery.addSort(sortParts[0], SolrQuery.ORDER.valueOf(sortParts[1])); -} +@Override +protected final Collection additionalCustomValidation(ValidationContext context) { +final Collection problems = new ArrayList<>(); + +if (context.getProperty(RETURN_TYPE).evaluateAttributeExpressions().getValue().equals(MODE_REC.getValue()) +&& !context.getProperty(RECOR
[GitHub] nifi pull request #2199: NIFI-3248: Improvement of GetSolr Processor
Github user ijokarumawak commented on a diff in the pull request: https://github.com/apache/nifi/pull/2199#discussion_r145696508 --- Diff: nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/GetSolr.java --- @@ -170,159 +210,213 @@ protected void init(final ProcessorInitializationContext context) { return this.descriptors; } +final static Set propertyNamesForActivatingClearState = new HashSet(); +static { +propertyNamesForActivatingClearState.add(SOLR_TYPE.getName()); +propertyNamesForActivatingClearState.add(SOLR_LOCATION.getName()); +propertyNamesForActivatingClearState.add(COLLECTION.getName()); +propertyNamesForActivatingClearState.add(SOLR_QUERY.getName()); +propertyNamesForActivatingClearState.add(DATE_FIELD.getName()); +propertyNamesForActivatingClearState.add(RETURN_FIELDS.getName()); +} + @Override public void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) { -lastEndDatedRef.set(UNINITIALIZED_LAST_END_DATE_VALUE); +if (propertyNamesForActivatingClearState.contains(descriptor.getName())) +clearState.set(true); } -@OnStopped -public void onStopped() { -writeLastEndDate(); -} +@OnScheduled +public void clearState(final ProcessContext context) throws IOException { +if (clearState.getAndSet(false)) { +context.getStateManager().clear(Scope.CLUSTER); +final Map newStateMap = new HashMap(); -@OnRemoved -public void onRemoved() { -final File lastEndDateCache = new File(FILE_PREFIX + getIdentifier()); -if (lastEndDateCache.exists()) { -lastEndDateCache.delete(); -} -} +newStateMap.put(STATE_MANAGER_CURSOR_MARK, "*"); -@Override -public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { -final ComponentLog logger = getLogger(); -readLastEndDate(); - -final SimpleDateFormat sdf = new SimpleDateFormat(LAST_END_DATE_PATTERN, Locale.US); -sdf.setTimeZone(TimeZone.getTimeZone("GMT")); -final String currDate = sdf.format(new Date()); - -final boolean 
initialized = !UNINITIALIZED_LAST_END_DATE_VALUE.equals(lastEndDatedRef.get()); - -final String query = context.getProperty(SOLR_QUERY).getValue(); -final SolrQuery solrQuery = new SolrQuery(query); -solrQuery.setRows(context.getProperty(BATCH_SIZE).asInteger()); - -// if initialized then apply a filter to restrict results from the last end time til now -if (initialized) { -StringBuilder filterQuery = new StringBuilder(); -filterQuery.append(context.getProperty(DATE_FIELD).getValue()) -.append(":{").append(lastEndDatedRef.get()).append(" TO ") -.append(currDate).append("]"); -solrQuery.addFilterQuery(filterQuery.toString()); -logger.info("Applying filter query {}", new Object[]{filterQuery.toString()}); -} +final String initialDate = context.getProperty(DATE_FILTER).getValue(); +if (StringUtils.isBlank(initialDate)) +newStateMap.put(STATE_MANAGER_FILTER, "*"); +else +newStateMap.put(STATE_MANAGER_FILTER, initialDate); -final String returnFields = context.getProperty(RETURN_FIELDS).getValue(); -if (returnFields != null && !returnFields.trim().isEmpty()) { -for (String returnField : returnFields.trim().split("[,]")) { -solrQuery.addField(returnField.trim()); -} +context.getStateManager().setState(newStateMap, Scope.CLUSTER); + +id_field = null; } +} -final String fullSortClause = context.getProperty(SORT_CLAUSE).getValue(); -if (fullSortClause != null && !fullSortClause.trim().isEmpty()) { -for (String sortClause : fullSortClause.split("[,]")) { -String[] sortParts = sortClause.trim().split("[ ]"); -solrQuery.addSort(sortParts[0], SolrQuery.ORDER.valueOf(sortParts[1])); -} +@Override +protected final Collection additionalCustomValidation(ValidationContext context) { +final Collection problems = new ArrayList<>(); + +if (context.getProperty(RETURN_TYPE).evaluateAttributeExpressions().getValue().equals(MODE_REC.getValue()) +&& !context.getProperty(RECOR
[GitHub] nifi pull request #2199: NIFI-3248: Improvement of GetSolr Processor
Github user ijokarumawak commented on a diff in the pull request: https://github.com/apache/nifi/pull/2199#discussion_r145697961 --- Diff: nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/GetSolr.java --- @@ -170,159 +210,213 @@ protected void init(final ProcessorInitializationContext context) { return this.descriptors; } +final static Set propertyNamesForActivatingClearState = new HashSet(); +static { +propertyNamesForActivatingClearState.add(SOLR_TYPE.getName()); +propertyNamesForActivatingClearState.add(SOLR_LOCATION.getName()); +propertyNamesForActivatingClearState.add(COLLECTION.getName()); +propertyNamesForActivatingClearState.add(SOLR_QUERY.getName()); +propertyNamesForActivatingClearState.add(DATE_FIELD.getName()); +propertyNamesForActivatingClearState.add(RETURN_FIELDS.getName()); +} + @Override public void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) { -lastEndDatedRef.set(UNINITIALIZED_LAST_END_DATE_VALUE); +if (propertyNamesForActivatingClearState.contains(descriptor.getName())) +clearState.set(true); } -@OnStopped -public void onStopped() { -writeLastEndDate(); -} +@OnScheduled +public void clearState(final ProcessContext context) throws IOException { +if (clearState.getAndSet(false)) { +context.getStateManager().clear(Scope.CLUSTER); +final Map newStateMap = new HashMap(); -@OnRemoved -public void onRemoved() { -final File lastEndDateCache = new File(FILE_PREFIX + getIdentifier()); -if (lastEndDateCache.exists()) { -lastEndDateCache.delete(); -} -} +newStateMap.put(STATE_MANAGER_CURSOR_MARK, "*"); -@Override -public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { -final ComponentLog logger = getLogger(); -readLastEndDate(); - -final SimpleDateFormat sdf = new SimpleDateFormat(LAST_END_DATE_PATTERN, Locale.US); -sdf.setTimeZone(TimeZone.getTimeZone("GMT")); -final String currDate = sdf.format(new Date()); - -final boolean 
initialized = !UNINITIALIZED_LAST_END_DATE_VALUE.equals(lastEndDatedRef.get()); - -final String query = context.getProperty(SOLR_QUERY).getValue(); -final SolrQuery solrQuery = new SolrQuery(query); -solrQuery.setRows(context.getProperty(BATCH_SIZE).asInteger()); - -// if initialized then apply a filter to restrict results from the last end time til now -if (initialized) { -StringBuilder filterQuery = new StringBuilder(); -filterQuery.append(context.getProperty(DATE_FIELD).getValue()) -.append(":{").append(lastEndDatedRef.get()).append(" TO ") -.append(currDate).append("]"); -solrQuery.addFilterQuery(filterQuery.toString()); -logger.info("Applying filter query {}", new Object[]{filterQuery.toString()}); -} +final String initialDate = context.getProperty(DATE_FILTER).getValue(); +if (StringUtils.isBlank(initialDate)) +newStateMap.put(STATE_MANAGER_FILTER, "*"); +else +newStateMap.put(STATE_MANAGER_FILTER, initialDate); -final String returnFields = context.getProperty(RETURN_FIELDS).getValue(); -if (returnFields != null && !returnFields.trim().isEmpty()) { -for (String returnField : returnFields.trim().split("[,]")) { -solrQuery.addField(returnField.trim()); -} +context.getStateManager().setState(newStateMap, Scope.CLUSTER); + +id_field = null; } +} -final String fullSortClause = context.getProperty(SORT_CLAUSE).getValue(); -if (fullSortClause != null && !fullSortClause.trim().isEmpty()) { -for (String sortClause : fullSortClause.split("[,]")) { -String[] sortParts = sortClause.trim().split("[ ]"); -solrQuery.addSort(sortParts[0], SolrQuery.ORDER.valueOf(sortParts[1])); -} +@Override +protected final Collection additionalCustomValidation(ValidationContext context) { +final Collection problems = new ArrayList<>(); + +if (context.getProperty(RETURN_TYPE).evaluateAttributeExpressions().getValue().equals(MODE_REC.getValue()) +&& !context.getProperty(RECOR
[GitHub] nifi pull request #2199: NIFI-3248: Improvement of GetSolr Processor
Github user ijokarumawak commented on a diff in the pull request: https://github.com/apache/nifi/pull/2199#discussion_r145690789 --- Diff: nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/GetSolr.java --- @@ -66,42 +79,72 @@ import org.apache.solr.common.SolrDocument; import org.apache.solr.common.SolrDocumentList; import org.apache.solr.common.SolrInputDocument; +import org.apache.solr.common.params.CursorMarkParams; -@Tags({"Apache", "Solr", "Get", "Pull"}) +@Tags({"Apache", "Solr", "Get", "Pull", "Records"}) @InputRequirement(Requirement.INPUT_FORBIDDEN) -@CapabilityDescription("Queries Solr and outputs the results as a FlowFile") +@CapabilityDescription("Queries Solr and outputs the results as a FlowFile in the format of XML or using a Record Writer") +@Stateful(scopes = {Scope.CLUSTER}, description = "Stores latest date of Date Field so that the same data will not be fetched multiple times.") public class GetSolr extends SolrProcessor { -public static final PropertyDescriptor SOLR_QUERY = new PropertyDescriptor -.Builder().name("Solr Query") -.description("A query to execute against Solr") +public static final String STATE_MANAGER_FILTER = "stateManager_filter"; +public static final String STATE_MANAGER_CURSOR_MARK = "stateManager_cursorMark"; +public static final AllowableValue MODE_XML = new AllowableValue("XML"); +public static final AllowableValue MODE_REC = new AllowableValue("Records"); + +public static final PropertyDescriptor RETURN_TYPE = new PropertyDescriptor +.Builder().name("Return Type") +.displayName("Return Type") --- End diff -- Although I haven't seen a specific guideline or documentation, other processors prefer having `name` in lower case looks like a key of property or configuration name such as `return_type` so that user can type the name without worrying about spacing or case sensitivity, while `displayName` is a more verbose human readable name. 
`name` would be more important in the world of MiNiFi or other applications that talk directly with the NiFi API programmatically. I don't have a strong opinion here but just wanted to share what those two are. ---
[GitHub] nifi pull request #2199: NIFI-3248: Improvement of GetSolr Processor
Github user ijokarumawak commented on a diff in the pull request: https://github.com/apache/nifi/pull/2199#discussion_r145699419 --- Diff: nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/GetSolr.java --- @@ -66,42 +79,72 @@ import org.apache.solr.common.SolrDocument; import org.apache.solr.common.SolrDocumentList; import org.apache.solr.common.SolrInputDocument; +import org.apache.solr.common.params.CursorMarkParams; -@Tags({"Apache", "Solr", "Get", "Pull"}) +@Tags({"Apache", "Solr", "Get", "Pull", "Records"}) @InputRequirement(Requirement.INPUT_FORBIDDEN) -@CapabilityDescription("Queries Solr and outputs the results as a FlowFile") +@CapabilityDescription("Queries Solr and outputs the results as a FlowFile in the format of XML or using a Record Writer") +@Stateful(scopes = {Scope.CLUSTER}, description = "Stores latest date of Date Field so that the same data will not be fetched multiple times.") public class GetSolr extends SolrProcessor { -public static final PropertyDescriptor SOLR_QUERY = new PropertyDescriptor -.Builder().name("Solr Query") -.description("A query to execute against Solr") +public static final String STATE_MANAGER_FILTER = "stateManager_filter"; +public static final String STATE_MANAGER_CURSOR_MARK = "stateManager_cursorMark"; +public static final AllowableValue MODE_XML = new AllowableValue("XML"); +public static final AllowableValue MODE_REC = new AllowableValue("Records"); --- End diff -- Just an idea. Configuring a schema for the writer manually can be cumbersome. I wonder if it's possible to load a schema from the target collection then auto generate NiFi record schema from it. Do you think it's doable? ---
[GitHub] nifi pull request #2199: NIFI-3248: Improvement of GetSolr Processor
Github user ijokarumawak commented on a diff in the pull request: https://github.com/apache/nifi/pull/2199#discussion_r145678316 --- Diff: nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/GetSolr.java --- @@ -126,6 +126,14 @@ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .build(); +public static final PropertyDescriptor DATE_FILTER = new PropertyDescriptor +.Builder().name("Initial Date Filter") +.displayName("Initial Date Filter") +.description("Date value to filter results. Documents with an earlier date will not be fetched. The format has to correspond to the date pattern of Solr '-MM-DDThh:mm:ssZ'") +.required(false) +.addValidator(StandardValidators.NON_EMPTY_VALIDATOR) +.build(); + --- End diff -- You can change description of processor at `@CapabilityDescription` annotation. https://github.com/apache/nifi/blob/master/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/GetSolr.java#L72 If more detailed documentation is needed, processor can have `additionalDetails.html` like this. https://github.com/apache/nifi/blob/master/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/resources/docs/org.apache.nifi.processors.solr.PutSolrContentStream/additionalDetails.html ---
[GitHub] nifi pull request #2199: NIFI-3248: Improvement of GetSolr Processor
Github user JohannesDaniel commented on a diff in the pull request: https://github.com/apache/nifi/pull/2199#discussion_r145612359 --- Diff: nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/GetSolr.java --- @@ -126,6 +126,14 @@ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .build(); +public static final PropertyDescriptor DATE_FILTER = new PropertyDescriptor +.Builder().name("Initial Date Filter") +.displayName("Initial Date Filter") +.description("Date value to filter results. Documents with an earlier date will not be fetched. The format has to correspond to the date pattern of Solr '-MM-DDThh:mm:ssZ'") +.required(false) +.addValidator(StandardValidators.NON_EMPTY_VALIDATOR) +.build(); + --- End diff -- This property should make it quite obvious, how backwards compatibility can be achieved. Additionally, I will describe it in the documentation. BTW: Where can I change descriptions of processor usage? Did not find them in folder nifi-docs... ---
[GitHub] nifi pull request #2199: NIFI-3248: Improvement of GetSolr Processor
Github user ijokarumawak commented on a diff in the pull request: https://github.com/apache/nifi/pull/2199#discussion_r145416772 --- Diff: nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/SolrProcessor.java --- @@ -275,7 +275,7 @@ protected final boolean isBasicAuthEnabled() { } @Override -protected final Collection customValidate(ValidationContext context) { +protected Collection customValidate(ValidationContext context) { --- End diff -- Good call. Then the method can be a non-abstract method at SolrProcessor that does nothing. ---
[GitHub] nifi pull request #2199: NIFI-3248: Improvement of GetSolr Processor
Github user ijokarumawak commented on a diff in the pull request: https://github.com/apache/nifi/pull/2199#discussion_r145416024 --- Diff: nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/resources/solr/testCollection/conf/schema.xml --- @@ -16,6 +16,16 @@ + + + + + + + + +id --- End diff -- I agree with that; most indices have a unique key. I just asked because it is not mandatory to have a unique key according to the [Solr documentation](https://wiki.apache.org/solr/SchemaXml#The_Unique_Key_Field). So I would prefer to state in the NiFi documentation that a unique key is required for this processor to work properly. ---
[GitHub] nifi pull request #2199: NIFI-3248: Improvement of GetSolr Processor
Github user JohannesDaniel commented on a diff in the pull request: https://github.com/apache/nifi/pull/2199#discussion_r145415068 --- Diff: nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/SolrProcessor.java --- @@ -275,7 +275,7 @@ protected final boolean isBasicAuthEnabled() { } @Override -protected final Collection customValidate(ValidationContext context) { +protected Collection customValidate(ValidationContext context) { --- End diff -- ok. by doing so, i will also have to add this method to PutSolrContentStream ---
[GitHub] nifi pull request #2199: NIFI-3248: Improvement of GetSolr Processor
Github user ijokarumawak commented on a diff in the pull request: https://github.com/apache/nifi/pull/2199#discussion_r145412969 --- Diff: nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/SolrProcessor.java --- @@ -275,7 +275,7 @@ protected final boolean isBasicAuthEnabled() { } @Override -protected final Collection customValidate(ValidationContext context) { +protected Collection customValidate(ValidationContext context) { --- End diff -- I imagine the reason this customValidate is marked `final` is that the original author wanted to prevent sub-classes from skipping the validation code implemented here. You implemented it within GetSolr and call `super.customValidate` from there, so it should be fine, but other sub-classes could forget to call `super.customValidate` if we remove the `final` keyword. So I thought it might be a safer approach to add an abstract method, such as `additionalCustomValidate`, at SolrProcessor, then call it from customValidate, and let sub-classes implement custom validation in it. ---
[GitHub] nifi pull request #2199: NIFI-3248: Improvement of GetSolr Processor
Github user ijokarumawak commented on a diff in the pull request: https://github.com/apache/nifi/pull/2199#discussion_r145410576 --- Diff: nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/GetSolr.java --- @@ -66,42 +79,64 @@ import org.apache.solr.common.SolrDocument; import org.apache.solr.common.SolrDocumentList; import org.apache.solr.common.SolrInputDocument; +import org.apache.solr.common.params.CursorMarkParams; -@Tags({"Apache", "Solr", "Get", "Pull"}) +@Tags({"Apache", "Solr", "Get", "Pull", "Records"}) @InputRequirement(Requirement.INPUT_FORBIDDEN) -@CapabilityDescription("Queries Solr and outputs the results as a FlowFile") +@CapabilityDescription("Queries Solr and outputs the results as a FlowFile in the format of XML or using a Record Writer") +@Stateful(scopes = {Scope.LOCAL}, description = "Stores latest date of Date Field so that the same data will not be fetched multiple times.") --- End diff -- The exact way to realize backward compatibility is up to you :) I'm fine as long as user can understand how to migrate existing state to new version of this processor. If it needs to be done manually, then it should be documented. ---
[GitHub] nifi pull request #2199: NIFI-3248: Improvement of GetSolr Processor
Github user JohannesDaniel commented on a diff in the pull request: https://github.com/apache/nifi/pull/2199#discussion_r145408461 --- Diff: nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/resources/solr/testCollection/conf/schema.xml --- @@ -16,6 +16,16 @@ + + + + + + + + +id --- End diff -- the uniqueKey field has to be part of the sorting. Well-configured Solr indexes always include this kind of field as many things will not work properly without this field. Actually, I have never seen a Solr index without this (and I have seen a lot ... ;). ---
[GitHub] nifi pull request #2199: NIFI-3248: Improvement of GetSolr Processor
Github user JohannesDaniel commented on a diff in the pull request: https://github.com/apache/nifi/pull/2199#discussion_r145405902 --- Diff: nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/SolrProcessor.java --- @@ -275,7 +275,7 @@ protected final boolean isBasicAuthEnabled() { } @Override -protected final Collection customValidate(ValidationContext context) { +protected Collection customValidate(ValidationContext context) { --- End diff -- I did within class GetSolr ---
[GitHub] nifi pull request #2199: NIFI-3248: Improvement of GetSolr Processor
Github user JohannesDaniel commented on a diff in the pull request: https://github.com/apache/nifi/pull/2199#discussion_r145404225 --- Diff: nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/GetSolr.java --- @@ -172,157 +203,196 @@ protected void init(final ProcessorInitializationContext context) { @Override public void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) { -lastEndDatedRef.set(UNINITIALIZED_LAST_END_DATE_VALUE); +clearState.set(true); --- End diff -- ok, no problem ---
[GitHub] nifi pull request #2199: NIFI-3248: Improvement of GetSolr Processor
Github user JohannesDaniel commented on a diff in the pull request: https://github.com/apache/nifi/pull/2199#discussion_r145404160 --- Diff: nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/GetSolr.java --- @@ -138,10 +168,11 @@ protected void init(final ProcessorInitializationContext context) { descriptors.add(SOLR_TYPE); descriptors.add(SOLR_LOCATION); descriptors.add(COLLECTION); +descriptors.add(RETURN_TYPE); +descriptors.add(RECORD_WRITER); descriptors.add(SOLR_QUERY); -descriptors.add(RETURN_FIELDS); -descriptors.add(SORT_CLAUSE); --- End diff -- This should be safe, as the sorting only affects documents indexed after lastEndDate (documents indexed earlier are excluded by the filter query) ---
[GitHub] nifi pull request #2199: NIFI-3248: Improvement of GetSolr Processor
Github user JohannesDaniel commented on a diff in the pull request: https://github.com/apache/nifi/pull/2199#discussion_r145403786 --- Diff: nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/GetSolr.java --- @@ -66,42 +79,64 @@ import org.apache.solr.common.SolrDocument; import org.apache.solr.common.SolrDocumentList; import org.apache.solr.common.SolrInputDocument; +import org.apache.solr.common.params.CursorMarkParams; -@Tags({"Apache", "Solr", "Get", "Pull"}) +@Tags({"Apache", "Solr", "Get", "Pull", "Records"}) @InputRequirement(Requirement.INPUT_FORBIDDEN) -@CapabilityDescription("Queries Solr and outputs the results as a FlowFile") +@CapabilityDescription("Queries Solr and outputs the results as a FlowFile in the format of XML or using a Record Writer") +@Stateful(scopes = {Scope.LOCAL}, description = "Stores latest date of Date Field so that the same data will not be fetched multiple times.") --- End diff -- Sorry, this would be the correct filter query: fq=dateField:[lastEndDate TO NOW] ---
[GitHub] nifi pull request #2199: NIFI-3248: Improvement of GetSolr Processor
Github user JohannesDaniel commented on a diff in the pull request: https://github.com/apache/nifi/pull/2199#discussion_r145403415 --- Diff: nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/GetSolr.java --- @@ -66,42 +79,64 @@ import org.apache.solr.common.SolrDocument; import org.apache.solr.common.SolrDocumentList; import org.apache.solr.common.SolrInputDocument; +import org.apache.solr.common.params.CursorMarkParams; -@Tags({"Apache", "Solr", "Get", "Pull"}) +@Tags({"Apache", "Solr", "Get", "Pull", "Records"}) @InputRequirement(Requirement.INPUT_FORBIDDEN) -@CapabilityDescription("Queries Solr and outputs the results as a FlowFile") +@CapabilityDescription("Queries Solr and outputs the results as a FlowFile in the format of XML or using a Record Writer") +@Stateful(scopes = {Scope.LOCAL}, description = "Stores latest date of Date Field so that the same data will not be fetched multiple times.") --- End diff -- Do you really think that it is required to read the file? Backwards compatibility could also be realized by adding a filter query like fq=dateField:[* TO lastEndDate]. The user would only have to specify the value of lastEndDate, e.g. in a property of the processor. ---
[GitHub] nifi pull request #2199: NIFI-3248: Improvement of GetSolr Processor
Github user ijokarumawak commented on a diff in the pull request: https://github.com/apache/nifi/pull/2199#discussion_r144532208 --- Diff: nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/GetSolr.java --- @@ -66,42 +79,64 @@ import org.apache.solr.common.SolrDocument; import org.apache.solr.common.SolrDocumentList; import org.apache.solr.common.SolrInputDocument; +import org.apache.solr.common.params.CursorMarkParams; -@Tags({"Apache", "Solr", "Get", "Pull"}) +@Tags({"Apache", "Solr", "Get", "Pull", "Records"}) @InputRequirement(Requirement.INPUT_FORBIDDEN) -@CapabilityDescription("Queries Solr and outputs the results as a FlowFile") +@CapabilityDescription("Queries Solr and outputs the results as a FlowFile in the format of XML or using a Record Writer") +@Stateful(scopes = {Scope.LOCAL}, description = "Stores latest date of Date Field so that the same data will not be fetched multiple times.") --- End diff -- GetSolr used to use a local file to store lastEndDate. We need migration code so that lastEndDate is carried over to managed state when there's no state but the lastEndDate file exists. ---
[GitHub] nifi pull request #2199: NIFI-3248: Improvement of GetSolr Processor
Github user ijokarumawak commented on a diff in the pull request: https://github.com/apache/nifi/pull/2199#discussion_r144533090 --- Diff: nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/GetSolr.java --- @@ -66,42 +79,64 @@ import org.apache.solr.common.SolrDocument; import org.apache.solr.common.SolrDocumentList; import org.apache.solr.common.SolrInputDocument; +import org.apache.solr.common.params.CursorMarkParams; -@Tags({"Apache", "Solr", "Get", "Pull"}) +@Tags({"Apache", "Solr", "Get", "Pull", "Records"}) @InputRequirement(Requirement.INPUT_FORBIDDEN) -@CapabilityDescription("Queries Solr and outputs the results as a FlowFile") +@CapabilityDescription("Queries Solr and outputs the results as a FlowFile in the format of XML or using a Record Writer") +@Stateful(scopes = {Scope.LOCAL}, description = "Stores latest date of Date Field so that the same data will not be fetched multiple times.") --- End diff -- State scope should be CLUSTER, I think. Also, the capability description should mention that this processor is designed to run on the Primary Node only. Please refer to the ListHDFS processor documentation. Or does this processor work nicely in a distributed fashion by utilizing multiple NiFi nodes against a Solr cluster? ---
[GitHub] nifi pull request #2199: NIFI-3248: Improvement of GetSolr Processor
Github user ijokarumawak commented on a diff in the pull request: https://github.com/apache/nifi/pull/2199#discussion_r144527126 --- Diff: nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/resources/solr/testCollection/conf/schema.xml --- @@ -16,6 +16,16 @@ + + + + + + + + +id --- End diff -- What if a Solr doc doesn't have a uniqueKey? Does this processor still work without a uniqueKey? ---
[GitHub] nifi pull request #2199: NIFI-3248: Improvement of GetSolr Processor
Github user ijokarumawak commented on a diff in the pull request: https://github.com/apache/nifi/pull/2199#discussion_r144526595 --- Diff: nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/SolrProcessor.java --- @@ -275,7 +275,7 @@ protected final boolean isBasicAuthEnabled() { } @Override -protected final Collection customValidate(ValidationContext context) { +protected Collection customValidate(ValidationContext context) { --- End diff -- Shouldn't we add another protected method to override at sub-classes? ---
[GitHub] nifi pull request #2199: NIFI-3248: Improvement of GetSolr Processor
Github user ijokarumawak commented on a diff in the pull request: https://github.com/apache/nifi/pull/2199#discussion_r144530989 --- Diff: nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/GetSolr.java --- @@ -138,10 +168,11 @@ protected void init(final ProcessorInitializationContext context) { descriptors.add(SOLR_TYPE); descriptors.add(SOLR_LOCATION); descriptors.add(COLLECTION); +descriptors.add(RETURN_TYPE); +descriptors.add(RECORD_WRITER); descriptors.add(SOLR_QUERY); -descriptors.add(RETURN_FIELDS); -descriptors.add(SORT_CLAUSE); --- End diff -- Is it safe to remove an existing property? The existing code should not sort result anyway, or should store last sorted field value to paginate properly when docs with the same date split more than one page. So I think it's safe.. ---
[GitHub] nifi pull request #2199: NIFI-3248: Improvement of GetSolr Processor
Github user ijokarumawak commented on a diff in the pull request: https://github.com/apache/nifi/pull/2199#discussion_r144530918 --- Diff: nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/GetSolr.java --- @@ -172,157 +203,196 @@ protected void init(final ProcessorInitializationContext context) { @Override public void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) { -lastEndDatedRef.set(UNINITIALIZED_LAST_END_DATE_VALUE); +clearState.set(true); --- End diff -- Probably we'd like to clear state only when following properties get changed? It would be a bad UX if state is cleared when user re-configure batch size. - SOLR_TYPE - SOLR_LOCATION - COLLECTION - SOLR_QUERY - DATE_FIELD - RETURN_FIELDS ---
[GitHub] nifi pull request #2199: NIFI-3248: Improvement of GetSolr Processor
Github user ijokarumawak commented on a diff in the pull request: https://github.com/apache/nifi/pull/2199#discussion_r144533800 --- Diff: nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/GetSolr.java --- @@ -172,157 +203,196 @@ protected void init(final ProcessorInitializationContext context) { @Override public void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) { -lastEndDatedRef.set(UNINITIALIZED_LAST_END_DATE_VALUE); +clearState.set(true); } -@OnStopped -public void onStopped() { -writeLastEndDate(); -} +@OnScheduled +public void onScheduled2(final ProcessContext context) throws IOException { --- End diff -- Please change method name appropriately to represent what it does, such as `clearState`. The annotation explains when it's called. ---
[GitHub] nifi pull request #2199: NIFI-3248: Improvement of GetSolr Processor
GitHub user JohannesDaniel opened a pull request: https://github.com/apache/nifi/pull/2199 NIFI-3248: Improvement of GetSolr Processor Thank you for submitting a contribution to Apache NiFi. In order to streamline the review of the contribution we ask you to ensure the following steps have been taken: ### For all changes: - [ ] Is there a JIRA ticket associated with this PR? Is it referenced in the commit message? - [ ] Does your PR title start with NIFI-XXXX where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character. - [ ] Has your PR been rebased against the latest commit within the target branch (typically master)? - [ ] Is your initial contribution a single, squashed commit? ### For code changes: - [ ] Have you ensured that the full suite of tests is executed via mvn -Pcontrib-check clean install at the root nifi folder? - [ ] Have you written or updated unit tests to verify your changes? - [ ] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)? - [ ] If applicable, have you updated the LICENSE file, including the main LICENSE file under nifi-assembly? - [ ] If applicable, have you updated the NOTICE file, including the main NOTICE file found under nifi-assembly? - [ ] If adding new Properties, have you added .displayName in addition to .name (programmatic access) for each of the new properties? ### For documentation related changes: - [ ] Have you ensured that format looks appropriate for the output in which it is rendered? ### Note: Please ensure that once the PR is submitted, you check travis-ci for build issues and submit an update to your PR as soon as possible. 
You can merge this pull request into a Git repository by running: $ git pull https://github.com/JohannesDaniel/nifi NIFI-3248 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/nifi/pull/2199.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2199 commit 8a5f7e54edc5640655edd19f15d22fada6ca9900 Author: JohannesDaniel Date: 2017-10-05T20:57:53Z NIFI-3248: Improvement of GetSolr Processor ---