NIFI-40: initial implementation of prov query language

Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/df43c8b6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/df43c8b6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/df43c8b6

Branch: refs/heads/prov-query-language
Commit: df43c8b62086a5ca970b6fd377eaddc835d9e3cb
Parents: b512ff1
Author: Mark Payne <marka...@hotmail.com>
Authored: Thu Mar 12 09:14:30 2015 -0400
Committer: Mark Payne <marka...@hotmail.com>
Committed: Thu Mar 12 09:14:30 2015 -0400

----------------------------------------------------------------------
 .../AbstractConfigurableComponent.java          |  15 ++-
 .../AbstractSessionFactoryProcessor.java        |   2 +
 .../provenance/ProvenanceEventRepository.java   |  29 ++++-
 .../nifi-provenance-query-language/pom.xml      |   5 +-
 .../org/apache/nifi/pql/ProvenanceQueryParser.g |   2 +-
 .../org/apache/nifi/pql/ProvenanceQuery.java    |  75 ++++++++++---
 .../conversion/StringToLongEvaluator.java       |  68 ++++++++++++
 .../extraction/ComponentTypeEvaluator.java      |  39 +++++++
 .../evaluation/function/TimeFieldEvaluator.java |   4 +
 .../java/org/apache/nifi/pql/groups/Group.java  |   2 +-
 .../java/org/apache/nifi/pql/TestQuery.java     |  12 +-
 nifi/nifi-commons/pom.xml                       |   1 +
 .../JournalingProvenanceRepository.java         |   3 +-
 .../journaling/index/LuceneIndexSearcher.java   |   2 +-
 .../TestJournalingProvenanceRepository.java     |  72 +++++++++++-
 .../PersistentProvenanceRepository.java         |  11 ++
 .../nifi-volatile-provenance-repository/pom.xml |   7 ++
 .../VolatileProvenanceRepository.java           | 110 ++++++++++++++++++-
 .../nifi-standard-reporting-tasks/pom.xml       |   5 +
 .../provenance/GenerateProvenanceReport.java    |  83 ++++++++++++++
 20 files changed, 515 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/df43c8b6/nifi/nifi-api/src/main/java/org/apache/nifi/components/AbstractConfigurableComponent.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-api/src/main/java/org/apache/nifi/components/AbstractConfigurableComponent.java
 
b/nifi/nifi-api/src/main/java/org/apache/nifi/components/AbstractConfigurableComponent.java
index f4bea5e..e093785 100644
--- 
a/nifi/nifi-api/src/main/java/org/apache/nifi/components/AbstractConfigurableComponent.java
+++ 
b/nifi/nifi-api/src/main/java/org/apache/nifi/components/AbstractConfigurableComponent.java
@@ -22,8 +22,17 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
-public abstract class AbstractConfigurableComponent implements 
ConfigurableComponent {
+import org.apache.nifi.logging.ProcessorLog;
+import org.apache.nifi.processor.ProcessorInitializationContext;
 
+public abstract class AbstractConfigurableComponent implements 
ConfigurableComponent {
+    private ComponentLog logger; // assigned in initialize(), handed out by getLogger()
+    
+    @Override
+    public void initialize(final ProcessorInitializationContext context) {
+        logger = context.getLogger();
+    }
+    
     /**
      * Allows subclasses to perform their own validation on the already set
      * properties. Since each property is validated as it is set this allows
@@ -45,6 +54,10 @@ public abstract class AbstractConfigurableComponent 
implements ConfigurableCompo
         return Collections.emptySet();
     }
 
+    protected ComponentLog getLogger() {
+        return logger;
+    }
+    
     /**
      * Returns a PropertyDescriptor for the name specified that is fully
      * populated

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/df43c8b6/nifi/nifi-api/src/main/java/org/apache/nifi/processor/AbstractSessionFactoryProcessor.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-api/src/main/java/org/apache/nifi/processor/AbstractSessionFactoryProcessor.java
 
b/nifi/nifi-api/src/main/java/org/apache/nifi/processor/AbstractSessionFactoryProcessor.java
index f13a143..57dee98 100644
--- 
a/nifi/nifi-api/src/main/java/org/apache/nifi/processor/AbstractSessionFactoryProcessor.java
+++ 
b/nifi/nifi-api/src/main/java/org/apache/nifi/processor/AbstractSessionFactoryProcessor.java
@@ -53,6 +53,8 @@ public abstract class AbstractSessionFactoryProcessor extends 
AbstractConfigurab
 
     @Override
     public final void initialize(final ProcessorInitializationContext context) 
{
+        super.initialize(context);
+        
         identifier = context.getIdentifier();
         logger = context.getLogger();
         serviceLookup = context.getControllerServiceLookup();

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/df43c8b6/nifi/nifi-api/src/main/java/org/apache/nifi/provenance/ProvenanceEventRepository.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-api/src/main/java/org/apache/nifi/provenance/ProvenanceEventRepository.java
 
b/nifi/nifi-api/src/main/java/org/apache/nifi/provenance/ProvenanceEventRepository.java
index 18e5788..b969038 100644
--- 
a/nifi/nifi-api/src/main/java/org/apache/nifi/provenance/ProvenanceEventRepository.java
+++ 
b/nifi/nifi-api/src/main/java/org/apache/nifi/provenance/ProvenanceEventRepository.java
@@ -23,6 +23,7 @@ import java.util.List;
 
 import org.apache.nifi.events.EventReporter;
 import org.apache.nifi.provenance.lineage.ComputeLineageSubmission;
+import org.apache.nifi.provenance.query.ProvenanceQuerySubmission;
 import org.apache.nifi.provenance.search.Query;
 import org.apache.nifi.provenance.search.QuerySubmission;
 import org.apache.nifi.provenance.search.SearchableField;
@@ -103,6 +104,26 @@ public interface ProvenanceEventRepository extends 
Closeable {
      */
     Long getMaxEventId() throws IOException;
 
+    
+    /**
+     * Submits an asynchronous request to process the given Provenance Query 
Language query,
+     * returning an identifier that can be used to fetch the results at a 
later time
+     * 
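+     * @param query the Provenance Query Language query to process
+     * @return the ProvenanceQuerySubmission for the submitted query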
+     */
+    ProvenanceQuerySubmission submitQuery(String query);
+
+    /**
+     * Returns the ProvenanceQuerySubmission associated with the given 
identifier, or <code>null</code>
+     * if no query exists with the given identifier.
+     *
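+     * @param queryIdentifier the identifier of the query submission to look up
+     *
+     * @return the matching ProvenanceQuerySubmission, or <code>null</code> if none exists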
+     */
+    ProvenanceQuerySubmission retrieveProvenanceQuerySubmission(String 
queryIdentifier);
+    
     /**
      * Submits an asynchronous request to process the given query, returning an
      * identifier that can be used to fetch the results at a later time
@@ -111,11 +132,11 @@ public interface ProvenanceEventRepository extends 
Closeable {
      * @return
      */
     QuerySubmission submitQuery(Query query);
-
+    
+    
     /**
-     * Returns the QueryResult associated with the given identifier, if the
-     * query has finished processing. If the query has not yet finished 
running,
-     * returns <code>null</code>.
+     * Returns the QuerySubmission associated with the given identifier, or 
<code>null</code>
+     * if no query exists with the provided identifier
      *
      * @param queryIdentifier
      *

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/df43c8b6/nifi/nifi-commons/nifi-provenance-query-language/pom.xml
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-provenance-query-language/pom.xml 
b/nifi/nifi-commons/nifi-provenance-query-language/pom.xml
index a13fd16..905ed5b 100644
--- a/nifi/nifi-commons/nifi-provenance-query-language/pom.xml
+++ b/nifi/nifi-commons/nifi-provenance-query-language/pom.xml
@@ -16,7 +16,6 @@
                        <plugin>
                                <groupId>org.antlr</groupId>
                                <artifactId>antlr3-maven-plugin</artifactId>
-                               <version>3.4</version>
                                <executions>
                                        <execution>
                                                <goals>
@@ -33,6 +32,10 @@
                        <groupId>org.apache.nifi</groupId>
                        <artifactId>nifi-api</artifactId>
                </dependency>
+               <dependency>
+                       <groupId>org.apache.nifi</groupId>
+                       <artifactId>nifi-data-provenance-utils</artifactId>
+               </dependency>
 
                <dependency>
                        <groupId>org.antlr</groupId>

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/df43c8b6/nifi/nifi-commons/nifi-provenance-query-language/src/main/antlr3/org/apache/nifi/pql/ProvenanceQueryParser.g
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-commons/nifi-provenance-query-language/src/main/antlr3/org/apache/nifi/pql/ProvenanceQueryParser.g
 
b/nifi/nifi-commons/nifi-provenance-query-language/src/main/antlr3/org/apache/nifi/pql/ProvenanceQueryParser.g
index 3837729..25410bb 100644
--- 
a/nifi/nifi-commons/nifi-provenance-query-language/src/main/antlr3/org/apache/nifi/pql/ProvenanceQueryParser.g
+++ 
b/nifi/nifi-commons/nifi-provenance-query-language/src/main/antlr3/org/apache/nifi/pql/ProvenanceQueryParser.g
@@ -82,7 +82,7 @@ selectableSource : EVENT | IDENTIFIER;
 eventProperty : propertyName ->
        ^(EVENT_PROPERTY propertyName );
 
-propertyName : UUID | TRANSIT_URI | TIMESTAMP | FILESIZE | TYPE | COMPONENT_ID 
| RELATIONSHIP;
+propertyName : UUID | TRANSIT_URI | TIMESTAMP | FILESIZE | TYPE | COMPONENT_ID 
| COMPONENT_TYPE | RELATIONSHIP;
 
 attribute : STRING_LITERAL ->
        ^(ATTRIBUTE STRING_LITERAL);

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/df43c8b6/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/ProvenanceQuery.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/ProvenanceQuery.java
 
b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/ProvenanceQuery.java
index 200a8ee..40ccbf5 100644
--- 
a/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/ProvenanceQuery.java
+++ 
b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/ProvenanceQuery.java
@@ -20,6 +20,7 @@ import static org.apache.nifi.pql.ProvenanceQueryParser.LIMIT;
 import static org.apache.nifi.pql.ProvenanceQueryParser.LT;
 import static org.apache.nifi.pql.ProvenanceQueryParser.MATCHES;
 import static org.apache.nifi.pql.ProvenanceQueryParser.MINUTE;
+import static org.apache.nifi.pql.ProvenanceQueryParser.MONTH;
 import static org.apache.nifi.pql.ProvenanceQueryParser.NOT;
 import static org.apache.nifi.pql.ProvenanceQueryParser.NOT_EQUALS;
 import static org.apache.nifi.pql.ProvenanceQueryParser.NUMBER;
@@ -40,6 +41,8 @@ import static org.apache.nifi.pql.ProvenanceQueryParser.YEAR;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Calendar;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
@@ -66,6 +69,7 @@ import 
org.apache.nifi.pql.evaluation.comparison.LessThanEvaluator;
 import org.apache.nifi.pql.evaluation.comparison.MatchesEvaluator;
 import org.apache.nifi.pql.evaluation.comparison.RecordTypeEvaluator;
 import org.apache.nifi.pql.evaluation.comparison.StartsWithEvaluator;
+import org.apache.nifi.pql.evaluation.conversion.StringToLongEvaluator;
 import org.apache.nifi.pql.evaluation.extraction.AttributeEvaluator;
 import org.apache.nifi.pql.evaluation.extraction.ComponentIdEvaluator;
 import org.apache.nifi.pql.evaluation.extraction.RelationshipEvaluator;
@@ -84,14 +88,17 @@ import org.apache.nifi.pql.evaluation.order.GroupedSorter;
 import org.apache.nifi.pql.evaluation.order.RowSorter;
 import org.apache.nifi.pql.evaluation.order.SortDirection;
 import org.apache.nifi.pql.evaluation.repository.SelectAllRecords;
+import org.apache.nifi.pql.exception.ProvenanceQueryLanguageException;
 import org.apache.nifi.pql.exception.ProvenanceQueryLanguageParsingException;
 import org.apache.nifi.pql.results.GroupingResultSet;
 import org.apache.nifi.pql.results.StandardOrderedResultSet;
 import org.apache.nifi.pql.results.StandardUnorderedResultSet;
 import org.apache.nifi.provenance.ProvenanceEventRepository;
 import org.apache.nifi.provenance.ProvenanceEventType;
+import org.apache.nifi.provenance.SearchableFields;
 import org.apache.nifi.provenance.StoredProvenanceEvent;
 import org.apache.nifi.provenance.query.ProvenanceResultSet;
+import org.apache.nifi.provenance.search.SearchableField;
 
 public class ProvenanceQuery {
        private final Tree tree;
@@ -103,16 +110,19 @@ public class ProvenanceQuery {
        private final RowSorter sorter;
        private final Long limit;
        
+       private final Set<SearchableField> searchableFields;
+       private final Set<String> searchableAttributes;
        private long accumulatorIdGenerator = 0L;
        
-       public static ProvenanceQuery compile(final String pql) {
+       
+       public static ProvenanceQuery compile(final String pql, final 
Collection<SearchableField> searchableFields, final Collection<SearchableField> 
searchableAttributes) {
                try {
             final CommonTokenStream lexerTokenStream = createTokenStream(pql);
             final ProvenanceQueryParser parser = new 
ProvenanceQueryParser(lexerTokenStream);
             final Tree ast = (Tree) parser.pql().getTree();
             final Tree tree = ast.getChild(0);
             
-            return new ProvenanceQuery(tree, pql);
+            return new ProvenanceQuery(tree, pql, searchableFields, 
searchableAttributes);
         } catch (final ProvenanceQueryLanguageParsingException e) {
             throw e;
         } catch (final Exception e) {
@@ -126,9 +136,19 @@ public class ProvenanceQuery {
         return new CommonTokenStream(lexer);
     }
        
-       private ProvenanceQuery(final Tree tree, final String pql) {
+       private ProvenanceQuery(final Tree tree, final String pql, final 
Collection<SearchableField> searchableFields, final Collection<SearchableField> 
searchableAttributes) {
                this.tree = tree;
                this.pql = pql;
+               this.searchableFields = searchableFields == null ? null : 
Collections.unmodifiableSet(new HashSet<>(searchableFields));
+               if (searchableAttributes == null) {
+                   this.searchableAttributes = null;
+               } else {
+               final Set<String> attributes = new HashSet<>();
+               for ( final SearchableField attr : searchableAttributes ) {
+                   attributes.add(attr.getSearchableFieldName());
+               }
+               this.searchableAttributes = 
Collections.unmodifiableSet(attributes);
+               }
                
                Tree fromTree = null;
                Tree whereTree = null;
@@ -162,7 +182,6 @@ public class ProvenanceQuery {
 
                sourceEvaluator = (fromTree == null) ? null : 
buildSourceEvaluator(fromTree);
                
-               // Distribute AND's over OR's.
                final BooleanEvaluator where = (whereTree == null) ? null : 
buildConditionEvaluator(whereTree.getChild(0));
                conditionEvaluator = where;
                
@@ -192,10 +211,6 @@ public class ProvenanceQuery {
                                selectAccumulators = accumulators;
                        }
                }
-               
-               
-               //repoEvaluator = new SelectAllRecords();
-               //repoEvaluator = new 
SelectFromLucene(LuceneTranslator.toLuceneQuery(where));
        }
        
        @Override
@@ -298,35 +313,59 @@ public class ProvenanceQuery {
                case EVENT_PROPERTY:
                        switch (tree.getChild(0).getType()) {
                                case FILESIZE:
+                                   if ( searchableFields != null && 
!searchableFields.contains(SearchableFields.FileSize) ) {
+                                       throw new 
ProvenanceQueryLanguageException("Query cannot reference FileSize because this 
field is not searchable by the repository");
+                                   }
                                        return new SizeEvaluator();
                                case TRANSIT_URI:
+                                   if ( searchableFields != null && 
!searchableFields.contains(SearchableFields.TransitURI) ) {
+                            throw new ProvenanceQueryLanguageException("Query 
cannot reference TransitURI because this field is not searchable by the 
repository");
+                        }
                                        return new TransitUriEvaluator();
                                case TIMESTAMP:
+                                   // time is always indexed
                                        return new TimestampEvaluator();
                                case TYPE:
+                                   // type is always indexed so no need to 
check it
                                        return new TypeEvaluator();
                                case COMPONENT_ID:
+                                   if ( searchableFields != null && 
!searchableFields.contains(SearchableFields.ComponentID) ) {
+                            throw new ProvenanceQueryLanguageException("Query 
cannot reference Component ID because this field is not searchable by the 
repository");
+                        }
                                        return new ComponentIdEvaluator();
+                                       // TODO: Allow Component Type to be 
indexed and searched
                                case RELATIONSHIP:
+                                   if ( searchableFields != null && 
!searchableFields.contains(SearchableFields.Relationship) ) {
+                            throw new ProvenanceQueryLanguageException("Query 
cannot reference Relationship because this field is not searchable by the 
repository");
+                        }
                                        return new RelationshipEvaluator();
                                case UUID:
+                                   if ( searchableFields != null && 
!searchableFields.contains(SearchableFields.FlowFileUUID) ) {
+                            throw new ProvenanceQueryLanguageException("Query 
cannot reference FlowFile UUID because this field is not searchable by the 
repository");
+                        }
                                    return new UuidEvaluator();
                                default:
                                        // TODO: IMPLEMENT
                                        throw new 
UnsupportedOperationException("Haven't implemented extraction of property " + 
tree.getChild(0).getText());
                        }
                case ATTRIBUTE:
+                   final String attributeName = tree.getChild(0).getText();
+                   if ( searchableAttributes != null && 
!searchableAttributes.contains(attributeName) ) {
+                       throw new ProvenanceQueryLanguageException("Query 
cannot attribute '" + attributeName + "' because this attribute is not 
searchable by the repository");
+                   }
                        return new 
AttributeEvaluator(toStringEvaluator(buildOperandEvaluator(tree.getChild(0)), 
tree));
                case STRING_LITERAL:
                        return new StringLiteralEvaluator(tree.getText());
                case NUMBER:
                        return new 
LongLiteralEvaluator(Long.valueOf(tree.getText()));
+            case YEAR:
+                return new 
TimeFieldEvaluator(toLongEvaluator(buildOperandEvaluator(tree.getChild(0)), 
tree), Calendar.YEAR, YEAR);
+            case MONTH:
+                return new 
TimeFieldEvaluator(toLongEvaluator(buildOperandEvaluator(tree.getChild(0)), 
tree), Calendar.MONTH, MONTH);
+            case DAY:
+                return new 
TimeFieldEvaluator(toLongEvaluator(buildOperandEvaluator(tree.getChild(0)), 
tree), Calendar.DAY_OF_YEAR, DAY);
                case HOUR:
                        return new 
TimeFieldEvaluator(toLongEvaluator(buildOperandEvaluator(tree.getChild(0)), 
tree), Calendar.HOUR_OF_DAY, HOUR);
-               case DAY:
-                       return new 
TimeFieldEvaluator(toLongEvaluator(buildOperandEvaluator(tree.getChild(0)), 
tree), Calendar.DAY_OF_YEAR, DAY);
-               case YEAR:
-                       return new 
TimeFieldEvaluator(toLongEvaluator(buildOperandEvaluator(tree.getChild(0)), 
tree), Calendar.YEAR, YEAR);
                case MINUTE:
                        return new 
TimeFieldEvaluator(toLongEvaluator(buildOperandEvaluator(tree.getChild(0)), 
tree), Calendar.MINUTE, MINUTE);
                case SECOND:
@@ -412,6 +451,16 @@ public class ProvenanceQuery {
     }
     
     private OperandEvaluator<Long> toLongEvaluator(final OperandEvaluator<?> 
eval, final Tree tree) {
+        if ( eval.getType() == Long.class ) {
+            @SuppressWarnings("unchecked")
+            final OperandEvaluator<Long> retEvaluator = 
((OperandEvaluator<Long>) eval);
+            return retEvaluator;
+        } else if ( eval.getType() == String.class ) {
+            @SuppressWarnings("unchecked")
+            final OperandEvaluator<String> stringEval = 
((OperandEvaluator<String>) eval);
+            return new StringToLongEvaluator(stringEval);
+        }
+        
        return castEvaluator(eval, tree, Long.class);
     }
     
@@ -499,7 +548,7 @@ public class ProvenanceQuery {
     
     
     public static ProvenanceResultSet execute(final String query, final 
ProvenanceEventRepository repo) throws IOException {
-       return ProvenanceQuery.compile(query).execute(repo);
+       return ProvenanceQuery.compile(query, null, null).execute(repo);
     }
     
     

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/df43c8b6/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/conversion/StringToLongEvaluator.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/conversion/StringToLongEvaluator.java
 
b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/conversion/StringToLongEvaluator.java
new file mode 100644
index 0000000..38ac9f0
--- /dev/null
+++ 
b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/conversion/StringToLongEvaluator.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.pql.evaluation.conversion;
+
+import org.apache.nifi.pql.evaluation.OperandEvaluator;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+
+/**
+ * Converts the String produced by a wrapped evaluator into a Long.
+ * <p>
+ * The result is <code>null</code> when the wrapped value is <code>null</code>, contains
+ * anything other than the digits 0-9 after trimming, or does not fit in a long. A value
+ * that is empty after trimming evaluates to 0.
+ */
+public class StringToLongEvaluator implements OperandEvaluator<Long> {
+    private final OperandEvaluator<String> stringEvaluator;
+    
+    /**
+     * @param stringEvaluator the evaluator whose String result is converted
+     */
+    public StringToLongEvaluator(final OperandEvaluator<String> stringEvaluator) {
+        this.stringEvaluator = stringEvaluator;
+    }
+    
+    /**
+     * Evaluates the wrapped evaluator against the record and converts its result.
+     *
+     * @param record the event passed through to the wrapped evaluator
+     * @return the parsed value, 0 for a blank value, or <code>null</code> if the value
+     *         is <code>null</code>, non-numeric, or too large for a long
+     */
+    @Override
+    public Long evaluate(final ProvenanceEventRecord record) {
+        final String result = stringEvaluator.evaluate(record);
+        if (result == null) {
+            return null;
+        }
+
+        final String trimmed = result.trim();
+        if ( trimmed.isEmpty() ) {
+            return 0L;
+        }
+        
+        if ( !isNumber(trimmed) ) {
+            return null;
+        }
+        
+        try {
+            return Long.parseLong(trimmed);
+        } catch (final NumberFormatException nfe) {
+            // Every character is a digit, but the number is too large for a long.
+            // Treat it like any other value that cannot be converted.
+            return null;
+        }
+    }
+    
+    /**
+     * @return <code>true</code> if every character of the value is one of 0-9
+     */
+    private boolean isNumber(final String value) {
+        for (int i=0; i < value.length(); i++) {
+            final char c = value.charAt(i);
+            if ( c < '0' || c > '9' ) {
+                return false;
+            }
+        }
+        
+        return true;
+    }
+
+    @Override
+    public int getEvaluatorType() {
+        return -1;
+    }
+
+    @Override
+    public Class<Long> getType() {
+        return Long.class;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/df43c8b6/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/extraction/ComponentTypeEvaluator.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/extraction/ComponentTypeEvaluator.java
 
b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/extraction/ComponentTypeEvaluator.java
new file mode 100644
index 0000000..5455055
--- /dev/null
+++ 
b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/extraction/ComponentTypeEvaluator.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.pql.evaluation.extraction;
+
+import org.apache.nifi.pql.evaluation.OperandEvaluator;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+
+/**
+ * Operand evaluator that reads the component type from a provenance event record.
+ */
+public class ComponentTypeEvaluator implements OperandEvaluator<String> {
+
+    /** Reports that this evaluator produces String values. */
+    @Override
+    public Class<String> getType() {
+        return String.class;
+    }
+
+    /** Identifies this evaluator by the parser's COMPONENT_TYPE token type. */
+    @Override
+    public int getEvaluatorType() {
+        final int tokenType = org.apache.nifi.pql.ProvenanceQueryParser.COMPONENT_TYPE;
+        return tokenType;
+    }
+
+    /** Returns whatever the given record reports as its component type. */
+    @Override
+    public String evaluate(final ProvenanceEventRecord record) {
+        final String componentType = record.getComponentType();
+        return componentType;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/df43c8b6/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/function/TimeFieldEvaluator.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/function/TimeFieldEvaluator.java
 
b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/function/TimeFieldEvaluator.java
index 44450c9..373fc72 100644
--- 
a/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/function/TimeFieldEvaluator.java
+++ 
b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/function/TimeFieldEvaluator.java
@@ -18,9 +18,13 @@ public class TimeFieldEvaluator implements 
OperandEvaluator<Long> {
                this.timeExtractor = timeExtractor;
                this.evaluatorType = evaluatorType;
                
+               // note the case statements below are designed to fall through (there are no breaks).
+               // I.e., if time field is YEAR, we want to clear all of the fields starting with month.
                switch (timeField) {
                        case Calendar.YEAR:
                                fieldsToClear.add(Calendar.MONTH);
+                       case Calendar.MONTH:
+                           fieldsToClear.add(Calendar.DAY_OF_MONTH);
                        case Calendar.DAY_OF_MONTH:
                                fieldsToClear.add(Calendar.HOUR);
                        case Calendar.HOUR:

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/df43c8b6/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/groups/Group.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/groups/Group.java
 
b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/groups/Group.java
index a3e87c1..35ee93b 100644
--- 
a/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/groups/Group.java
+++ 
b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/groups/Group.java
@@ -19,7 +19,7 @@ public class Group {
                int prime = 23497;
                int hc = 1;
                for ( final Object o : values ) {
-                       hc = prime * hc + o.hashCode();
+                       hc = prime * hc + (o == null ? 0 : o.hashCode());
                }
                
                this.hashCode = hc;

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/df43c8b6/nifi/nifi-commons/nifi-provenance-query-language/src/test/java/org/apache/nifi/pql/TestQuery.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-commons/nifi-provenance-query-language/src/test/java/org/apache/nifi/pql/TestQuery.java
 
b/nifi/nifi-commons/nifi-provenance-query-language/src/test/java/org/apache/nifi/pql/TestQuery.java
index b6012db..3317c21 100644
--- 
a/nifi/nifi-commons/nifi-provenance-query-language/src/test/java/org/apache/nifi/pql/TestQuery.java
+++ 
b/nifi/nifi-commons/nifi-provenance-query-language/src/test/java/org/apache/nifi/pql/TestQuery.java
@@ -92,13 +92,13 @@ public class TestQuery {
        
        @Test
        public void testCompilationManually() {
-               System.out.println(ProvenanceQuery.compile("SELECT R.TransitUri 
FROM *"));
-               System.out.println(ProvenanceQuery.compile("SELECT 
R['filename'] FROM RECEIVE, SEND;"));
-               System.out.println(ProvenanceQuery.compile("SELECT Event FROM 
RECEIVE ORDER BY Event['filename'];"));
+               System.out.println(ProvenanceQuery.compile("SELECT R.TransitUri 
FROM *", null, null));
+               System.out.println(ProvenanceQuery.compile("SELECT 
R['filename'] FROM RECEIVE, SEND;", null, null));
+               System.out.println(ProvenanceQuery.compile("SELECT Event FROM 
RECEIVE ORDER BY Event['filename'];", null, null));
                
 //             System.out.println(Query.compile("SELECT Event FROM RECEIVE 
WHERE ((Event.TransitUri <> 'http') OR (Event['filename'] = '1.txt')) and 
(Event.Size > 1000 or Event.Size between 1 AND 4);"));
                
-               System.out.println(ProvenanceQuery.compile("SELECT 
SUM(Event.size) FROM RECEIVE"));
+               System.out.println(ProvenanceQuery.compile("SELECT 
SUM(Event.size) FROM RECEIVE", null, null));
        }
        
        
@@ -107,7 +107,7 @@ public class TestQuery {
                createRecords();
                dump(ProvenanceQuery.execute("SELECT Event", repo));
                
-               final ProvenanceQuery query = ProvenanceQuery.compile("SELECT 
SUM(Event.Size), AVG(Event.Size) FROM RECEIVE WHERE Event.TransitUri = 
'https://localhost:80/nifi'");
+               final ProvenanceQuery query = ProvenanceQuery.compile("SELECT 
SUM(Event.Size), AVG(Event.Size) FROM RECEIVE WHERE Event.TransitUri = 
'https://localhost:80/nifi'", null, null);
                
                final ProvenanceResultSet rs = query.execute(repo);
                dump(rs);
@@ -302,7 +302,7 @@ public class TestQuery {
                                + ")";
                System.out.println(queryString);
                
-               final ProvenanceQuery query = 
ProvenanceQuery.compile(queryString);
+               final ProvenanceQuery query = 
ProvenanceQuery.compile(queryString, null, null);
                
                System.out.println(query.getWhereClause());
                

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/df43c8b6/nifi/nifi-commons/pom.xml
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/pom.xml b/nifi/nifi-commons/pom.xml
index ec0bb62..35d64fe 100644
--- a/nifi/nifi-commons/pom.xml
+++ b/nifi/nifi-commons/pom.xml
@@ -35,5 +35,6 @@
         <module>nifi-web-utils</module>
         <module>nifi-processor-utilities</module>
         <module>nifi-write-ahead-log</module>
+               <module>nifi-provenance-query-language</module>
     </modules>
 </project>

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/df43c8b6/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/JournalingProvenanceRepository.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/JournalingProvenanceRepository.java
 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/JournalingProvenanceRepository.java
index 2643787..fc197e8 100644
--- 
a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/JournalingProvenanceRepository.java
+++ 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/JournalingProvenanceRepository.java
@@ -518,6 +518,7 @@ public class JournalingProvenanceRepository implements 
ProvenanceEventRepository
         return queryManager.retrieveProvenanceQuerySubmission(queryIdentifier);
     }
     
+    @Override
     public ProvenanceQuerySubmission submitQuery(final String query) {
         ProvenanceQuerySubmission submission;
         final AtomicLong lastTimeProgressMade = new 
AtomicLong(System.nanoTime());
@@ -525,7 +526,7 @@ public class JournalingProvenanceRepository implements 
ProvenanceEventRepository
         
         try {
             final ProgressAwareIterator<? extends StoredProvenanceEvent> 
eventItr = selectMatchingEvents(query, lastTimeProgressMade);
-            final ProvenanceResultSet rs = 
ProvenanceQuery.compile(query).evaluate(eventItr);
+            final ProvenanceResultSet rs = ProvenanceQuery.compile(query, 
getSearchableFields(), getSearchableAttributes()).evaluate(eventItr);
             
             submission = new JournalingRepoQuerySubmission(query, new 
ProvenanceQueryResult() {
                 @Override

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/df43c8b6/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/LuceneIndexSearcher.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/LuceneIndexSearcher.java
 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/LuceneIndexSearcher.java
index cd57991..3fb641e 100644
--- 
a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/LuceneIndexSearcher.java
+++ 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/LuceneIndexSearcher.java
@@ -197,7 +197,7 @@ public class LuceneIndexSearcher implements 
EventIndexSearcher {
     
     
     private <T> Iterator<T> select(final String query, final 
DocumentTransformer<T> transformer) throws IOException {
-        final org.apache.lucene.search.Query luceneQuery = 
LuceneTranslator.toLuceneQuery(ProvenanceQuery.compile(query).getWhereClause());
+        final org.apache.lucene.search.Query luceneQuery = 
LuceneTranslator.toLuceneQuery(ProvenanceQuery.compile(query, 
repo.getSearchableFields(), repo.getSearchableAttributes()).getWhereClause());
         final int batchSize = 1000;
         
         final ObjectHolder<TopDocs> topDocsHolder = new ObjectHolder<>(null);

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/df43c8b6/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/test/java/org/apache/nifi/provenance/journaling/TestJournalingProvenanceRepository.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/test/java/org/apache/nifi/provenance/journaling/TestJournalingProvenanceRepository.java
 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/test/java/org/apache/nifi/provenance/journaling/TestJournalingProvenanceRepository.java
index e51f5b7..4ddfba5 100644
--- 
a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/test/java/org/apache/nifi/provenance/journaling/TestJournalingProvenanceRepository.java
+++ 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/test/java/org/apache/nifi/provenance/journaling/TestJournalingProvenanceRepository.java
@@ -352,7 +352,8 @@ public class TestJournalingProvenanceRepository {
         
         config.setPartitionCount(1);
         config.setSearchableFields(Arrays.asList(new SearchableField[] {
-                SearchableFields.FlowFileUUID
+                SearchableFields.FlowFileUUID,
+                SearchableFields.EventTime
         }));
         
         try (final JournalingProvenanceRepository repo = new 
JournalingProvenanceRepository(config)) {
@@ -596,7 +597,8 @@ public class TestJournalingProvenanceRepository {
         
         config.setPartitionCount(3);
         config.setSearchableFields(Arrays.asList(new SearchableField[] {
-                SearchableFields.FlowFileUUID
+                SearchableFields.FlowFileUUID,
+                SearchableFields.FileSize
         }));
         
         try (final JournalingProvenanceRepository repo = new 
JournalingProvenanceRepository(config)) {
@@ -640,6 +642,72 @@ public class TestJournalingProvenanceRepository {
     }
     
     
+    
+    /**
+     * Registers 10,000 events (100 values of attribute "j" times 100 values of
+     * attribute "i") in batches of 100 against a single-partition journaling
+     * repository, runs a GROUP BY aggregate query over them, and prints the
+     * result labels, every result row and the elapsed register/query times.
+     *
+     * NOTE(review): nothing is asserted about the rows that come back; the
+     * test only fails when an exception is thrown. No timeout is set on the
+     * annotation either.
+     */
+    @Test()
+    public void testAggregateQuery2AgainstMany() throws IOException, 
InterruptedException {
+        final JournalingRepositoryConfig config = new 
JournalingRepositoryConfig();
+        config.setEventExpiration(10, TimeUnit.MINUTES);
+        config.setJournalRolloverPeriod(10, TimeUnit.MINUTES);
+        config.setJournalCapacity(1024 * 1024 * 1024);
+        
+        // The only container is a randomly named directory under target/.
+        final Map<String, File> containers = new HashMap<>();
+        containers.put("container1", new File("target/" + 
UUID.randomUUID().toString()));
+        config.setContainers(containers);
+        
+        config.setPartitionCount(1);
+        config.setSearchableFields(Arrays.asList(new SearchableField[] {
+                SearchableFields.FlowFileUUID
+        }));
+        // Attributes "i" and "j" are the ones referenced by the query below.
+        config.setSearchableAttributes(Arrays.asList(new SearchableField[] {
+                SearchableFields.newSearchableAttribute("i"),
+                SearchableFields.newSearchableAttribute("j")
+        }));
+        
+        try (final JournalingProvenanceRepository repo = new 
JournalingProvenanceRepository(config)) {
+            repo.initialize(null);
+            
+            // NOTE(review): this one map instance is mutated and handed to every
+            // generated event; presumably TestUtil.generateEvent copies it - verify.
+            final Map<String, String> attributes = new HashMap<>();
+            
+            final long start = System.nanoTime();
+            // The list is flushed at 100 entries, so it never approaches the
+            // initial capacity of 1000.
+            final List<ProvenanceEventRecord> events = new ArrayList<>(1000);
+            for (int j=0; j < 100; j++) {
+                attributes.put("j", String.valueOf(j));
+                for (int i=0; i < 100; i++) {
+                    attributes.put("i", String.valueOf(i));
+                    final ProvenanceEventRecord event = 
TestUtil.generateEvent(i, attributes);
+                    events.add(event);
+                    // Hand the batch to the repository every 100 events.
+                    if ( events.size() % 100 == 0 ) {
+                        repo.registerEvents(events);
+                        events.clear();
+                    }
+                }
+            }
+            final long registerFinish = System.nanoTime();
+            
+            final ProvenanceResultSet rs = repo.query("SELECT Event['j'], 
COUNT(Event['i']), SUM(Event['i']) GROUP BY Event['j']");
+            System.out.println(rs.getLabels());
+            while ( rs.hasNext() ) {
+                final List<?> cols = rs.next();
+                System.out.println(cols);
+            }
+            
+            final long searchFinish = System.nanoTime();
+            System.out.println("Register records: " + 
TimeUnit.NANOSECONDS.toMillis(registerFinish - start) + " millis");
+            System.out.println("Query records: " + 
TimeUnit.NANOSECONDS.toMillis(searchFinish - registerFinish) + " millis");
+            
+        } catch (final Exception e) {
+            // Any exception is printed and turned into a test failure.
+            e.printStackTrace();
+            Assert.fail(e.toString());
+        } finally {
+            // Clean up the container directories whether or not the test passed
+            // (second argument presumably requests recursive deletion - confirm).
+            for ( final File file : containers.values() ) {
+                FileUtils.deleteFile(file, true);
+            }
+        }
+    }
+    
+    
+    
+    
     @Test(timeout=10000)
     public void testReceiveDropLineage() throws IOException, 
InterruptedException {
         final JournalingRepositoryConfig config = new 
JournalingRepositoryConfig();

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/df43c8b6/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
index 716c8b7..fef120d 100644
--- 
a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
+++ 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
@@ -71,6 +71,7 @@ import org.apache.nifi.provenance.lucene.IndexSearch;
 import org.apache.nifi.provenance.lucene.IndexingAction;
 import org.apache.nifi.provenance.lucene.LineageQuery;
 import org.apache.nifi.provenance.lucene.LuceneUtil;
+import org.apache.nifi.provenance.query.ProvenanceQuerySubmission;
 import org.apache.nifi.provenance.rollover.CompressionAction;
 import org.apache.nifi.provenance.rollover.RolloverAction;
 import org.apache.nifi.provenance.search.Query;
@@ -1927,4 +1928,14 @@ public class PersistentProvenanceRepository implements 
ProvenanceEventRepository
         
         return firstEvents.get(0).getEventTime();
     }
+
+    /**
+     * Not supported by this repository implementation.
+     *
+     * @param query the query text (ignored)
+     * @throws UnsupportedOperationException always
+     */
+    @Override
+    public ProvenanceQuerySubmission submitQuery(final String query) {
+        throw new UnsupportedOperationException();
+    }
+
+    /**
+     * Not supported by this repository implementation.
+     *
+     * @param queryIdentifier the identifier of a submission (ignored)
+     * @throws UnsupportedOperationException always
+     */
+    @Override
+    public ProvenanceQuerySubmission retrieveProvenanceQuerySubmission(final 
String queryIdentifier) {
+        throw new UnsupportedOperationException();
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/df43c8b6/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-volatile-provenance-repository/pom.xml
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-volatile-provenance-repository/pom.xml
 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-volatile-provenance-repository/pom.xml
index d95e29f..70cc5b4 100644
--- 
a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-volatile-provenance-repository/pom.xml
+++ 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-volatile-provenance-repository/pom.xml
@@ -30,6 +30,13 @@
             <groupId>org.apache.nifi</groupId>
             <artifactId>nifi-data-provenance-utils</artifactId>
         </dependency>
+        <!-- 
+        <dependency>
+               <groupId>org.apache.nifi</groupId>
+               <artifactId>nifi-provenance-query-language</artifactId>
+               <version>0.0.2-incubating-SNAPSHOT</version>
+        </dependency>
+        -->
         <dependency>
             <groupId>org.apache.nifi</groupId>
             <artifactId>nifi-utils</artifactId>

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/df43c8b6/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-volatile-provenance-repository/src/main/java/org/apache/nifi/provenance/VolatileProvenanceRepository.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-volatile-provenance-repository/src/main/java/org/apache/nifi/provenance/VolatileProvenanceRepository.java
 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-volatile-provenance-repository/src/main/java/org/apache/nifi/provenance/VolatileProvenanceRepository.java
index a20ca75..bd084bc 100644
--- 
a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-volatile-provenance-repository/src/main/java/org/apache/nifi/provenance/VolatileProvenanceRepository.java
+++ 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-volatile-provenance-repository/src/main/java/org/apache/nifi/provenance/VolatileProvenanceRepository.java
@@ -43,6 +43,7 @@ import 
org.apache.nifi.provenance.lineage.ComputeLineageSubmission;
 import org.apache.nifi.provenance.lineage.FlowFileLineage;
 import org.apache.nifi.provenance.lineage.Lineage;
 import org.apache.nifi.provenance.lineage.LineageComputationType;
+import org.apache.nifi.provenance.query.ProvenanceQuerySubmission;
 import org.apache.nifi.provenance.search.Query;
 import org.apache.nifi.provenance.search.QueryResult;
 import org.apache.nifi.provenance.search.QuerySubmission;
@@ -56,7 +57,8 @@ import org.apache.nifi.util.RingBuffer.ForEachEvaluator;
 import org.apache.nifi.util.RingBuffer.IterationDirection;
 
 public class VolatileProvenanceRepository implements ProvenanceEventRepository 
{
-
+    private static final int MAX_CONCURRENT_QUERIES = 10;
+    
     // properties
     public static final String BUFFER_SIZE = 
"nifi.provenance.repository.buffer.size";
 
@@ -71,6 +73,7 @@ public class VolatileProvenanceRepository implements 
ProvenanceEventRepository {
 
     private final ConcurrentMap<String, AsyncQuerySubmission> 
querySubmissionMap = new ConcurrentHashMap<>();
     private final ConcurrentMap<String, AsyncLineageSubmission> 
lineageSubmissionMap = new ConcurrentHashMap<>();
+    private final ConcurrentMap<String, ProvenanceQuerySubmission> 
provQuerySubmissionMap = new ConcurrentHashMap<>();
     private final AtomicLong idGenerator = new AtomicLong(0L);
     private final AtomicBoolean initialized = new AtomicBoolean(false);
 
@@ -613,4 +616,109 @@ public class VolatileProvenanceRepository implements 
ProvenanceEventRepository {
         }
     }
 
+    
+    
+    
+    /**
+     * Currently unsupported: the only live statement throws
+     * {@link UnsupportedOperationException}. A draft implementation is kept
+     * commented out in the body. As drafted it (1) prunes entries from
+     * provQuerySubmissionMap once the map holds more than
+     * MAX_CONCURRENT_QUERIES submissions and throws IllegalStateException if
+     * it is still over the limit, (2) compiles the query and evaluates it over
+     * the ring buffer's events, and (3) stores an already-finished submission
+     * under a random UUID.
+     *
+     * NOTE(review): before re-enabling the draft, check these two points:
+     *  - the pruning loop removes entries whose expiration is after(now),
+     *    i.e. the ones that have NOT expired yet; this looks inverted
+     *    (before(now) was presumably intended).
+     *  - the expiration is computed as System.currentTimeMillis() +
+     *    TimeUnit.MINUTES.toNanos(10), which adds nanoseconds to a
+     *    millisecond timestamp; toMillis(10) was presumably intended.
+     * The draft is presumably disabled because the
+     * nifi-provenance-query-language dependency is commented out in this
+     * module's pom.xml - confirm.
+     *
+     * @param query the query text
+     * @throws UnsupportedOperationException always, for now
+     */
+    @Override
+    public ProvenanceQuerySubmission submitQuery(final String query) {
+        throw new UnsupportedOperationException();
+        /*
+        if ( provQuerySubmissionMap.size() > MAX_CONCURRENT_QUERIES ) {
+            final List<String> toRemove = new ArrayList<>();
+            final Date now = new Date();
+            
+            for ( final Map.Entry<String, ProvenanceQuerySubmission> entry : 
provQuerySubmissionMap.entrySet() ) {
+                if ( entry.getValue().getResult().getExpiration().after(now) ) 
{
+                    toRemove.add(entry.getKey());
+                }
+            }
+            
+            for ( final String id : toRemove ) {
+                provQuerySubmissionMap.remove(id);
+            }
+            
+            if ( provQuerySubmissionMap.size() > MAX_CONCURRENT_QUERIES ) {
+                throw new IllegalStateException("There are already " + 
MAX_CONCURRENT_QUERIES + " outstanding queries for this Provenance Repository; 
cannot perform any more queries until the existing queries are expired or 
canceled");
+            }
+        }
+        
+        final Iterator<StoredProvenanceEvent> eventItr = 
ringBuffer.asList().iterator();
+        final ProvenanceResultSet rs = ProvenanceQuery.compile(query, 
getSearchableFields(), getSearchableAttributes()).evaluate(eventItr);
+        
+        final Date submissionTime = new Date();
+        final String queryId = UUID.randomUUID().toString();
+        final Date expiration = new Date(System.currentTimeMillis() + 
TimeUnit.MINUTES.toNanos(10));
+        final ProvenanceQuerySubmission submission = new 
ProvenanceQuerySubmission() {
+            private final AtomicBoolean canceled = new AtomicBoolean(false);
+            
+            @Override
+            public String getQuery() {
+                return query;
+            }
+
+            @Override
+            public ProvenanceQueryResult getResult() {
+                return new ProvenanceQueryResult() {
+                    @Override
+                    public ProvenanceResultSet getResultSet() {
+                        return rs;
+                    }
+
+                    @Override
+                    public Date getExpiration() {
+                        return expiration;
+                    }
+
+                    @Override
+                    public String getError() {
+                        return null;
+                    }
+
+                    @Override
+                    public int getPercentComplete() {
+                        return 100;
+                    }
+
+                    @Override
+                    public boolean isFinished() {
+                        return true;
+                    }
+                };
+            }
+
+            @Override
+            public Date getSubmissionTime() {
+                return submissionTime;
+            }
+
+            @Override
+            public String getQueryIdentifier() {
+                return queryId;
+            }
+
+            @Override
+            public void cancel() {
+                canceled.set(true);
+                provQuerySubmissionMap.remove(queryId);
+            }
+
+            @Override
+            public boolean isCanceled() {
+                return canceled.get();
+            }
+        };
+        
+        provQuerySubmissionMap.putIfAbsent(queryId, submission);
+        return submission;
+        */
+    }
+
+    
+    /**
+     * Currently unsupported: always throws
+     * {@link UnsupportedOperationException}. The commented-out draft would
+     * look the submission up in provQuerySubmissionMap by its identifier.
+     *
+     * @param queryIdentifier the identifier of a previously submitted query
+     * @throws UnsupportedOperationException always, for now
+     */
+    @Override
+    public ProvenanceQuerySubmission retrieveProvenanceQuerySubmission(final 
String queryIdentifier) {
+        throw new UnsupportedOperationException();
+        /*
+        return provQuerySubmissionMap.get(queryIdentifier);
+        */
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/df43c8b6/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-reporting-tasks/pom.xml
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-reporting-tasks/pom.xml
 
b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-reporting-tasks/pom.xml
index 5f6a1ba..debf202 100644
--- 
a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-reporting-tasks/pom.xml
+++ 
b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-reporting-tasks/pom.xml
@@ -39,6 +39,11 @@
             <artifactId>nifi-processor-utils</artifactId>
         </dependency>
         <dependency>
+               <groupId>org.apache.nifi</groupId>
+               <artifactId>nifi-provenance-query-language</artifactId>
+               <version>0.0.2-incubating-SNAPSHOT</version>
+        </dependency>
+        <dependency>
             <groupId>com.yammer.metrics</groupId>
             <artifactId>metrics-ganglia</artifactId>
         </dependency>

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/df43c8b6/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-reporting-tasks/src/main/java/org/apache/nifi/provenance/GenerateProvenanceReport.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-reporting-tasks/src/main/java/org/apache/nifi/provenance/GenerateProvenanceReport.java
 
b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-reporting-tasks/src/main/java/org/apache/nifi/provenance/GenerateProvenanceReport.java
new file mode 100644
index 0000000..6352539
--- /dev/null
+++ 
b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-reporting-tasks/src/main/java/org/apache/nifi/provenance/GenerateProvenanceReport.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.provenance;
+
+import java.io.IOException;
+
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.Validator;
+import org.apache.nifi.pql.ProvenanceQuery;
+import org.apache.nifi.pql.exception.ProvenanceQueryLanguageParsingException;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.provenance.query.ProvenanceResultSet;
+import org.apache.nifi.reporting.AbstractReportingTask;
+import org.apache.nifi.reporting.ReportingContext;
+
+/**
+ * Reporting task that compiles a configured provenance query when it is
+ * scheduled and executes it against the provenance repository each time it is
+ * triggered.
+ *
+ * NOTE(review): as written the class is incomplete. The result set obtained
+ * in onTrigger is never read, DESTINATION_FILE is never consulted, and no
+ * method in this class exposes QUERY or DESTINATION_FILE as supported
+ * properties (presumably the base class offers a hook for that - confirm).
+ */
+public class GenerateProvenanceReport extends AbstractReportingTask {
+
+    // Required query text; checked at configuration time by
+    // ProvenanceQueryLanguageValidator below.
+    public static final PropertyDescriptor QUERY = new 
PropertyDescriptor.Builder()
+        .name("Provenance Query")
+        .description("The Provenance Query to run against the repository")
+        .required(true)
+        .expressionLanguageSupported(false)
+        .addValidator(new ProvenanceQueryLanguageValidator())
+        .build();
+    // Optional output location. NOTE(review): not read anywhere in this class yet.
+    public static final PropertyDescriptor DESTINATION_FILE = new 
PropertyDescriptor.Builder()
+        .name("Destination File")
+        .description("The file to write the results to. If not specified, the 
results will be written to the log")
+        .required(false)
+        .expressionLanguageSupported(false)
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        .build();
+    
+    
+    // Compiled form of the QUERY property; assigned in compileQuery.
+    private ProvenanceQuery query;
+    
+    /**
+     * Compiles the text of the QUERY property, passing the provenance
+     * repository's searchable fields and searchable attributes to the
+     * compiler, and stores the result in the query field.
+     *
+     * @param context the reporting context supplying the property value and
+     *                access to the provenance repository
+     */
+    @OnScheduled
+    public synchronized void compileQuery(final ReportingContext context) {
+        final String queryText = context.getProperty(QUERY).getValue();
+        this.query = ProvenanceQuery.compile(queryText, 
context.getEventAccess().getProvenanceRepository().getSearchableFields(), 
+                
context.getEventAccess().getProvenanceRepository().getSearchableAttributes());
+    }
+    
+    /**
+     * Executes the compiled query against the provenance repository.
+     *
+     * NOTE(review): the result set is assigned to a local and then dropped;
+     * nothing is written to a file or to the log. The query field is
+     * dereferenced without a null check - presumably the framework always
+     * runs the OnScheduled method first; confirm.
+     *
+     * @param context the reporting context giving access to the repository
+     * @throws ProcessException wrapping any IOException raised by execution
+     */
+    @Override
+    public synchronized void onTrigger(final ReportingContext context) {
+        try {
+            final ProvenanceResultSet rs = 
query.execute(context.getEventAccess().getProvenanceRepository());
+            
+        } catch (final IOException ioe) {
+            throw new ProcessException(ioe);
+        }
+    }
+
+    /**
+     * Validates a property value by attempting to compile it as a provenance
+     * query, with null passed for both the searchable fields and the
+     * searchable attributes. Only ProvenanceQueryLanguageParsingException is
+     * reported as an invalid result; any other exception thrown by compile
+     * propagates to the caller.
+     */
+    private static class ProvenanceQueryLanguageValidator implements Validator 
{
+        @Override
+        public ValidationResult validate(final String subject, final String 
input, final ValidationContext context) {
+            try {
+                ProvenanceQuery.compile(input, null, null);
+            } catch (final ProvenanceQueryLanguageParsingException e) {
+                return new 
ValidationResult.Builder().input(input).subject(subject).valid(false).explanation(e.getMessage()).build();
+            }
+            
+            return new 
ValidationResult.Builder().input(input).subject(subject).valid(true).build();
+        }
+    }
+}

Reply via email to