[GitHub] nifi pull request #2199: NIFI-3248: Improvement of GetSolr Processor

2017-10-23 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/nifi/pull/2199


---


[GitHub] nifi pull request #2199: NIFI-3248: Improvement of GetSolr Processor

2017-10-23 Thread ijokarumawak
Github user ijokarumawak commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2199#discussion_r146215638
  
--- Diff: 
nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/GetSolr.java
 ---
@@ -170,159 +210,213 @@ protected void init(final 
ProcessorInitializationContext context) {
 return this.descriptors;
 }
 
+final static Set<String> propertyNamesForActivatingClearState = new HashSet<>();
+static {
+propertyNamesForActivatingClearState.add(SOLR_TYPE.getName());
+propertyNamesForActivatingClearState.add(SOLR_LOCATION.getName());
+propertyNamesForActivatingClearState.add(COLLECTION.getName());
+propertyNamesForActivatingClearState.add(SOLR_QUERY.getName());
+propertyNamesForActivatingClearState.add(DATE_FIELD.getName());
+propertyNamesForActivatingClearState.add(RETURN_FIELDS.getName());
+}
+
 @Override
 public void onPropertyModified(PropertyDescriptor descriptor, String 
oldValue, String newValue) {
-lastEndDatedRef.set(UNINITIALIZED_LAST_END_DATE_VALUE);
+if 
(propertyNamesForActivatingClearState.contains(descriptor.getName()))
+clearState.set(true);
 }
 
-@OnStopped
-public void onStopped() {
-writeLastEndDate();
-}
+@OnScheduled
+public void clearState(final ProcessContext context) throws 
IOException {
+if (clearState.getAndSet(false)) {
+context.getStateManager().clear(Scope.CLUSTER);
+final Map<String, String> newStateMap = new HashMap<>();
 
-@OnRemoved
-public void onRemoved() {
-final File lastEndDateCache = new File(FILE_PREFIX + 
getIdentifier());
-if (lastEndDateCache.exists()) {
-lastEndDateCache.delete();
-}
-}
+newStateMap.put(STATE_MANAGER_CURSOR_MARK, "*");
 
-@Override
-public void onTrigger(ProcessContext context, ProcessSession session) 
throws ProcessException {
-final ComponentLog logger = getLogger();
-readLastEndDate();
-
-final SimpleDateFormat sdf = new 
SimpleDateFormat(LAST_END_DATE_PATTERN, Locale.US);
-sdf.setTimeZone(TimeZone.getTimeZone("GMT"));
-final String currDate = sdf.format(new Date());
-
-final boolean initialized = 
!UNINITIALIZED_LAST_END_DATE_VALUE.equals(lastEndDatedRef.get());
-
-final String query = context.getProperty(SOLR_QUERY).getValue();
-final SolrQuery solrQuery = new SolrQuery(query);
-solrQuery.setRows(context.getProperty(BATCH_SIZE).asInteger());
-
-// if initialized then apply a filter to restrict results from the 
last end time til now
-if (initialized) {
-StringBuilder filterQuery = new StringBuilder();
-filterQuery.append(context.getProperty(DATE_FIELD).getValue())
-.append(":{").append(lastEndDatedRef.get()).append(" 
TO ")
-.append(currDate).append("]");
-solrQuery.addFilterQuery(filterQuery.toString());
-logger.info("Applying filter query {}", new 
Object[]{filterQuery.toString()});
-}
+final String initialDate = 
context.getProperty(DATE_FILTER).getValue();
+if (StringUtils.isBlank(initialDate))
+newStateMap.put(STATE_MANAGER_FILTER, "*");
+else
+newStateMap.put(STATE_MANAGER_FILTER, initialDate);
 
-final String returnFields = 
context.getProperty(RETURN_FIELDS).getValue();
-if (returnFields != null && !returnFields.trim().isEmpty()) {
-for (String returnField : returnFields.trim().split("[,]")) {
-solrQuery.addField(returnField.trim());
-}
+context.getStateManager().setState(newStateMap, Scope.CLUSTER);
+
+id_field = null;
 }
+}
 
-final String fullSortClause = 
context.getProperty(SORT_CLAUSE).getValue();
-if (fullSortClause != null && !fullSortClause.trim().isEmpty()) {
-for (String sortClause : fullSortClause.split("[,]")) {
-String[] sortParts = sortClause.trim().split("[ ]");
-solrQuery.addSort(sortParts[0], 
SolrQuery.ORDER.valueOf(sortParts[1]));
-}
+@Override
+protected final Collection<ValidationResult> additionalCustomValidation(ValidationContext context) {
+final Collection<ValidationResult> problems = new ArrayList<>();
+
+if 
(context.getProperty(RETURN_TYPE).evaluateAttributeExpressions().getValue().equals(MODE_REC.getValue())
+&& !context.getProperty(RECOR

[GitHub] nifi pull request #2199: NIFI-3248: Improvement of GetSolr Processor

2017-10-23 Thread ijokarumawak
Github user ijokarumawak commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2199#discussion_r146214902
  
--- Diff: 
nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/GetSolr.java
 ---
@@ -66,42 +79,72 @@
 import org.apache.solr.common.SolrDocument;
 import org.apache.solr.common.SolrDocumentList;
 import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.params.CursorMarkParams;
 
-@Tags({"Apache", "Solr", "Get", "Pull"})
+@Tags({"Apache", "Solr", "Get", "Pull", "Records"})
 @InputRequirement(Requirement.INPUT_FORBIDDEN)
-@CapabilityDescription("Queries Solr and outputs the results as a 
FlowFile")
+@CapabilityDescription("Queries Solr and outputs the results as a FlowFile 
in the format of XML or using a Record Writer")
+@Stateful(scopes = {Scope.CLUSTER}, description = "Stores latest date of 
Date Field so that the same data will not be fetched multiple times.")
 public class GetSolr extends SolrProcessor {
 
-public static final PropertyDescriptor SOLR_QUERY = new 
PropertyDescriptor
-.Builder().name("Solr Query")
-.description("A query to execute against Solr")
+public static final String STATE_MANAGER_FILTER = 
"stateManager_filter";
+public static final String STATE_MANAGER_CURSOR_MARK = 
"stateManager_cursorMark";
+public static final AllowableValue MODE_XML = new 
AllowableValue("XML");
+public static final AllowableValue MODE_REC = new 
AllowableValue("Records");
--- End diff --

Thanks for your detailed considerations. I agree it's not an easy task to do. 
So that we don't lose your informative comments for future work, I've created another 
JIRA: https://issues.apache.org/jira/browse/NIFI-4514


---


[GitHub] nifi pull request #2199: NIFI-3248: Improvement of GetSolr Processor

2017-10-20 Thread JohannesDaniel
Github user JohannesDaniel commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2199#discussion_r145946147
  
--- Diff: 
nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/GetSolr.java
 ---
@@ -66,42 +79,72 @@
 import org.apache.solr.common.SolrDocument;
 import org.apache.solr.common.SolrDocumentList;
 import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.params.CursorMarkParams;
 
-@Tags({"Apache", "Solr", "Get", "Pull"})
+@Tags({"Apache", "Solr", "Get", "Pull", "Records"})
 @InputRequirement(Requirement.INPUT_FORBIDDEN)
-@CapabilityDescription("Queries Solr and outputs the results as a 
FlowFile")
+@CapabilityDescription("Queries Solr and outputs the results as a FlowFile 
in the format of XML or using a Record Writer")
+@Stateful(scopes = {Scope.CLUSTER}, description = "Stores latest date of 
Date Field so that the same data will not be fetched multiple times.")
 public class GetSolr extends SolrProcessor {
 
-public static final PropertyDescriptor SOLR_QUERY = new 
PropertyDescriptor
-.Builder().name("Solr Query")
-.description("A query to execute against Solr")
+public static final String STATE_MANAGER_FILTER = 
"stateManager_filter";
+public static final String STATE_MANAGER_CURSOR_MARK = 
"stateManager_cursorMark";
+public static final AllowableValue MODE_XML = new 
AllowableValue("XML");
+public static final AllowableValue MODE_REC = new 
AllowableValue("Records");
--- End diff --

Hmm, and dynamic fields could become a problem... I think this is not 
possible.


---


[GitHub] nifi pull request #2199: NIFI-3248: Improvement of GetSolr Processor

2017-10-19 Thread JohannesDaniel
Github user JohannesDaniel commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2199#discussion_r145727845
  
--- Diff: 
nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/GetSolr.java
 ---
@@ -170,159 +210,213 @@ protected void init(final 
ProcessorInitializationContext context) {
 return this.descriptors;
 }
 
+final static Set<String> propertyNamesForActivatingClearState = new HashSet<>();
+static {
+propertyNamesForActivatingClearState.add(SOLR_TYPE.getName());
+propertyNamesForActivatingClearState.add(SOLR_LOCATION.getName());
+propertyNamesForActivatingClearState.add(COLLECTION.getName());
+propertyNamesForActivatingClearState.add(SOLR_QUERY.getName());
+propertyNamesForActivatingClearState.add(DATE_FIELD.getName());
+propertyNamesForActivatingClearState.add(RETURN_FIELDS.getName());
+}
+
 @Override
 public void onPropertyModified(PropertyDescriptor descriptor, String 
oldValue, String newValue) {
-lastEndDatedRef.set(UNINITIALIZED_LAST_END_DATE_VALUE);
+if 
(propertyNamesForActivatingClearState.contains(descriptor.getName()))
+clearState.set(true);
 }
 
-@OnStopped
-public void onStopped() {
-writeLastEndDate();
-}
+@OnScheduled
+public void clearState(final ProcessContext context) throws 
IOException {
+if (clearState.getAndSet(false)) {
+context.getStateManager().clear(Scope.CLUSTER);
+final Map<String, String> newStateMap = new HashMap<>();
 
-@OnRemoved
-public void onRemoved() {
-final File lastEndDateCache = new File(FILE_PREFIX + 
getIdentifier());
-if (lastEndDateCache.exists()) {
-lastEndDateCache.delete();
-}
-}
+newStateMap.put(STATE_MANAGER_CURSOR_MARK, "*");
 
-@Override
-public void onTrigger(ProcessContext context, ProcessSession session) 
throws ProcessException {
-final ComponentLog logger = getLogger();
-readLastEndDate();
-
-final SimpleDateFormat sdf = new 
SimpleDateFormat(LAST_END_DATE_PATTERN, Locale.US);
-sdf.setTimeZone(TimeZone.getTimeZone("GMT"));
-final String currDate = sdf.format(new Date());
-
-final boolean initialized = 
!UNINITIALIZED_LAST_END_DATE_VALUE.equals(lastEndDatedRef.get());
-
-final String query = context.getProperty(SOLR_QUERY).getValue();
-final SolrQuery solrQuery = new SolrQuery(query);
-solrQuery.setRows(context.getProperty(BATCH_SIZE).asInteger());
-
-// if initialized then apply a filter to restrict results from the 
last end time til now
-if (initialized) {
-StringBuilder filterQuery = new StringBuilder();
-filterQuery.append(context.getProperty(DATE_FIELD).getValue())
-.append(":{").append(lastEndDatedRef.get()).append(" 
TO ")
-.append(currDate).append("]");
-solrQuery.addFilterQuery(filterQuery.toString());
-logger.info("Applying filter query {}", new 
Object[]{filterQuery.toString()});
-}
+final String initialDate = 
context.getProperty(DATE_FILTER).getValue();
+if (StringUtils.isBlank(initialDate))
+newStateMap.put(STATE_MANAGER_FILTER, "*");
+else
+newStateMap.put(STATE_MANAGER_FILTER, initialDate);
 
-final String returnFields = 
context.getProperty(RETURN_FIELDS).getValue();
-if (returnFields != null && !returnFields.trim().isEmpty()) {
-for (String returnField : returnFields.trim().split("[,]")) {
-solrQuery.addField(returnField.trim());
-}
+context.getStateManager().setState(newStateMap, Scope.CLUSTER);
+
+id_field = null;
 }
+}
 
-final String fullSortClause = 
context.getProperty(SORT_CLAUSE).getValue();
-if (fullSortClause != null && !fullSortClause.trim().isEmpty()) {
-for (String sortClause : fullSortClause.split("[,]")) {
-String[] sortParts = sortClause.trim().split("[ ]");
-solrQuery.addSort(sortParts[0], 
SolrQuery.ORDER.valueOf(sortParts[1]));
-}
+@Override
+protected final Collection<ValidationResult> additionalCustomValidation(ValidationContext context) {
+final Collection<ValidationResult> problems = new ArrayList<>();
+
+if 
(context.getProperty(RETURN_TYPE).evaluateAttributeExpressions().getValue().equals(MODE_REC.getValue())
+&& !context.getProperty(REC

[GitHub] nifi pull request #2199: NIFI-3248: Improvement of GetSolr Processor

2017-10-19 Thread JohannesDaniel
Github user JohannesDaniel commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2199#discussion_r145721674
  
--- Diff: 
nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/GetSolr.java
 ---
@@ -66,42 +79,72 @@
 import org.apache.solr.common.SolrDocument;
 import org.apache.solr.common.SolrDocumentList;
 import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.params.CursorMarkParams;
 
-@Tags({"Apache", "Solr", "Get", "Pull"})
+@Tags({"Apache", "Solr", "Get", "Pull", "Records"})
 @InputRequirement(Requirement.INPUT_FORBIDDEN)
-@CapabilityDescription("Queries Solr and outputs the results as a 
FlowFile")
+@CapabilityDescription("Queries Solr and outputs the results as a FlowFile 
in the format of XML or using a Record Writer")
+@Stateful(scopes = {Scope.CLUSTER}, description = "Stores latest date of 
Date Field so that the same data will not be fetched multiple times.")
 public class GetSolr extends SolrProcessor {
 
-public static final PropertyDescriptor SOLR_QUERY = new 
PropertyDescriptor
-.Builder().name("Solr Query")
-.description("A query to execute against Solr")
+public static final String STATE_MANAGER_FILTER = 
"stateManager_filter";
+public static final String STATE_MANAGER_CURSOR_MARK = 
"stateManager_cursorMark";
+public static final AllowableValue MODE_XML = new 
AllowableValue("XML");
+public static final AllowableValue MODE_REC = new 
AllowableValue("Records");
+
+public static final PropertyDescriptor RETURN_TYPE = new 
PropertyDescriptor
+.Builder().name("Return Type")
+.displayName("Return Type")
--- End diff --

Most of the properties were already available in the prior GetSolr processor. I 
assumed keeping them unchanged to be critical for backwards compatibility. For the new 
properties I chose the same naming pattern.


---


[GitHub] nifi pull request #2199: NIFI-3248: Improvement of GetSolr Processor

2017-10-19 Thread JohannesDaniel
Github user JohannesDaniel commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2199#discussion_r145720938
  
--- Diff: 
nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/GetSolr.java
 ---
@@ -66,42 +79,72 @@
 import org.apache.solr.common.SolrDocument;
 import org.apache.solr.common.SolrDocumentList;
 import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.params.CursorMarkParams;
 
-@Tags({"Apache", "Solr", "Get", "Pull"})
+@Tags({"Apache", "Solr", "Get", "Pull", "Records"})
 @InputRequirement(Requirement.INPUT_FORBIDDEN)
-@CapabilityDescription("Queries Solr and outputs the results as a 
FlowFile")
+@CapabilityDescription("Queries Solr and outputs the results as a FlowFile 
in the format of XML or using a Record Writer")
+@Stateful(scopes = {Scope.CLUSTER}, description = "Stores latest date of 
Date Field so that the same data will not be fetched multiple times.")
 public class GetSolr extends SolrProcessor {
 
-public static final PropertyDescriptor SOLR_QUERY = new 
PropertyDescriptor
-.Builder().name("Solr Query")
-.description("A query to execute against Solr")
+public static final String STATE_MANAGER_FILTER = 
"stateManager_filter";
+public static final String STATE_MANAGER_CURSOR_MARK = 
"stateManager_cursorMark";
+public static final AllowableValue MODE_XML = new 
AllowableValue("XML");
+public static final AllowableValue MODE_REC = new 
AllowableValue("Records");
--- End diff --

Additionally, this requires parsing the response JSON, as response 
parsing for the Schema API is not really implemented in SolrJ.


---


[GitHub] nifi pull request #2199: NIFI-3248: Improvement of GetSolr Processor

2017-10-19 Thread JohannesDaniel
Github user JohannesDaniel commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2199#discussion_r145719121
  
--- Diff: 
nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/GetSolr.java
 ---
@@ -66,42 +79,72 @@
 import org.apache.solr.common.SolrDocument;
 import org.apache.solr.common.SolrDocumentList;
 import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.params.CursorMarkParams;
 
-@Tags({"Apache", "Solr", "Get", "Pull"})
+@Tags({"Apache", "Solr", "Get", "Pull", "Records"})
 @InputRequirement(Requirement.INPUT_FORBIDDEN)
-@CapabilityDescription("Queries Solr and outputs the results as a 
FlowFile")
+@CapabilityDescription("Queries Solr and outputs the results as a FlowFile 
in the format of XML or using a Record Writer")
+@Stateful(scopes = {Scope.CLUSTER}, description = "Stores latest date of 
Date Field so that the same data will not be fetched multiple times.")
 public class GetSolr extends SolrProcessor {
 
-public static final PropertyDescriptor SOLR_QUERY = new 
PropertyDescriptor
-.Builder().name("Solr Query")
-.description("A query to execute against Solr")
+public static final String STATE_MANAGER_FILTER = 
"stateManager_filter";
+public static final String STATE_MANAGER_CURSOR_MARK = 
"stateManager_cursorMark";
+public static final AllowableValue MODE_XML = new 
AllowableValue("XML");
+public static final AllowableValue MODE_REC = new 
AllowableValue("Records");
--- End diff --

The difficulty with this is that Solr provides many different field 
types for the various kinds of data. For instance, an integer could be derived 
from an Int, TrieInt (version < 7.0) or PInt (version >= 7.0) field. This 
requires a comprehensive fieldtype-to-datatype mapping.


---


[GitHub] nifi pull request #2199: NIFI-3248: Improvement of GetSolr Processor

2017-10-19 Thread JohannesDaniel
Github user JohannesDaniel commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2199#discussion_r145712892
  
--- Diff: 
nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/GetSolr.java
 ---
@@ -66,42 +79,72 @@
 import org.apache.solr.common.SolrDocument;
 import org.apache.solr.common.SolrDocumentList;
 import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.params.CursorMarkParams;
 
-@Tags({"Apache", "Solr", "Get", "Pull"})
+@Tags({"Apache", "Solr", "Get", "Pull", "Records"})
 @InputRequirement(Requirement.INPUT_FORBIDDEN)
-@CapabilityDescription("Queries Solr and outputs the results as a 
FlowFile")
+@CapabilityDescription("Queries Solr and outputs the results as a FlowFile 
in the format of XML or using a Record Writer")
+@Stateful(scopes = {Scope.CLUSTER}, description = "Stores latest date of 
Date Field so that the same data will not be fetched multiple times.")
 public class GetSolr extends SolrProcessor {
 
-public static final PropertyDescriptor SOLR_QUERY = new 
PropertyDescriptor
-.Builder().name("Solr Query")
-.description("A query to execute against Solr")
+public static final String STATE_MANAGER_FILTER = 
"stateManager_filter";
+public static final String STATE_MANAGER_CURSOR_MARK = 
"stateManager_cursorMark";
+public static final AllowableValue MODE_XML = new 
AllowableValue("XML");
+public static final AllowableValue MODE_REC = new 
AllowableValue("Records");
--- End diff --

In principle yes, by using the Schema API. But I don't expect this to be too 
easy. I suggest that we create a separate ticket for this, as it will require 
some deeper consideration.
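
For illustration, a rough sketch of how the Schema API could be used to derive a NiFi record schema; the `mapType` helper and the handful of type names it covers are hypothetical, and a real implementation would need the comprehensive fieldtype-to-datatype mapping and dynamic-field handling discussed in this thread:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.record.DataType;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.request.schema.SchemaRequest;
import org.apache.solr.client.solrj.response.schema.SchemaResponse;

public class SolrSchemaToRecordSchema {

    public static RecordSchema fromSolrSchema(final SolrClient solrClient, final String collection) throws Exception {
        // Fetch the field definitions of the target collection via the Schema API
        final SchemaResponse.FieldsResponse response = new SchemaRequest.Fields().process(solrClient, collection);

        final List<RecordField> recordFields = new ArrayList<>();
        for (final Map<String, Object> field : response.getFields()) {
            final String name = (String) field.get("name");
            final String solrType = (String) field.get("type");
            recordFields.add(new RecordField(name, mapType(solrType)));
        }
        return new SimpleRecordSchema(recordFields);
    }

    // Hypothetical, incomplete mapping from Solr field type names to NiFi record data types;
    // real schemas use many more types (Trie*, Point*, dynamic fields, ...)
    private static DataType mapType(final String solrType) {
        switch (solrType) {
            case "pint":
            case "int":
                return RecordFieldType.INT.getDataType();
            case "plong":
            case "long":
                return RecordFieldType.LONG.getDataType();
            case "boolean":
                return RecordFieldType.BOOLEAN.getDataType();
            default:
                return RecordFieldType.STRING.getDataType();
        }
    }
}
```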


---


[GitHub] nifi pull request #2199: NIFI-3248: Improvement of GetSolr Processor

2017-10-19 Thread ijokarumawak
Github user ijokarumawak commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2199#discussion_r145688724
  
--- Diff: 
nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/GetSolr.java
 ---
@@ -66,42 +79,72 @@
 import org.apache.solr.common.SolrDocument;
 import org.apache.solr.common.SolrDocumentList;
 import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.params.CursorMarkParams;
 
-@Tags({"Apache", "Solr", "Get", "Pull"})
+@Tags({"Apache", "Solr", "Get", "Pull", "Records"})
 @InputRequirement(Requirement.INPUT_FORBIDDEN)
-@CapabilityDescription("Queries Solr and outputs the results as a 
FlowFile")
+@CapabilityDescription("Queries Solr and outputs the results as a FlowFile 
in the format of XML or using a Record Writer")
+@Stateful(scopes = {Scope.CLUSTER}, description = "Stores latest date of 
Date Field so that the same data will not be fetched multiple times.")
 public class GetSolr extends SolrProcessor {
 
-public static final PropertyDescriptor SOLR_QUERY = new 
PropertyDescriptor
-.Builder().name("Solr Query")
-.description("A query to execute against Solr")
+public static final String STATE_MANAGER_FILTER = 
"stateManager_filter";
+public static final String STATE_MANAGER_CURSOR_MARK = 
"stateManager_cursorMark";
+public static final AllowableValue MODE_XML = new 
AllowableValue("XML");
+public static final AllowableValue MODE_REC = new 
AllowableValue("Records");
+
+public static final PropertyDescriptor RETURN_TYPE = new 
PropertyDescriptor
+.Builder().name("Return Type")
+.displayName("Return Type")
+.description("Write Solr documents to FlowFiles as XML or 
using a Record Writer")
 .required(true)
-.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+.allowableValues(MODE_XML, MODE_REC)
+.defaultValue(MODE_REC.getValue())
--- End diff --

The default value should be MODE_XML, as it was before.


---


[GitHub] nifi pull request #2199: NIFI-3248: Improvement of GetSolr Processor

2017-10-19 Thread ijokarumawak
Github user ijokarumawak commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2199#discussion_r145696081
  
--- Diff: 
nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/GetSolr.java
 ---
@@ -170,159 +210,213 @@ protected void init(final 
ProcessorInitializationContext context) {
 return this.descriptors;
 }
 
+final static Set<String> propertyNamesForActivatingClearState = new HashSet<>();
+static {
+propertyNamesForActivatingClearState.add(SOLR_TYPE.getName());
+propertyNamesForActivatingClearState.add(SOLR_LOCATION.getName());
+propertyNamesForActivatingClearState.add(COLLECTION.getName());
+propertyNamesForActivatingClearState.add(SOLR_QUERY.getName());
+propertyNamesForActivatingClearState.add(DATE_FIELD.getName());
+propertyNamesForActivatingClearState.add(RETURN_FIELDS.getName());
+}
+
 @Override
 public void onPropertyModified(PropertyDescriptor descriptor, String 
oldValue, String newValue) {
-lastEndDatedRef.set(UNINITIALIZED_LAST_END_DATE_VALUE);
+if 
(propertyNamesForActivatingClearState.contains(descriptor.getName()))
+clearState.set(true);
 }
 
-@OnStopped
-public void onStopped() {
-writeLastEndDate();
-}
+@OnScheduled
+public void clearState(final ProcessContext context) throws 
IOException {
+if (clearState.getAndSet(false)) {
+context.getStateManager().clear(Scope.CLUSTER);
+final Map<String, String> newStateMap = new HashMap<>();
 
-@OnRemoved
-public void onRemoved() {
-final File lastEndDateCache = new File(FILE_PREFIX + 
getIdentifier());
-if (lastEndDateCache.exists()) {
-lastEndDateCache.delete();
-}
-}
+newStateMap.put(STATE_MANAGER_CURSOR_MARK, "*");
 
-@Override
-public void onTrigger(ProcessContext context, ProcessSession session) 
throws ProcessException {
-final ComponentLog logger = getLogger();
-readLastEndDate();
-
-final SimpleDateFormat sdf = new 
SimpleDateFormat(LAST_END_DATE_PATTERN, Locale.US);
-sdf.setTimeZone(TimeZone.getTimeZone("GMT"));
-final String currDate = sdf.format(new Date());
-
-final boolean initialized = 
!UNINITIALIZED_LAST_END_DATE_VALUE.equals(lastEndDatedRef.get());
-
-final String query = context.getProperty(SOLR_QUERY).getValue();
-final SolrQuery solrQuery = new SolrQuery(query);
-solrQuery.setRows(context.getProperty(BATCH_SIZE).asInteger());
-
-// if initialized then apply a filter to restrict results from the 
last end time til now
-if (initialized) {
-StringBuilder filterQuery = new StringBuilder();
-filterQuery.append(context.getProperty(DATE_FIELD).getValue())
-.append(":{").append(lastEndDatedRef.get()).append(" 
TO ")
-.append(currDate).append("]");
-solrQuery.addFilterQuery(filterQuery.toString());
-logger.info("Applying filter query {}", new 
Object[]{filterQuery.toString()});
-}
+final String initialDate = 
context.getProperty(DATE_FILTER).getValue();
+if (StringUtils.isBlank(initialDate))
+newStateMap.put(STATE_MANAGER_FILTER, "*");
+else
+newStateMap.put(STATE_MANAGER_FILTER, initialDate);
 
-final String returnFields = 
context.getProperty(RETURN_FIELDS).getValue();
-if (returnFields != null && !returnFields.trim().isEmpty()) {
-for (String returnField : returnFields.trim().split("[,]")) {
-solrQuery.addField(returnField.trim());
-}
+context.getStateManager().setState(newStateMap, Scope.CLUSTER);
+
+id_field = null;
 }
+}
 
-final String fullSortClause = 
context.getProperty(SORT_CLAUSE).getValue();
-if (fullSortClause != null && !fullSortClause.trim().isEmpty()) {
-for (String sortClause : fullSortClause.split("[,]")) {
-String[] sortParts = sortClause.trim().split("[ ]");
-solrQuery.addSort(sortParts[0], 
SolrQuery.ORDER.valueOf(sortParts[1]));
-}
+@Override
+protected final Collection<ValidationResult> additionalCustomValidation(ValidationContext context) {
+final Collection<ValidationResult> problems = new ArrayList<>();
+
+if 
(context.getProperty(RETURN_TYPE).evaluateAttributeExpressions().getValue().equals(MODE_REC.getValue())
+&& !context.getProperty(RECOR

[GitHub] nifi pull request #2199: NIFI-3248: Improvement of GetSolr Processor

2017-10-19 Thread ijokarumawak
Github user ijokarumawak commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2199#discussion_r145698373
  
--- Diff: 
nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/GetSolr.java
 ---
@@ -170,159 +210,213 @@ protected void init(final 
ProcessorInitializationContext context) {
 return this.descriptors;
 }
 
+final static Set<String> propertyNamesForActivatingClearState = new HashSet<>();
+static {
+propertyNamesForActivatingClearState.add(SOLR_TYPE.getName());
+propertyNamesForActivatingClearState.add(SOLR_LOCATION.getName());
+propertyNamesForActivatingClearState.add(COLLECTION.getName());
+propertyNamesForActivatingClearState.add(SOLR_QUERY.getName());
+propertyNamesForActivatingClearState.add(DATE_FIELD.getName());
+propertyNamesForActivatingClearState.add(RETURN_FIELDS.getName());
+}
+
 @Override
 public void onPropertyModified(PropertyDescriptor descriptor, String 
oldValue, String newValue) {
-lastEndDatedRef.set(UNINITIALIZED_LAST_END_DATE_VALUE);
+if 
(propertyNamesForActivatingClearState.contains(descriptor.getName()))
+clearState.set(true);
 }
 
-@OnStopped
-public void onStopped() {
-writeLastEndDate();
-}
+@OnScheduled
+public void clearState(final ProcessContext context) throws 
IOException {
+if (clearState.getAndSet(false)) {
+context.getStateManager().clear(Scope.CLUSTER);
+final Map<String, String> newStateMap = new HashMap<>();
 
-@OnRemoved
-public void onRemoved() {
-final File lastEndDateCache = new File(FILE_PREFIX + 
getIdentifier());
-if (lastEndDateCache.exists()) {
-lastEndDateCache.delete();
-}
-}
+newStateMap.put(STATE_MANAGER_CURSOR_MARK, "*");
 
-@Override
-public void onTrigger(ProcessContext context, ProcessSession session) 
throws ProcessException {
-final ComponentLog logger = getLogger();
-readLastEndDate();
-
-final SimpleDateFormat sdf = new 
SimpleDateFormat(LAST_END_DATE_PATTERN, Locale.US);
-sdf.setTimeZone(TimeZone.getTimeZone("GMT"));
-final String currDate = sdf.format(new Date());
-
-final boolean initialized = 
!UNINITIALIZED_LAST_END_DATE_VALUE.equals(lastEndDatedRef.get());
-
-final String query = context.getProperty(SOLR_QUERY).getValue();
-final SolrQuery solrQuery = new SolrQuery(query);
-solrQuery.setRows(context.getProperty(BATCH_SIZE).asInteger());
-
-// if initialized then apply a filter to restrict results from the 
last end time til now
-if (initialized) {
-StringBuilder filterQuery = new StringBuilder();
-filterQuery.append(context.getProperty(DATE_FIELD).getValue())
-.append(":{").append(lastEndDatedRef.get()).append(" 
TO ")
-.append(currDate).append("]");
-solrQuery.addFilterQuery(filterQuery.toString());
-logger.info("Applying filter query {}", new 
Object[]{filterQuery.toString()});
-}
+final String initialDate = 
context.getProperty(DATE_FILTER).getValue();
+if (StringUtils.isBlank(initialDate))
+newStateMap.put(STATE_MANAGER_FILTER, "*");
+else
+newStateMap.put(STATE_MANAGER_FILTER, initialDate);
 
-final String returnFields = 
context.getProperty(RETURN_FIELDS).getValue();
-if (returnFields != null && !returnFields.trim().isEmpty()) {
-for (String returnField : returnFields.trim().split("[,]")) {
-solrQuery.addField(returnField.trim());
-}
+context.getStateManager().setState(newStateMap, Scope.CLUSTER);
+
+id_field = null;
 }
+}
 
-final String fullSortClause = 
context.getProperty(SORT_CLAUSE).getValue();
-if (fullSortClause != null && !fullSortClause.trim().isEmpty()) {
-for (String sortClause : fullSortClause.split("[,]")) {
-String[] sortParts = sortClause.trim().split("[ ]");
-solrQuery.addSort(sortParts[0], 
SolrQuery.ORDER.valueOf(sortParts[1]));
-}
+@Override
+protected final Collection<ValidationResult> additionalCustomValidation(ValidationContext context) {
+final Collection<ValidationResult> problems = new ArrayList<>();
+
+if 
(context.getProperty(RETURN_TYPE).evaluateAttributeExpressions().getValue().equals(MODE_REC.getValue())
+&& !context.getProperty(RECOR

[GitHub] nifi pull request #2199: NIFI-3248: Improvement of GetSolr Processor

2017-10-19 Thread ijokarumawak
Github user ijokarumawak commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2199#discussion_r145696508
  
--- Diff: 
nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/GetSolr.java
 ---
@@ -170,159 +210,213 @@ protected void init(final 
ProcessorInitializationContext context) {
 return this.descriptors;
 }
 
+final static Set<String> propertyNamesForActivatingClearState = new HashSet<>();
+static {
+propertyNamesForActivatingClearState.add(SOLR_TYPE.getName());
+propertyNamesForActivatingClearState.add(SOLR_LOCATION.getName());
+propertyNamesForActivatingClearState.add(COLLECTION.getName());
+propertyNamesForActivatingClearState.add(SOLR_QUERY.getName());
+propertyNamesForActivatingClearState.add(DATE_FIELD.getName());
+propertyNamesForActivatingClearState.add(RETURN_FIELDS.getName());
+}
+
 @Override
 public void onPropertyModified(PropertyDescriptor descriptor, String 
oldValue, String newValue) {
-lastEndDatedRef.set(UNINITIALIZED_LAST_END_DATE_VALUE);
+if 
(propertyNamesForActivatingClearState.contains(descriptor.getName()))
+clearState.set(true);
 }
 
-@OnStopped
-public void onStopped() {
-writeLastEndDate();
-}
+@OnScheduled
+public void clearState(final ProcessContext context) throws 
IOException {
+if (clearState.getAndSet(false)) {
+context.getStateManager().clear(Scope.CLUSTER);
+final Map<String, String> newStateMap = new HashMap<>();
 
-@OnRemoved
-public void onRemoved() {
-final File lastEndDateCache = new File(FILE_PREFIX + 
getIdentifier());
-if (lastEndDateCache.exists()) {
-lastEndDateCache.delete();
-}
-}
+newStateMap.put(STATE_MANAGER_CURSOR_MARK, "*");
 
-@Override
-public void onTrigger(ProcessContext context, ProcessSession session) 
throws ProcessException {
-final ComponentLog logger = getLogger();
-readLastEndDate();
-
-final SimpleDateFormat sdf = new 
SimpleDateFormat(LAST_END_DATE_PATTERN, Locale.US);
-sdf.setTimeZone(TimeZone.getTimeZone("GMT"));
-final String currDate = sdf.format(new Date());
-
-final boolean initialized = 
!UNINITIALIZED_LAST_END_DATE_VALUE.equals(lastEndDatedRef.get());
-
-final String query = context.getProperty(SOLR_QUERY).getValue();
-final SolrQuery solrQuery = new SolrQuery(query);
-solrQuery.setRows(context.getProperty(BATCH_SIZE).asInteger());
-
-// if initialized then apply a filter to restrict results from the 
last end time til now
-if (initialized) {
-StringBuilder filterQuery = new StringBuilder();
-filterQuery.append(context.getProperty(DATE_FIELD).getValue())
-.append(":{").append(lastEndDatedRef.get()).append(" 
TO ")
-.append(currDate).append("]");
-solrQuery.addFilterQuery(filterQuery.toString());
-logger.info("Applying filter query {}", new 
Object[]{filterQuery.toString()});
-}
+final String initialDate = 
context.getProperty(DATE_FILTER).getValue();
+if (StringUtils.isBlank(initialDate))
+newStateMap.put(STATE_MANAGER_FILTER, "*");
+else
+newStateMap.put(STATE_MANAGER_FILTER, initialDate);
 
-final String returnFields = 
context.getProperty(RETURN_FIELDS).getValue();
-if (returnFields != null && !returnFields.trim().isEmpty()) {
-for (String returnField : returnFields.trim().split("[,]")) {
-solrQuery.addField(returnField.trim());
-}
+context.getStateManager().setState(newStateMap, Scope.CLUSTER);
+
+id_field = null;
 }
+}
 
-final String fullSortClause = 
context.getProperty(SORT_CLAUSE).getValue();
-if (fullSortClause != null && !fullSortClause.trim().isEmpty()) {
-for (String sortClause : fullSortClause.split("[,]")) {
-String[] sortParts = sortClause.trim().split("[ ]");
-solrQuery.addSort(sortParts[0], 
SolrQuery.ORDER.valueOf(sortParts[1]));
-}
+@Override
+protected final Collection<ValidationResult> additionalCustomValidation(ValidationContext context) {
+final Collection<ValidationResult> problems = new ArrayList<>();
+
+if 
(context.getProperty(RETURN_TYPE).evaluateAttributeExpressions().getValue().equals(MODE_REC.getValue())
+&& !context.getProperty(RECOR

[GitHub] nifi pull request #2199: NIFI-3248: Improvement of GetSolr Processor

2017-10-19 Thread ijokarumawak
Github user ijokarumawak commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2199#discussion_r145697961
  
--- Diff: 
nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/GetSolr.java
 ---
@@ -170,159 +210,213 @@ protected void init(final 
ProcessorInitializationContext context) {
 return this.descriptors;
 }
 
+final static Set<String> propertyNamesForActivatingClearState = new HashSet<>();
+static {
+propertyNamesForActivatingClearState.add(SOLR_TYPE.getName());
+propertyNamesForActivatingClearState.add(SOLR_LOCATION.getName());
+propertyNamesForActivatingClearState.add(COLLECTION.getName());
+propertyNamesForActivatingClearState.add(SOLR_QUERY.getName());
+propertyNamesForActivatingClearState.add(DATE_FIELD.getName());
+propertyNamesForActivatingClearState.add(RETURN_FIELDS.getName());
+}
+
 @Override
 public void onPropertyModified(PropertyDescriptor descriptor, String 
oldValue, String newValue) {
-lastEndDatedRef.set(UNINITIALIZED_LAST_END_DATE_VALUE);
+if 
(propertyNamesForActivatingClearState.contains(descriptor.getName()))
+clearState.set(true);
 }
 
-@OnStopped
-public void onStopped() {
-writeLastEndDate();
-}
+@OnScheduled
+public void clearState(final ProcessContext context) throws 
IOException {
+if (clearState.getAndSet(false)) {
+context.getStateManager().clear(Scope.CLUSTER);
+final Map<String, String> newStateMap = new HashMap<>();
 
-@OnRemoved
-public void onRemoved() {
-final File lastEndDateCache = new File(FILE_PREFIX + 
getIdentifier());
-if (lastEndDateCache.exists()) {
-lastEndDateCache.delete();
-}
-}
+newStateMap.put(STATE_MANAGER_CURSOR_MARK, "*");
 
-@Override
-public void onTrigger(ProcessContext context, ProcessSession session) 
throws ProcessException {
-final ComponentLog logger = getLogger();
-readLastEndDate();
-
-final SimpleDateFormat sdf = new 
SimpleDateFormat(LAST_END_DATE_PATTERN, Locale.US);
-sdf.setTimeZone(TimeZone.getTimeZone("GMT"));
-final String currDate = sdf.format(new Date());
-
-final boolean initialized = 
!UNINITIALIZED_LAST_END_DATE_VALUE.equals(lastEndDatedRef.get());
-
-final String query = context.getProperty(SOLR_QUERY).getValue();
-final SolrQuery solrQuery = new SolrQuery(query);
-solrQuery.setRows(context.getProperty(BATCH_SIZE).asInteger());
-
-// if initialized then apply a filter to restrict results from the 
last end time til now
-if (initialized) {
-StringBuilder filterQuery = new StringBuilder();
-filterQuery.append(context.getProperty(DATE_FIELD).getValue())
-.append(":{").append(lastEndDatedRef.get()).append(" 
TO ")
-.append(currDate).append("]");
-solrQuery.addFilterQuery(filterQuery.toString());
-logger.info("Applying filter query {}", new 
Object[]{filterQuery.toString()});
-}
+final String initialDate = 
context.getProperty(DATE_FILTER).getValue();
+if (StringUtils.isBlank(initialDate))
+newStateMap.put(STATE_MANAGER_FILTER, "*");
+else
+newStateMap.put(STATE_MANAGER_FILTER, initialDate);
 
-final String returnFields = 
context.getProperty(RETURN_FIELDS).getValue();
-if (returnFields != null && !returnFields.trim().isEmpty()) {
-for (String returnField : returnFields.trim().split("[,]")) {
-solrQuery.addField(returnField.trim());
-}
+context.getStateManager().setState(newStateMap, Scope.CLUSTER);
+
+id_field = null;
 }
+}
 
-final String fullSortClause = 
context.getProperty(SORT_CLAUSE).getValue();
-if (fullSortClause != null && !fullSortClause.trim().isEmpty()) {
-for (String sortClause : fullSortClause.split("[,]")) {
-String[] sortParts = sortClause.trim().split("[ ]");
-solrQuery.addSort(sortParts[0], 
SolrQuery.ORDER.valueOf(sortParts[1]));
-}
+@Override
+protected final Collection<ValidationResult> additionalCustomValidation(ValidationContext context) {
+final Collection<ValidationResult> problems = new ArrayList<>();
+
+if 
(context.getProperty(RETURN_TYPE).evaluateAttributeExpressions().getValue().equals(MODE_REC.getValue())
+&& !context.getProperty(RECOR

[GitHub] nifi pull request #2199: NIFI-3248: Improvement of GetSolr Processor

2017-10-19 Thread ijokarumawak
Github user ijokarumawak commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2199#discussion_r145690789
  
--- Diff: 
nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/GetSolr.java
 ---
@@ -66,42 +79,72 @@
 import org.apache.solr.common.SolrDocument;
 import org.apache.solr.common.SolrDocumentList;
 import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.params.CursorMarkParams;
 
-@Tags({"Apache", "Solr", "Get", "Pull"})
+@Tags({"Apache", "Solr", "Get", "Pull", "Records"})
 @InputRequirement(Requirement.INPUT_FORBIDDEN)
-@CapabilityDescription("Queries Solr and outputs the results as a 
FlowFile")
+@CapabilityDescription("Queries Solr and outputs the results as a FlowFile 
in the format of XML or using a Record Writer")
+@Stateful(scopes = {Scope.CLUSTER}, description = "Stores latest date of 
Date Field so that the same data will not be fetched multiple times.")
 public class GetSolr extends SolrProcessor {
 
-public static final PropertyDescriptor SOLR_QUERY = new 
PropertyDescriptor
-.Builder().name("Solr Query")
-.description("A query to execute against Solr")
+public static final String STATE_MANAGER_FILTER = 
"stateManager_filter";
+public static final String STATE_MANAGER_CURSOR_MARK = 
"stateManager_cursorMark";
+public static final AllowableValue MODE_XML = new 
AllowableValue("XML");
+public static final AllowableValue MODE_REC = new 
AllowableValue("Records");
+
+public static final PropertyDescriptor RETURN_TYPE = new 
PropertyDescriptor
+.Builder().name("Return Type")
+.displayName("Return Type")
--- End diff --

Although I haven't seen a specific guideline or documentation, other 
processors prefer having `name` in lower case, looking like a property key or 
configuration name such as `return_type`, so that users can type the name without 
worrying about spacing or case sensitivity, while `displayName` is a more 
verbose, human-readable name.

`name` becomes more important in the world of MiNiFi, or when another application 
talks to the NiFi API programmatically.

I don't have a strong opinion here, I just wanted to share what those two 
are.
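
For illustration, a hypothetical descriptor following that convention (not the actual code in this PR; it reuses the MODE_XML/MODE_REC constants shown in the diff above):

```java
public static final PropertyDescriptor RETURN_TYPE = new PropertyDescriptor
        .Builder()
        .name("return_type")            // stable, machine-friendly key used by the framework, MiNiFi and API clients
        .displayName("Return Type")     // verbose, human-readable label shown in the UI
        .description("Write Solr documents to FlowFiles as XML or using a Record Writer")
        .required(true)
        .allowableValues(MODE_XML, MODE_REC)
        .defaultValue(MODE_XML.getValue())
        .build();
```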


---


[GitHub] nifi pull request #2199: NIFI-3248: Improvement of GetSolr Processor

2017-10-19 Thread ijokarumawak
Github user ijokarumawak commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2199#discussion_r145699419
  
--- Diff: 
nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/GetSolr.java
 ---
@@ -66,42 +79,72 @@
 import org.apache.solr.common.SolrDocument;
 import org.apache.solr.common.SolrDocumentList;
 import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.params.CursorMarkParams;
 
-@Tags({"Apache", "Solr", "Get", "Pull"})
+@Tags({"Apache", "Solr", "Get", "Pull", "Records"})
 @InputRequirement(Requirement.INPUT_FORBIDDEN)
-@CapabilityDescription("Queries Solr and outputs the results as a 
FlowFile")
+@CapabilityDescription("Queries Solr and outputs the results as a FlowFile 
in the format of XML or using a Record Writer")
+@Stateful(scopes = {Scope.CLUSTER}, description = "Stores latest date of 
Date Field so that the same data will not be fetched multiple times.")
 public class GetSolr extends SolrProcessor {
 
-public static final PropertyDescriptor SOLR_QUERY = new 
PropertyDescriptor
-.Builder().name("Solr Query")
-.description("A query to execute against Solr")
+public static final String STATE_MANAGER_FILTER = 
"stateManager_filter";
+public static final String STATE_MANAGER_CURSOR_MARK = 
"stateManager_cursorMark";
+public static final AllowableValue MODE_XML = new 
AllowableValue("XML");
+public static final AllowableValue MODE_REC = new 
AllowableValue("Records");
--- End diff --

Just an idea. Configuring a schema for the writer manually can be 
cumbersome. I wonder if it's possible to load the schema from the target 
collection and then auto-generate a NiFi record schema from it. Do you think it's 
doable?


---


[GitHub] nifi pull request #2199: NIFI-3248: Improvement of GetSolr Processor

2017-10-19 Thread ijokarumawak
Github user ijokarumawak commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2199#discussion_r145678316
  
--- Diff: 
nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/GetSolr.java
 ---
@@ -126,6 +126,14 @@
 .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
 .build();
 
+public static final PropertyDescriptor DATE_FILTER = new 
PropertyDescriptor
+.Builder().name("Initial Date Filter")
+.displayName("Initial Date Filter")
+.description("Date value to filter results. Documents with an 
earlier date will not be fetched. The format has to correspond to the date 
pattern of Solr 'YYYY-MM-DDThh:mm:ssZ'")
+.required(false)
+.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+.build();
+
--- End diff --

You can change the description of the processor via the `@CapabilityDescription` 
annotation: 
https://github.com/apache/nifi/blob/master/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/GetSolr.java#L72

If more detailed documentation is needed, the processor can have an 
`additionalDetails.html`, like this:

https://github.com/apache/nifi/blob/master/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/resources/docs/org.apache.nifi.processors.solr.PutSolrContentStream/additionalDetails.html


---


[GitHub] nifi pull request #2199: NIFI-3248: Improvement of GetSolr Processor

2017-10-18 Thread JohannesDaniel
Github user JohannesDaniel commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2199#discussion_r145612359
  
--- Diff: 
nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/GetSolr.java
 ---
@@ -126,6 +126,14 @@
 .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
 .build();
 
+public static final PropertyDescriptor DATE_FILTER = new 
PropertyDescriptor
+.Builder().name("Initial Date Filter")
+.displayName("Initial Date Filter")
+.description("Date value to filter results. Documents with an 
earlier date will not be fetched. The format has to correspond to the date 
pattern of Solr 'YYYY-MM-DDThh:mm:ssZ'")
+.required(false)
+.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+.build();
+
--- End diff --

This property should make it quite obvious how backwards compatibility can 
be achieved. Additionally, I will describe it in the documentation. BTW: Where 
can I change the descriptions of processor usage? I did not find them in the 
nifi-docs folder...


---


[GitHub] nifi pull request #2199: NIFI-3248: Improvement of GetSolr Processor

2017-10-18 Thread ijokarumawak
Github user ijokarumawak commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2199#discussion_r145416772
  
--- Diff: 
nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/SolrProcessor.java
 ---
@@ -275,7 +275,7 @@ protected final boolean isBasicAuthEnabled() {
 }
 
 @Override
-protected final Collection<ValidationResult> customValidate(ValidationContext context) {
+protected Collection<ValidationResult> customValidate(ValidationContext context) {
--- End diff --

Good call. Then the method can be a non-abstract method in SolrProcessor 
that does nothing.
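
A minimal sketch of that hook in SolrProcessor (a fragment only; it assumes the existing imports of ValidationResult, ValidationContext and java.util, and the method names discussed here):

```java
@Override
protected final Collection<ValidationResult> customValidate(final ValidationContext context) {
    final List<ValidationResult> problems = new ArrayList<>();

    // ... existing SolrProcessor validation stays here and can no longer be skipped ...

    // hook for sub-classes such as GetSolr or PutSolrContentStream
    problems.addAll(additionalCustomValidation(context));
    return problems;
}

// non-abstract so existing sub-classes don't have to override it
protected Collection<ValidationResult> additionalCustomValidation(final ValidationContext context) {
    return Collections.emptyList();
}
```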


---


[GitHub] nifi pull request #2199: NIFI-3248: Improvement of GetSolr Processor

2017-10-18 Thread ijokarumawak
Github user ijokarumawak commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2199#discussion_r145416024
  
--- Diff: 
nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/resources/solr/testCollection/conf/schema.xml
 ---
@@ -16,6 +16,16 @@
 
 
 
+
 
 
+
+
+
+
+
+
+
+<uniqueKey>id</uniqueKey>
--- End diff --

I agree with that, most indices have a unique key. I just asked because 
it is not mandatory to have a unique key according to the [Solr 
documentation](https://wiki.apache.org/solr/SchemaXml#The_Unique_Key_Field). 
In that case I'd prefer to state in the NiFi documentation that a unique key is 
required for this processor to work properly.


---


[GitHub] nifi pull request #2199: NIFI-3248: Improvement of GetSolr Processor

2017-10-18 Thread JohannesDaniel
Github user JohannesDaniel commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2199#discussion_r145415068
  
--- Diff: 
nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/SolrProcessor.java
 ---
@@ -275,7 +275,7 @@ protected final boolean isBasicAuthEnabled() {
 }
 
 @Override
-protected final Collection<ValidationResult> customValidate(ValidationContext context) {
+protected Collection<ValidationResult> customValidate(ValidationContext context) {
--- End diff --

OK. By doing so, I will also have to add this method to PutSolrContentStream.


---


[GitHub] nifi pull request #2199: NIFI-3248: Improvement of GetSolr Processor

2017-10-18 Thread ijokarumawak
Github user ijokarumawak commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2199#discussion_r145412969
  
--- Diff: 
nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/SolrProcessor.java
 ---
@@ -275,7 +275,7 @@ protected final boolean isBasicAuthEnabled() {
 }
 
 @Override
-protected final Collection<ValidationResult> customValidate(ValidationContext context) {
+protected Collection<ValidationResult> customValidate(ValidationContext context) {
--- End diff --

I imagine the reason why this customValidate is marked `final` is that 
the original author wanted to prevent sub-classes from skipping the 
validation code implemented here. You implemented it within GetSolr and call 
`super.customValidate` from there, so it should be fine, but another sub-class 
could forget to call `super.customValidate` if we remove the `final` keyword.

So I thought it might be a safer approach to add an abstract method, such as 
`additionalCustomValidate`, at SolrProcessor, then call it from customValidate 
and let sub-classes implement their custom validation in it.


---


[GitHub] nifi pull request #2199: NIFI-3248: Improvement of GetSolr Processor

2017-10-18 Thread ijokarumawak
Github user ijokarumawak commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2199#discussion_r145410576
  
--- Diff: 
nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/GetSolr.java
 ---
@@ -66,42 +79,64 @@
 import org.apache.solr.common.SolrDocument;
 import org.apache.solr.common.SolrDocumentList;
 import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.params.CursorMarkParams;
 
-@Tags({"Apache", "Solr", "Get", "Pull"})
+@Tags({"Apache", "Solr", "Get", "Pull", "Records"})
 @InputRequirement(Requirement.INPUT_FORBIDDEN)
-@CapabilityDescription("Queries Solr and outputs the results as a 
FlowFile")
+@CapabilityDescription("Queries Solr and outputs the results as a FlowFile 
in the format of XML or using a Record Writer")
+@Stateful(scopes = {Scope.LOCAL}, description = "Stores latest date of 
Date Field so that the same data will not be fetched multiple times.")
--- End diff --

The exact way to realize backward compatibility is up to you :) I'm fine as 
long as users can understand how to migrate existing state to the new version of 
this processor. If it needs to be done manually, then it should be documented.


---


[GitHub] nifi pull request #2199: NIFI-3248: Improvement of GetSolr Processor

2017-10-18 Thread JohannesDaniel
Github user JohannesDaniel commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2199#discussion_r145408461
  
--- Diff: 
nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/resources/solr/testCollection/conf/schema.xml
 ---
@@ -16,6 +16,16 @@
 
 
 
+
 
 
+
+
+
+
+
+
+
+<uniqueKey>id</uniqueKey>
--- End diff --

The uniqueKey field has to be part of the sorting. Well-configured Solr 
indexes always include this kind of field, as many things will not work properly 
without it. Actually, I have never seen a Solr index without one (and 
I have seen a lot ... ;). 
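
For context, a rough sketch of SolrJ cursor-based paging, which is why the uniqueKey has to terminate the sort; `solrClient`, `idField` and the `created` date field are placeholders, not the PR code:

```java
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.SolrQuery;
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.common.params.CursorMarkParams;

// Sketch: page through all documents using a cursor mark. The sort must end on the
// uniqueKey field (idField), otherwise Solr rejects the cursor request.
static void fetchAll(final SolrClient solrClient, final String idField) throws Exception {
    final SolrQuery solrQuery = new SolrQuery("*:*");
    solrQuery.setRows(100);
    solrQuery.addSort("created", SolrQuery.ORDER.asc); // hypothetical date field
    solrQuery.addSort(idField, SolrQuery.ORDER.asc);   // uniqueKey as the tie-breaker

    String cursorMark = CursorMarkParams.CURSOR_MARK_START; // "*"
    boolean done = false;
    while (!done) {
        solrQuery.set(CursorMarkParams.CURSOR_MARK_PARAM, cursorMark);
        final QueryResponse response = solrClient.query(solrQuery);
        // ... process response.getResults(), e.g. write them to a FlowFile ...
        final String nextCursorMark = response.getNextCursorMark();
        done = cursorMark.equals(nextCursorMark); // no progress means the result set is exhausted
        cursorMark = nextCursorMark;
    }
}
```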


---


[GitHub] nifi pull request #2199: NIFI-3248: Improvement of GetSolr Processor

2017-10-18 Thread JohannesDaniel
Github user JohannesDaniel commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2199#discussion_r145405902
  
--- Diff: 
nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/SolrProcessor.java
 ---
@@ -275,7 +275,7 @@ protected final boolean isBasicAuthEnabled() {
 }
 
 @Override
-protected final Collection<ValidationResult> customValidate(ValidationContext context) {
+protected Collection<ValidationResult> customValidate(ValidationContext context) {
--- End diff --

I did within class GetSolr


---


[GitHub] nifi pull request #2199: NIFI-3248: Improvement of GetSolr Processor

2017-10-18 Thread JohannesDaniel
Github user JohannesDaniel commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2199#discussion_r145404225
  
--- Diff: 
nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/GetSolr.java
 ---
@@ -172,157 +203,196 @@ protected void init(final 
ProcessorInitializationContext context) {
 
 @Override
 public void onPropertyModified(PropertyDescriptor descriptor, String 
oldValue, String newValue) {
-lastEndDatedRef.set(UNINITIALIZED_LAST_END_DATE_VALUE);
+clearState.set(true);
--- End diff --

ok, no problem


---


[GitHub] nifi pull request #2199: NIFI-3248: Improvement of GetSolr Processor

2017-10-18 Thread JohannesDaniel
Github user JohannesDaniel commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2199#discussion_r145404160
  
--- Diff: 
nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/GetSolr.java
 ---
@@ -138,10 +168,11 @@ protected void init(final 
ProcessorInitializationContext context) {
 descriptors.add(SOLR_TYPE);
 descriptors.add(SOLR_LOCATION);
 descriptors.add(COLLECTION);
+descriptors.add(RETURN_TYPE);
+descriptors.add(RECORD_WRITER);
 descriptors.add(SOLR_QUERY);
-descriptors.add(RETURN_FIELDS);
-descriptors.add(SORT_CLAUSE);
--- End diff --

This should be safe, as the sorting only affects documents indexed after 
lastEndDate (documents indexed earlier are excluded by the filter query).

---


[GitHub] nifi pull request #2199: NIFI-3248: Improvement of GetSolr Processor

2017-10-18 Thread JohannesDaniel
Github user JohannesDaniel commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2199#discussion_r145403786
  
--- Diff: 
nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/GetSolr.java
 ---
@@ -66,42 +79,64 @@
 import org.apache.solr.common.SolrDocument;
 import org.apache.solr.common.SolrDocumentList;
 import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.params.CursorMarkParams;
 
-@Tags({"Apache", "Solr", "Get", "Pull"})
+@Tags({"Apache", "Solr", "Get", "Pull", "Records"})
 @InputRequirement(Requirement.INPUT_FORBIDDEN)
-@CapabilityDescription("Queries Solr and outputs the results as a 
FlowFile")
+@CapabilityDescription("Queries Solr and outputs the results as a FlowFile 
in the format of XML or using a Record Writer")
+@Stateful(scopes = {Scope.LOCAL}, description = "Stores latest date of 
Date Field so that the same data will not be fetched multiple times.")
--- End diff --

Sorry, this would be the correct filter query:
 fq=dateField:[lastEndDate TO NOW]
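
i.e., roughly the following (a sketch reusing the properties shown in the diff above; `lastEndDate` comes from wherever the last run's value is stored):

```java
// restrict the query to documents indexed since the last run
final SolrQuery solrQuery = new SolrQuery(context.getProperty(SOLR_QUERY).getValue());
final String dateField = context.getProperty(DATE_FIELD).getValue();
solrQuery.addFilterQuery(dateField + ":[" + lastEndDate + " TO NOW]");
```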


---


[GitHub] nifi pull request #2199: NIFI-3248: Improvement of GetSolr Processor

2017-10-18 Thread JohannesDaniel
Github user JohannesDaniel commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2199#discussion_r145403415
  
--- Diff: 
nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/GetSolr.java
 ---
@@ -66,42 +79,64 @@
 import org.apache.solr.common.SolrDocument;
 import org.apache.solr.common.SolrDocumentList;
 import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.params.CursorMarkParams;
 
-@Tags({"Apache", "Solr", "Get", "Pull"})
+@Tags({"Apache", "Solr", "Get", "Pull", "Records"})
 @InputRequirement(Requirement.INPUT_FORBIDDEN)
-@CapabilityDescription("Queries Solr and outputs the results as a 
FlowFile")
+@CapabilityDescription("Queries Solr and outputs the results as a FlowFile 
in the format of XML or using a Record Writer")
+@Stateful(scopes = {Scope.LOCAL}, description = "Stores latest date of 
Date Field so that the same data will not be fetched multiple times.")
--- End diff --

Do you really think that it is required to read the file? Backwards 
compatibility could also be realized by adding a filter query like 
fq=dateField:[* TO lastEndDate]. The user would only have to specify the value of 
lastEndDate, e.g. in a property of the processor.


---


[GitHub] nifi pull request #2199: NIFI-3248: Improvement of GetSolr Processor

2017-10-13 Thread ijokarumawak
Github user ijokarumawak commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2199#discussion_r144532208
  
--- Diff: 
nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/GetSolr.java
 ---
@@ -66,42 +79,64 @@
 import org.apache.solr.common.SolrDocument;
 import org.apache.solr.common.SolrDocumentList;
 import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.params.CursorMarkParams;
 
-@Tags({"Apache", "Solr", "Get", "Pull"})
+@Tags({"Apache", "Solr", "Get", "Pull", "Records"})
 @InputRequirement(Requirement.INPUT_FORBIDDEN)
-@CapabilityDescription("Queries Solr and outputs the results as a 
FlowFile")
+@CapabilityDescription("Queries Solr and outputs the results as a FlowFile 
in the format of XML or using a Record Writer")
+@Stateful(scopes = {Scope.LOCAL}, description = "Stores latest date of 
Date Field so that the same data will not be fetched multiple times.")
--- End diff --

GetSolr used to use a local file to store lastEndDate. We need migration code 
so that lastEndDate is taken over into managed state when there is no state yet but 
the lastEndDate file exists.
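
For example, roughly the following during @OnScheduled (a sketch only, inside GetSolr with the usual imports; it assumes the old cache file contains just the date string, which has to be checked against the existing readLastEndDate() format, and reuses FILE_PREFIX and the new state keys):

```java
// One-time migration: if managed state is still empty but the old lastEndDate
// cache file exists, take its value over into the new state map.
final StateMap stateMap = context.getStateManager().getState(Scope.CLUSTER);
if (stateMap.toMap().isEmpty()) {
    final File lastEndDateCache = new File(FILE_PREFIX + getIdentifier());
    if (lastEndDateCache.exists()) {
        final String lastEndDate = new String(
                Files.readAllBytes(lastEndDateCache.toPath()), StandardCharsets.UTF_8).trim();

        final Map<String, String> newStateMap = new HashMap<>();
        newStateMap.put(STATE_MANAGER_FILTER, lastEndDate);
        newStateMap.put(STATE_MANAGER_CURSOR_MARK, "*");
        context.getStateManager().setState(newStateMap, Scope.CLUSTER);

        lastEndDateCache.delete(); // the file is no longer needed afterwards
    }
}
```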


---


[GitHub] nifi pull request #2199: NIFI-3248: Improvement of GetSolr Processor

2017-10-13 Thread ijokarumawak
Github user ijokarumawak commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2199#discussion_r144533090
  
--- Diff: 
nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/GetSolr.java
 ---
@@ -66,42 +79,64 @@
 import org.apache.solr.common.SolrDocument;
 import org.apache.solr.common.SolrDocumentList;
 import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.params.CursorMarkParams;
 
-@Tags({"Apache", "Solr", "Get", "Pull"})
+@Tags({"Apache", "Solr", "Get", "Pull", "Records"})
 @InputRequirement(Requirement.INPUT_FORBIDDEN)
-@CapabilityDescription("Queries Solr and outputs the results as a 
FlowFile")
+@CapabilityDescription("Queries Solr and outputs the results as a FlowFile 
in the format of XML or using a Record Writer")
+@Stateful(scopes = {Scope.LOCAL}, description = "Stores latest date of 
Date Field so that the same data will not be fetched multiple times.")
--- End diff --

State scope should be CLUSTER, I think. Also, the capability description should 
mention that this processor is designed to run on the Primary Node only. Please 
refer to the ListHDFS processor documentation.

Or does this processor work nicely in a distributed fashion, utilizing 
multiple NiFi nodes against a Solr cluster?


---


[GitHub] nifi pull request #2199: NIFI-3248: Improvement of GetSolr Processor

2017-10-13 Thread ijokarumawak
Github user ijokarumawak commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2199#discussion_r144527126
  
--- Diff: 
nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/resources/solr/testCollection/conf/schema.xml
 ---
@@ -16,6 +16,16 @@
 
 
 
+
 
 
+
+
+
+
+
+
+
+<uniqueKey>id</uniqueKey>
--- End diff --

What if a Solr schema doesn't have a uniqueKey? Does this processor still work 
without a uniqueKey?


---


[GitHub] nifi pull request #2199: NIFI-3248: Improvement of GetSolr Processor

2017-10-13 Thread ijokarumawak
Github user ijokarumawak commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2199#discussion_r144526595
  
--- Diff: 
nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/SolrProcessor.java
 ---
@@ -275,7 +275,7 @@ protected final boolean isBasicAuthEnabled() {
 }
 
 @Override
-protected final Collection<ValidationResult> customValidate(ValidationContext context) {
+protected Collection<ValidationResult> customValidate(ValidationContext context) {
--- End diff --

Shouldn't we add another protected method for sub-classes to override?


---


[GitHub] nifi pull request #2199: NIFI-3248: Improvement of GetSolr Processor

2017-10-13 Thread ijokarumawak
Github user ijokarumawak commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2199#discussion_r144530989
  
--- Diff: 
nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/GetSolr.java
 ---
@@ -138,10 +168,11 @@ protected void init(final 
ProcessorInitializationContext context) {
 descriptors.add(SOLR_TYPE);
 descriptors.add(SOLR_LOCATION);
 descriptors.add(COLLECTION);
+descriptors.add(RETURN_TYPE);
+descriptors.add(RECORD_WRITER);
 descriptors.add(SOLR_QUERY);
-descriptors.add(RETURN_FIELDS);
-descriptors.add(SORT_CLAUSE);
--- End diff --

Is it safe to remove an existing property? The existing code shouldn't 
sort results anyway, or it would have to store the last sorted field value to paginate 
properly when docs with the same date span more than one page. So I think it's 
safe.


---


[GitHub] nifi pull request #2199: NIFI-3248: Improvement of GetSolr Processor

2017-10-13 Thread ijokarumawak
Github user ijokarumawak commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2199#discussion_r144530918
  
--- Diff: 
nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/GetSolr.java
 ---
@@ -172,157 +203,196 @@ protected void init(final 
ProcessorInitializationContext context) {
 
 @Override
 public void onPropertyModified(PropertyDescriptor descriptor, String 
oldValue, String newValue) {
-lastEndDatedRef.set(UNINITIALIZED_LAST_END_DATE_VALUE);
+clearState.set(true);
--- End diff --

Probably we'd like to clear state only when the following properties get 
changed? It would be a bad UX if state were cleared when the user re-configures the batch 
size.
- SOLR_TYPE
- SOLR_LOCATION
- COLLECTION
- SOLR_QUERY
- DATE_FIELD
- RETURN_FIELDS



---


[GitHub] nifi pull request #2199: NIFI-3248: Improvement of GetSolr Processor

2017-10-13 Thread ijokarumawak
Github user ijokarumawak commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2199#discussion_r144533800
  
--- Diff: 
nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/GetSolr.java
 ---
@@ -172,157 +203,196 @@ protected void init(final 
ProcessorInitializationContext context) {
 
 @Override
 public void onPropertyModified(PropertyDescriptor descriptor, String 
oldValue, String newValue) {
-lastEndDatedRef.set(UNINITIALIZED_LAST_END_DATE_VALUE);
+clearState.set(true);
 }
 
-@OnStopped
-public void onStopped() {
-writeLastEndDate();
-}
+@OnScheduled
+public void onScheduled2(final ProcessContext context) throws 
IOException {
--- End diff --

Please change the method name to appropriately represent what it does, such as 
`clearState`. The annotation already explains when it's called.


---


[GitHub] nifi pull request #2199: NIFI-3248: Improvement of GetSolr Processor

2017-10-06 Thread JohannesDaniel
GitHub user JohannesDaniel opened a pull request:

https://github.com/apache/nifi/pull/2199

NIFI-3248: Improvement of GetSolr Processor

Thank you for submitting a contribution to Apache NiFi.

In order to streamline the review of the contribution we ask you
to ensure the following steps have been taken:

### For all changes:
- [ ] Is there a JIRA ticket associated with this PR? Is it referenced 
 in the commit message?

- [ ] Does your PR title start with NIFI-XXXX where XXXX is the JIRA number 
you are trying to resolve? Pay particular attention to the hyphen "-" character.

- [ ] Has your PR been rebased against the latest commit within the target 
branch (typically master)?

- [ ] Is your initial contribution a single, squashed commit?

### For code changes:
- [ ] Have you ensured that the full suite of tests is executed via mvn 
-Pcontrib-check clean install at the root nifi folder?
- [ ] Have you written or updated unit tests to verify your changes?
- [ ] If adding new dependencies to the code, are these dependencies 
licensed in a way that is compatible for inclusion under [ASF 
2.0](http://www.apache.org/legal/resolved.html#category-a)? 
- [ ] If applicable, have you updated the LICENSE file, including the main 
LICENSE file under nifi-assembly?
- [ ] If applicable, have you updated the NOTICE file, including the main 
NOTICE file found under nifi-assembly?
- [ ] If adding new Properties, have you added .displayName in addition to 
.name (programmatic access) for each of the new properties?

### For documentation related changes:
- [ ] Have you ensured that format looks appropriate for the output in 
which it is rendered?

### Note:
Please ensure that once the PR is submitted, you check travis-ci for build 
issues and submit an update to your PR as soon as possible.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/JohannesDaniel/nifi NIFI-3248

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/nifi/pull/2199.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2199


commit 8a5f7e54edc5640655edd19f15d22fada6ca9900
Author: JohannesDaniel 
Date:   2017-10-05T20:57:53Z

NIFI-3248: Improvement of GetSolr Processor




---