began implementation of provenance query language

Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/b512ff12
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/b512ff12
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/b512ff12

Branch: refs/heads/prov-query-language
Commit: b512ff1277262c12163011c1c64f8a7fa6ccf76d
Parents: a2219eb
Author: Mark Payne <[email protected]>
Authored: Wed Mar 4 15:00:13 2015 -0500
Committer: Mark Payne <[email protected]>
Committed: Wed Mar 4 15:00:13 2015 -0500

----------------------------------------------------------------------
 .../provenance/ProvenanceEventRepository.java   |   4 +-
 .../provenance/query/ProvenanceQueryResult.java |  60 ++
 .../query/ProvenanceQuerySubmission.java        |  64 +++
 .../provenance/query/ProvenanceResultSet.java   |  36 ++
 .../nifi-provenance-query-language/.gitignore   |   1 +
 .../nifi-provenance-query-language/pom.xml      |  65 +++
 .../org/apache/nifi/pql/ProvenanceQueryLexer.g  | 193 +++++++
 .../org/apache/nifi/pql/ProvenanceQueryParser.g | 141 +++++
 .../java/org/apache/nifi/pql/Formatter.java     |   5 +
 .../org/apache/nifi/pql/LuceneTranslator.java   | 189 ++++++
 .../org/apache/nifi/pql/ProvenanceQuery.java    | 574 +++++++++++++++++++
 .../apache/nifi/pql/evaluation/Accumulator.java |  28 +
 .../nifi/pql/evaluation/BooleanEvaluator.java   |   5 +
 .../nifi/pql/evaluation/OperandEvaluator.java   |   7 +
 .../nifi/pql/evaluation/RecordEvaluator.java    |  11 +
 .../pql/evaluation/RepositoryEvaluator.java     |  13 +
 .../accumulation/AverageAccumulator.java        | 112 ++++
 .../accumulation/CountAccumulator.java          |  91 +++
 .../accumulation/EventAccumulator.java          |  98 ++++
 .../evaluation/accumulation/SumAccumulator.java |  91 +++
 .../evaluation/comparison/ConversionUtils.java  |  97 ++++
 .../evaluation/comparison/EqualsEvaluator.java  |  74 +++
 .../comparison/GreaterThanEvaluator.java        |  51 ++
 .../comparison/LessThanEvaluator.java           |  51 ++
 .../evaluation/comparison/MatchesEvaluator.java |  72 +++
 .../comparison/RecordTypeEvaluator.java         |  34 ++
 .../comparison/StartsWithEvaluator.java         |  57 ++
 .../conversion/DateToLongEvaluator.java         |  27 +
 .../extraction/AttributeEvaluator.java          |  42 ++
 .../extraction/ComponentIdEvaluator.java        |  23 +
 .../extraction/RelationshipEvaluator.java       |  23 +
 .../evaluation/extraction/SizeEvaluator.java    |  23 +
 .../extraction/TimestampEvaluator.java          |  26 +
 .../extraction/TransitUriEvaluator.java         |  23 +
 .../evaluation/extraction/TypeEvaluator.java    |  24 +
 .../evaluation/extraction/UuidEvaluator.java    |  39 ++
 .../evaluation/function/TimeFieldEvaluator.java |  60 ++
 .../literals/LongLiteralEvaluator.java          |  27 +
 .../literals/StringLiteralEvaluator.java        |  33 ++
 .../nifi/pql/evaluation/logic/AndEvaluator.java | 114 ++++
 .../nifi/pql/evaluation/logic/OrEvaluator.java  |  58 ++
 .../nifi/pql/evaluation/order/CellValue.java    |  69 +++
 .../nifi/pql/evaluation/order/FieldSorter.java  |  87 +++
 .../pql/evaluation/order/GroupedSorter.java     | 123 ++++
 .../nifi/pql/evaluation/order/RowSorter.java    |  13 +
 .../pql/evaluation/order/SortDirection.java     |   6 +
 .../nifi/pql/evaluation/order/Sorters.java      |  74 +++
 .../evaluation/repository/SelectAllRecords.java |  61 ++
 .../ProvenanceQueryLanguageException.java       |  23 +
 ...ProvenanceQueryLanguageParsingException.java |  23 +
 .../java/org/apache/nifi/pql/groups/Group.java  |  57 ++
 .../org/apache/nifi/pql/groups/Grouper.java     |  20 +
 .../nifi/pql/results/GroupingResultSet.java     | 161 ++++++
 .../nifi/pql/results/OrderedResultSet.java      |  50 ++
 .../org/apache/nifi/pql/results/ResultRow.java  |  20 +
 .../apache/nifi/pql/results/RowIterator.java    |  69 +++
 .../pql/results/StandardOrderedResultSet.java   | 111 ++++
 .../pql/results/StandardUnorderedResultSet.java | 104 ++++
 .../src/main/resources/docs/examples.pql        |  48 ++
 .../resources/docs/implementation-notes.txt     |  52 ++
 .../java/org/apache/nifi/pql/TestQuery.java     | 348 +++++++++++
 .../src/test/resources/nifi.properties          | 136 +++++
 .../pom.xml                                     |   9 +
 .../JournalingProvenanceRepository.java         | 161 +++++-
 .../JournalingRepoQuerySubmission.java          |  80 +++
 .../LazyInitializedProvenanceEvent.java         | 367 ++++++++++++
 .../journaling/ProgressAwareIterator.java       |  25 +
 .../exception/EventNotFoundException.java       |  29 +
 .../journaling/index/EventIndexSearcher.java    |  20 +
 .../journaling/index/IndexManager.java          |  18 +
 .../journaling/index/LuceneIndexManager.java    |  47 +-
 .../journaling/index/LuceneIndexSearcher.java   | 128 ++++-
 .../journaling/index/LuceneIndexWriter.java     |  51 +-
 .../journaling/index/MultiIndexSearcher.java    |  28 +
 .../provenance/journaling/index/QueryUtils.java |  13 +
 .../journaling/query/QueryManager.java          |  14 +
 .../journaling/query/StandardQueryManager.java  |  39 +-
 .../TestJournalingProvenanceRepository.java     | 307 +++++++++-
 .../nifi/provenance/journaling/TestUtil.java    |   2 +-
 .../journaling/index/TestEventIndexWriter.java  |   2 +-
 80 files changed, 5710 insertions(+), 51 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b512ff12/nifi/nifi-api/src/main/java/org/apache/nifi/provenance/ProvenanceEventRepository.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-api/src/main/java/org/apache/nifi/provenance/ProvenanceEventRepository.java
 
b/nifi/nifi-api/src/main/java/org/apache/nifi/provenance/ProvenanceEventRepository.java
index 8c7a044..18e5788 100644
--- 
a/nifi/nifi-api/src/main/java/org/apache/nifi/provenance/ProvenanceEventRepository.java
+++ 
b/nifi/nifi-api/src/main/java/org/apache/nifi/provenance/ProvenanceEventRepository.java
@@ -190,7 +190,7 @@ public interface ProvenanceEventRepository extends 
Closeable {
 
     /**
      * Returns a list of all fields that can be searched via the
-     * {@link #submitQuery(nifi.provenance.search.Query)} method
+     * {@link #submitQuery(org.apache.nifi.provenance.search.Query)} method
      *
      * @return
      */
@@ -198,7 +198,7 @@ public interface ProvenanceEventRepository extends 
Closeable {
 
     /**
      * Returns a list of all FlowFile attributes that can be searched via the
-     * {@link #submitQuery(nifi.provenance.search.Query)} method
+     * {@link #submitQuery(org.apache.nifi.provenance.search.Query)} method
      *
      * @return
      */

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b512ff12/nifi/nifi-api/src/main/java/org/apache/nifi/provenance/query/ProvenanceQueryResult.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-api/src/main/java/org/apache/nifi/provenance/query/ProvenanceQueryResult.java
 
b/nifi/nifi-api/src/main/java/org/apache/nifi/provenance/query/ProvenanceQueryResult.java
new file mode 100644
index 0000000..cbe2705
--- /dev/null
+++ 
b/nifi/nifi-api/src/main/java/org/apache/nifi/provenance/query/ProvenanceQueryResult.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.provenance.query;
+
+import java.util.Date;
+
+public interface ProvenanceQueryResult {
+    /**
+     * Returns the Provenance events that match the query (up to the limit
+     * specified in the query)
+     *
+     * @return the result set holding the rows that matched the query
+     */
+    ProvenanceResultSet getResultSet();
+
+
+    /**
+     * Returns the date at which this ProvenanceQueryResult will expire
+     *
+     * @return the expiration date of this result
+     */
+    Date getExpiration();
+
+    /**
+     * If an error occurred while computing the query result, this will return
+     * the serialized error; otherwise, returns <code>null</code>.
+     *
+     * @return the serialized error, or <code>null</code> if no error occurred
+     */
+    String getError();
+
+    /**
+     * returns an integer between 0 and 100 (inclusive) that indicates what
+     * percentage of completion the query has reached
+     *
+     * @return the percentage of completion, from 0 to 100 (inclusive)
+     */
+    int getPercentComplete();
+
+    /**
+     * Indicates whether or not the query has finished running
+     *
+     * @return <code>true</code> if the query has finished, <code>false</code> otherwise
+     */
+    boolean isFinished();
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b512ff12/nifi/nifi-api/src/main/java/org/apache/nifi/provenance/query/ProvenanceQuerySubmission.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-api/src/main/java/org/apache/nifi/provenance/query/ProvenanceQuerySubmission.java
 
b/nifi/nifi-api/src/main/java/org/apache/nifi/provenance/query/ProvenanceQuerySubmission.java
new file mode 100644
index 0000000..e4a9edc
--- /dev/null
+++ 
b/nifi/nifi-api/src/main/java/org/apache/nifi/provenance/query/ProvenanceQuerySubmission.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.provenance.query;
+
+import java.util.Date;
+
+import org.apache.nifi.provenance.search.QueryResult;
+
+public interface ProvenanceQuerySubmission {
+
+    /**
+     * Returns the query that was submitted
+     * @return the submitted query string
+     */
+    String getQuery();
+    
+    /**
+     * Returns the {@link ProvenanceQueryResult} for this query. Note that the
+     * result is only a partial result if the result of calling
+     * {@link ProvenanceQueryResult#isFinished()} is <code>false</code>.
+     *
+     * @return the (possibly partial) result of this query
+     */
+    ProvenanceQueryResult getResult();
+
+    /**
+     * Returns the date at which this query was submitted
+     *
+     * @return the submission time of this query
+     */
+    Date getSubmissionTime();
+
+    /**
+     * Returns the generated identifier for this query result
+     *
+     * @return the generated identifier
+     */
+    String getQueryIdentifier();
+
+    /**
+     * Cancels the query
+     */
+    void cancel();
+
+    /**
+     * @return <code>true</code> if {@link #cancel()} has been called,
+     * <code>false</code> otherwise
+     */
+    boolean isCanceled();
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b512ff12/nifi/nifi-api/src/main/java/org/apache/nifi/provenance/query/ProvenanceResultSet.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-api/src/main/java/org/apache/nifi/provenance/query/ProvenanceResultSet.java
 
b/nifi/nifi-api/src/main/java/org/apache/nifi/provenance/query/ProvenanceResultSet.java
new file mode 100644
index 0000000..37c3d90
--- /dev/null
+++ 
b/nifi/nifi-api/src/main/java/org/apache/nifi/provenance/query/ProvenanceResultSet.java
@@ -0,0 +1,36 @@
+package org.apache.nifi.provenance.query;
+
+import java.util.List;
+
+import org.apache.nifi.provenance.ProvenanceEventRepository;
+
+
+/**
+ * Represents a set of results from issuing a query against a {@link 
ProvenanceEventRepository}.
+ */
+public interface ProvenanceResultSet {
+    /**
+     * Returns the labels for the columns (aka column headers)
+     * @return the column labels
+     */
+       List<String> getLabels();
+       
+       /**
+        * Returns the types of the columns returned for each row
+        * @return the column types
+        */
+       List<Class<?>> getReturnType();
+       
+       /**
+        * Indicates whether or not another result exists in the result set.
+        * @return <code>true</code> if another result exists, <code>false</code> otherwise
+        */
+       boolean hasNext();
+       
+       /**
+        * Returns the next result for this query
+        * @return the next result (presumably one value per column; confirm against implementations)
+        */
+       List<?> next();
+       
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b512ff12/nifi/nifi-commons/nifi-provenance-query-language/.gitignore
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-provenance-query-language/.gitignore 
b/nifi/nifi-commons/nifi-provenance-query-language/.gitignore
new file mode 100644
index 0000000..b83d222
--- /dev/null
+++ b/nifi/nifi-commons/nifi-provenance-query-language/.gitignore
@@ -0,0 +1 @@
+/target/

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b512ff12/nifi/nifi-commons/nifi-provenance-query-language/pom.xml
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-provenance-query-language/pom.xml 
b/nifi/nifi-commons/nifi-provenance-query-language/pom.xml
new file mode 100644
index 0000000..a13fd16
--- /dev/null
+++ b/nifi/nifi-commons/nifi-provenance-query-language/pom.xml
@@ -0,0 +1,65 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+       xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+       <modelVersion>4.0.0</modelVersion>
+
+       <parent>
+               <groupId>org.apache.nifi</groupId>
+               <artifactId>nifi-commons</artifactId>
+               <version>0.0.2-incubating-SNAPSHOT</version>
+       </parent>
+ 
+       <artifactId>nifi-provenance-query-language</artifactId>
+       <packaging>jar</packaging>
+
+       <build>
+               <plugins>
+                       <plugin>
+                               <groupId>org.antlr</groupId>
+                               <artifactId>antlr3-maven-plugin</artifactId>
+                               <version>3.4</version>
+                               <executions>
+                                       <execution>
+                                               <goals>
+                                                       <goal>antlr</goal>
+                                               </goals>
+                                       </execution>
+                               </executions>
+                       </plugin>
+               </plugins>
+       </build>
+
+       <dependencies>
+               <dependency>
+                       <groupId>org.apache.nifi</groupId>
+                       <artifactId>nifi-api</artifactId>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.antlr</groupId>
+                       <artifactId>antlr-runtime</artifactId>
+               </dependency>
+               
+               <dependency>
+                       <groupId>org.apache.lucene</groupId>
+                       <artifactId>lucene-core</artifactId>
+               </dependency>
+               
+               <dependency>
+                       <groupId>junit</groupId>
+                       <artifactId>junit</artifactId>
+                       <scope>test</scope>
+               </dependency>
+               
+               <dependency>
+                       <groupId>org.apache.nifi</groupId>
+                       
<artifactId>nifi-volatile-provenance-repository</artifactId>
+                       <version>0.0.2-incubating-SNAPSHOT</version>
+                       <scope>test</scope>
+               </dependency>
+               <dependency>
+                       <groupId>org.apache.nifi</groupId>
+                       <artifactId>nifi-properties</artifactId>
+                       <scope>test</scope>
+               </dependency>
+       </dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b512ff12/nifi/nifi-commons/nifi-provenance-query-language/src/main/antlr3/org/apache/nifi/pql/ProvenanceQueryLexer.g
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-commons/nifi-provenance-query-language/src/main/antlr3/org/apache/nifi/pql/ProvenanceQueryLexer.g
 
b/nifi/nifi-commons/nifi-provenance-query-language/src/main/antlr3/org/apache/nifi/pql/ProvenanceQueryLexer.g
new file mode 100644
index 0000000..0815475
--- /dev/null
+++ 
b/nifi/nifi-commons/nifi-provenance-query-language/src/main/antlr3/org/apache/nifi/pql/ProvenanceQueryLexer.g
@@ -0,0 +1,193 @@
+lexer grammar ProvenanceQueryLexer;
+
+@header {
+       package org.apache.nifi.pql;
+       import 
org.apache.nifi.pql.exception.ProvenanceQueryLanguageParsingException;
+}
+
+@rulecatch {
+  catch(final Exception e) {
+    throw new ProvenanceQueryLanguageParsingException(e);
+  }
+}
+
+@members {
+  public void displayRecognitionError(String[] tokenNames, 
RecognitionException e) {
+    final StringBuilder sb = new StringBuilder();
+    if ( e.token == null ) {
+       sb.append("Unrecognized token ");
+    } else {
+       sb.append("Unexpected token '").append(e.token.getText()).append("' ");
+    }
+    sb.append("at line ").append(e.line);
+    if ( e.approximateLineInfo ) {
+       sb.append(" (approximately)");
+    }
+    sb.append(", column ").append(e.charPositionInLine);
+    sb.append(". Query: ").append(e.input.toString());
+    
+    throw new ProvenanceQueryLanguageParsingException(sb.toString());
+  }
+
+  public void recover(RecognitionException e) {
+       final StringBuilder sb = new StringBuilder();
+    if ( e.token == null ) {
+       sb.append("Unrecognized token ");
+    } else {
+       sb.append("Unexpected token '").append(e.token.getText()).append("' ");
+    }
+    sb.append("at line ").append(e.line);
+    if ( e.approximateLineInfo ) {
+       sb.append(" (approximately)");
+    }
+    sb.append(", column ").append(e.charPositionInLine);
+    sb.append(". Query: ").append(e.input.toString());
+    
+    throw new ProvenanceQueryLanguageParsingException(sb.toString());
+  } 
+}
+
+
+// PUNCTUATION & SPECIAL CHARACTERS
+WHITESPACE : (' '|'\t'|'\n'|'\r')+ { $channel = HIDDEN; };
+COMMENT : '#' ( ~('\n') )* '\n' { $channel = HIDDEN; };
+
+LPAREN : '(';
+RPAREN : ')';
+LBRACKET : '[';
+RBRACKET : ']';
+COMMA  : ',';
+DOT            : '.';
+SEMICOLON : ';';
+ASTERISK : '*';
+EQUALS : '=';
+NOT_EQUALS : '!=' | '<>';
+GT : '>';
+LT : '<';
+GE : '>=';
+LE : '<=';
+PIPE : '|';
+NUMBER : ('0'..'9')+;
+
+
+// Keywords
+SELECT : 'SELECT' | 'select' | 'Select';
+AS : 'AS' | 'as' | 'As';
+FROM : 'FROM' | 'from' | 'From';
+WHERE : 'WHERE' | 'where' | 'Where';
+HAVING : 'HAVING' | 'having' | 'Having';
+ORDER_BY : 'ORDER BY' | 'order by' | 'Order By';
+ASC : 'ASC' | 'asc' | 'Asc';
+DESC : 'DESC' | 'desc' | 'Desc';
+GROUP_BY : 'GROUP BY' | 'group by' | 'Group By';
+EVENT : 'EVENT' | 'event' | 'Event';
+RELATIONSHIP : 'RELATIONSHIP' | 'relationship' | 'Relationship';
+
+
+// Operators
+WITHIN : 'WITHIN' | 'within' | 'Within';
+MATCHES : 'MATCHES' | 'matches' | 'Matches';
+CONTAINS : 'CONTAINS' | 'contains' | 'Contains';
+IS_NULL : 'IS NULL' | 'is null' | 'Is Null';
+NOT_NULL : 'NOT NULL' | 'not null' | 'Not Null';
+IN : 'IN' | 'in' | 'In';
+BETWEEN : 'BETWEEN' | 'between' | 'Between';
+AND : 'AND' | 'and' | 'And';
+OR : 'OR' | 'or' | 'Or';
+NOT : 'NOT' | 'not' | 'Not';
+LIMIT : 'LIMIT' | 'limit' | 'Limit';
+STARTS_WITH : 'STARTS WITH' | 'starts with' | 'Starts with' | 'Starts With';
+
+// Functions
+COUNT : 'COUNT' | 'count' | 'Count';
+SUM : 'SUM' | 'sum' | 'Sum';
+MIN : 'MIN' | 'min' | 'Min';
+MAX : 'MAX' | 'max' | 'Max';
+AVG : 'AVG' | 'avg' | 'Avg';
+HOUR : 'HOUR' | 'hour' | 'Hour';
+MINUTE : 'MINUTE' | 'minute' | 'Minute';
+SECOND : 'SECOND' | 'second' | 'Second';
+DAY : 'DAY' | 'day' | 'Day';
+MONTH : 'MONTH' | 'month' | 'Month';
+YEAR : 'YEAR' | 'year' | 'Year';
+
+
+// Event Properties
+TRANSIT_URI : 'TRANSITURI' | 'transituri' | 'TransitUri';
+TIMESTAMP : 'TIME' | 'time' | 'Time';
+FILESIZE : 'SIZE' | 'size' | 'Size';
+TYPE : 'TYPE' | 'type' | 'Type';
+COMPONENT_ID : 'COMPONENTID' | 'componentid' | 'ComponentId' | 'componentId' | 
'componentID' | 'ComponentID';
+UUID : 'UUID' | 'uuid' | 'Uuid';
+
+// Event Types
+RECEIVE : 'RECEIVE' | 'receive' | 'Receive';
+SEND : 'SEND' | 'send' | 'Send';
+DROP : 'DROP' | 'drop' | 'Drop';
+CREATE : 'CREATE' | 'create' | 'Create';
+EXPIRE : 'EXPIRE' | 'expire' | 'Expire';
+FORK : 'FORK' | 'fork' | 'Fork';
+JOIN : 'JOIN' | 'join' | 'Join';
+CLONE : 'CLONE' | 'clone' | 'Clone';
+CONTENT_MODIFIED : 'CONTENT_MODIFIED' | 'content_modified' | 
'Content_Modified';
+ATTRIBUTES_MODIFIED : 'ATTRIBUTES_MODIFIED' | 'attributes_modified' | 
'Attributes_Modified';
+ROUTE : 'ROUTE' | 'route' | 'Route';
+REPLAY : 'REPLAY' | 'replay' | 'Replay';
+
+
+
+IDENTIFIER : (
+                                       ('a'..'z' | 'A'..'Z' | '$')
+                                       ('a'..'z' | 'A'..'Z' | '$' | '0'..'9' | 
'_' )*
+                       );
+
+
+
+// STRINGS
+STRING_LITERAL
+@init{StringBuilder lBuf = new StringBuilder();}
+       :
+               (
+                       '"'
+                               (
+                                       escaped=ESC {lBuf.append(getText());} |
+                                       normal = ~( '"' | '\\' | '\n' | '\r' | 
'\t' ) { lBuf.appendCodePoint(normal);} 
+                               )*
+                       '"'
+               )
+               {
+                       setText(lBuf.toString());
+               }
+               |
+               (
+                       '\''
+                               (
+                                       escaped=ESC {lBuf.append(getText());} |
+                                       normal = ~( '\'' | '\\' | '\n' | '\r' | 
'\t' ) { lBuf.appendCodePoint(normal);} 
+                               )*
+                       '\''
+               )
+               {
+                       setText(lBuf.toString());
+               }
+               ;
+
+
+fragment
+ESC
+       :       '\\'
+               (
+                               '"'             { setText("\""); }
+                       |       '\''    { setText("\'"); }
+                       |       'r'             { setText("\r"); }
+                       |       'n'             { setText("\n"); }
+                       |       't'             { setText("\t"); }
+                       |       '\\'    { setText("\\\\"); }
+                       |       nextChar = ~('"' | '\'' | 'r' | 'n' | 't' | 
'\\')               
+                               {
+                                       StringBuilder lBuf = new 
StringBuilder(); lBuf.append("\\\\").appendCodePoint(nextChar); 
setText(lBuf.toString());
+                               }
+               )
+       ;
+       
+

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b512ff12/nifi/nifi-commons/nifi-provenance-query-language/src/main/antlr3/org/apache/nifi/pql/ProvenanceQueryParser.g
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-commons/nifi-provenance-query-language/src/main/antlr3/org/apache/nifi/pql/ProvenanceQueryParser.g
 
b/nifi/nifi-commons/nifi-provenance-query-language/src/main/antlr3/org/apache/nifi/pql/ProvenanceQueryParser.g
new file mode 100644
index 0000000..3837729
--- /dev/null
+++ 
b/nifi/nifi-commons/nifi-provenance-query-language/src/main/antlr3/org/apache/nifi/pql/ProvenanceQueryParser.g
@@ -0,0 +1,141 @@
+parser grammar ProvenanceQueryParser;
+
+options {
+       output=AST;
+       tokenVocab=ProvenanceQueryLexer;
+}
+
+tokens {
+       PQL;
+       QUERY;
+       EVENT_PROPERTY;
+       ATTRIBUTE;
+       ORDER;
+}
+
+@header {
+       package org.apache.nifi.pql;
+       import 
org.apache.nifi.pql.exception.ProvenanceQueryLanguageParsingException;
+}
+
+
+@members {
+  public void displayRecognitionError(String[] tokenNames, 
RecognitionException e) {
+       final StringBuilder sb = new StringBuilder();
+    if ( e.token == null ) {
+       sb.append("Unrecognized token ");
+    } else {
+       sb.append("Unexpected token '").append(e.token.getText()).append("' ");
+    }
+    sb.append("at line ").append(e.line);
+    if ( e.approximateLineInfo ) {
+       sb.append(" (approximately)");
+    }
+    sb.append(", column ").append(e.charPositionInLine);
+    sb.append(". Query: ").append(e.input.toString());
+    
+    throw new ProvenanceQueryLanguageParsingException(sb.toString());
+  }
+
+  public void recover(final RecognitionException e) {
+       final StringBuilder sb = new StringBuilder();
+    if ( e.token == null ) {
+       sb.append("Unrecognized token ");
+    } else {
+       sb.append("Unexpected token '").append(e.token.getText()).append("' ");
+    }
+    sb.append("at line ").append(e.line);
+    if ( e.approximateLineInfo ) {
+       sb.append(" (approximately)");
+    }
+    sb.append(", column ").append(e.charPositionInLine);
+    sb.append(". Query: ").append(e.input.toString());
+    
+    throw new ProvenanceQueryLanguageParsingException(sb.toString());
+  } 
+}
+
+
+
+pql : query ->
+       ^(PQL query);
+
+query : selectClause
+               fromClause?
+               whereClause?
+               groupByClause?
+//             havingClause?
+               orderByClause?
+               limitClause?
+               SEMICOLON?
+               EOF ->
+       ^(QUERY selectClause fromClause? whereClause? groupByClause? 
orderByClause? limitClause?);
+
+
+selectClause : SELECT^ selectable (COMMA! selectable)*;
+
+
+selectable : function | (selectableSource ( (DOT! eventProperty^) | (LBRACKET! 
attribute^ RBRACKET!) )?);
+
+selectableSource : EVENT | IDENTIFIER;
+
+eventProperty : propertyName ->
+       ^(EVENT_PROPERTY propertyName );
+
+propertyName : UUID | TRANSIT_URI | TIMESTAMP | FILESIZE | TYPE | COMPONENT_ID 
| RELATIONSHIP;
+
+attribute : STRING_LITERAL ->
+       ^(ATTRIBUTE STRING_LITERAL);
+
+
+fromClause : FROM^ source (COMMA! source)*;
+
+source : RECEIVE | SEND | DROP | CREATE | EXPIRE | FORK | JOIN | CLONE | 
CONTENT_MODIFIED | ATTRIBUTES_MODIFIED | ROUTE | REPLAY | ASTERISK;
+
+
+
+whereClause : WHERE^ conditions;
+
+conditions : condition ((AND^ | OR^) condition)*;
+
+condition : NOT^ condition | LPAREN! conditions RPAREN! | evaluation;
+
+evaluation : expression 
+                       (
+                               unaryOperator^ 
+                               | (binaryOperator^ expression)
+                               | (BETWEEN^ NUMBER AND! NUMBER)
+                       );
+
+expression : (LPAREN! expr RPAREN!) | expr;
+
+expr : constant | ref;
+
+ref : selectableSource ( (DOT! eventProperty^) | (LBRACKET! attribute^ 
RBRACKET!) )?;
+
+unaryOperator : IS_NULL | NOT_NULL;
+
+binaryOperator : EQUALS | NOT_EQUALS | GT | LT | GE | LE | MATCHES | 
STARTS_WITH;
+
+constant : NUMBER | STRING_LITERAL;
+
+
+function : functionName^ LPAREN! ref RPAREN!;
+
+functionName : COUNT | SUM | MIN | MAX | AVG | HOUR | MINUTE | SECOND | DAY | 
MONTH | YEAR;
+
+
+groupByClause : GROUP_BY^ group (COMMA! group)*;
+
+group : ref^ | function^;
+
+orderByClause : ORDER_BY^ order (COMMA! order)*;
+
+order: selectable direction? ->
+       ^(ORDER selectable direction?);
+
+direction : ASC | DESC;
+
+limitClause : LIMIT^ NUMBER;
+
+

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b512ff12/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/Formatter.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/Formatter.java
 
b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/Formatter.java
new file mode 100644
index 0000000..7090d26
--- /dev/null
+++ 
b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/Formatter.java
@@ -0,0 +1,5 @@
+package org.apache.nifi.pql;
+
+public interface Formatter {
+       String format(Object value);
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b512ff12/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/LuceneTranslator.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/LuceneTranslator.java
 
b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/LuceneTranslator.java
new file mode 100644
index 0000000..96b4dbf
--- /dev/null
+++ 
b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/LuceneTranslator.java
@@ -0,0 +1,189 @@
+package org.apache.nifi.pql;
+
+import static org.apache.nifi.pql.ProvenanceQueryParser.*;
+
+import java.text.SimpleDateFormat;
+import java.util.regex.Pattern;
+
+import org.apache.lucene.index.Term;
+import org.apache.lucene.search.BooleanClause.Occur;
+import org.apache.lucene.search.BooleanQuery;
+import org.apache.lucene.search.MatchAllDocsQuery;
+import org.apache.lucene.search.NumericRangeQuery;
+import org.apache.lucene.search.Query;
+import org.apache.lucene.search.RegexpQuery;
+import org.apache.lucene.search.TermQuery;
+import org.apache.nifi.pql.evaluation.OperandEvaluator;
+import org.apache.nifi.pql.evaluation.RecordEvaluator;
+import org.apache.nifi.pql.evaluation.comparison.EqualsEvaluator;
+import org.apache.nifi.pql.evaluation.comparison.GreaterThanEvaluator;
+import org.apache.nifi.pql.evaluation.comparison.LessThanEvaluator;
+import org.apache.nifi.pql.evaluation.comparison.MatchesEvaluator;
+import org.apache.nifi.pql.evaluation.comparison.StartsWithEvaluator;
+import org.apache.nifi.pql.evaluation.extraction.AttributeEvaluator;
+import org.apache.nifi.pql.evaluation.literals.LongLiteralEvaluator;
+import org.apache.nifi.pql.evaluation.literals.StringLiteralEvaluator;
+import org.apache.nifi.pql.evaluation.logic.AndEvaluator;
+import org.apache.nifi.pql.evaluation.logic.OrEvaluator;
+import org.apache.nifi.provenance.SearchableFields;
+
+
+/**
+ * Translates the WHERE clause of a provenance query into a Lucene {@link Query}.
+ *
+ * Only a subset of the condition evaluators can be expressed in Lucene (AND, OR, GT, LT,
+ * MATCHES, STARTS WITH and EQUALS against a String literal). Any clause that cannot be
+ * translated is replaced by a match-all query so that the Lucene result is never narrower
+ * than the original condition.
+ *
+ * NOTE(review): this assumes the caller re-applies the WHERE clause to the events that
+ * Lucene returns - confirm against the repository evaluator once it is wired in.
+ */
+public class LuceneTranslator {
+
+    /** Pattern accepted for String literals that are compared against numeric (long) fields. */
+    private static final String DATE_FORMAT = "yyyy/MM/dd HH:mm:ss";
+
+    /**
+     * Characters that are operators in Lucene's regular expression syntax when all optional
+     * features are enabled, which is the default for {@link RegexpQuery}.
+     */
+    private static final String LUCENE_REGEX_RESERVED = ".?+*|{}[]()\"\\#@&<>~";
+
+    /**
+     * Builds a Lucene query for the given condition.
+     *
+     * @param whereClause the condition to translate; may be {@code null}
+     * @return a query for the condition, or a {@link MatchAllDocsQuery} when the condition is
+     *         {@code null} or no part of it could be translated
+     */
+    public static Query toLuceneQuery(final RecordEvaluator<Boolean> whereClause) {
+        if (whereClause == null) {
+            return new MatchAllDocsQuery();
+        }
+
+        final BooleanQuery query = new BooleanQuery();
+        switch (whereClause.getEvaluatorType()) {
+            case AND: {
+                final AndEvaluator and = (AndEvaluator) whereClause;
+                query.add(toLuceneQuery(and.getLHS()), Occur.MUST);
+                query.add(toLuceneQuery(and.getRHS()), Occur.MUST);
+                break;
+            }
+            case OR: {
+                final OrEvaluator or = (OrEvaluator) whereClause;
+                query.add(toLuceneQuery(or.getLHS()), Occur.SHOULD);
+                query.add(toLuceneQuery(or.getRHS()), Occur.SHOULD);
+                query.setMinimumNumberShouldMatch(1);
+                break;
+            }
+            case GT: {
+                final GreaterThanEvaluator gt = (GreaterThanEvaluator) whereClause;
+                addLongRange(query, gt.getLHS(), gt.getRHS(), true);
+                break;
+            }
+            case LT: {
+                final LessThanEvaluator lt = (LessThanEvaluator) whereClause;
+                addLongRange(query, lt.getLHS(), lt.getRHS(), false);
+                break;
+            }
+            case MATCHES: {
+                final MatchesEvaluator me = (MatchesEvaluator) whereClause;
+                addMatches(me.getLHS(), me.getRHS(), query);
+                break;
+            }
+            case STARTS_WITH: {
+                final StartsWithEvaluator startsWith = (StartsWithEvaluator) whereClause;
+                final OperandEvaluator<?> lhs = startsWith.getLHS();
+                final OperandEvaluator<?> rhs = startsWith.getRHS();
+
+                if (rhs.getEvaluatorType() == STRING_LITERAL) {
+                    final String base = rhs.evaluate(null).toString();
+
+                    // java.util.regex.Pattern.quote() produces \Q...\E, which Lucene's regular
+                    // expression dialect does not understand, so escape each operator instead.
+                    final StringLiteralEvaluator regexEval = new StringLiteralEvaluator(escapeLuceneRegex(base) + ".*");
+                    addMatches(lhs, regexEval, query);
+                }
+                break;
+            }
+            case EQUALS: {
+                final EqualsEvaluator equals = (EqualsEvaluator) whereClause;
+                final OperandEvaluator<?> lhs = equals.getLHS();
+                final OperandEvaluator<?> rhs = equals.getRHS();
+
+                final String fieldName = getFieldName(lhs);
+                if (fieldName != null && rhs.getEvaluatorType() == STRING_LITERAL) {
+                    query.add(new TermQuery(new Term(fieldName, toLower(rhs.evaluate(null).toString()))), Occur.MUST);
+                }
+                break;
+            }
+            default:
+                // Not expressible in Lucene; handled by the empty-query check below.
+                break;
+        }
+
+        // A BooleanQuery without clauses matches no documents at all, which would wipe out the
+        // results of an enclosing AND. Fall back to matching everything for untranslated clauses.
+        if (query.clauses().isEmpty()) {
+            return new MatchAllDocsQuery();
+        }
+        return query;
+    }
+
+    /**
+     * Adds a strict greater-than or less-than range restriction on a long field.
+     * Nothing is added when the field or the bound cannot be determined.
+     *
+     * @param query the query to add the restriction to
+     * @param lhs the operand naming the field
+     * @param rhs the operand supplying the bound
+     * @param greaterThan {@code true} for "field > bound", {@code false} for "field < bound"
+     */
+    private static void addLongRange(final BooleanQuery query, final OperandEvaluator<?> lhs, final OperandEvaluator<?> rhs, final boolean greaterThan) {
+        final String fieldName = getFieldName(lhs);
+        if (fieldName == null) {
+            return;
+        }
+
+        final Long bound = toLongValue(rhs);
+        if (bound == null) {
+            return;
+        }
+
+        // The bound itself is excluded: GT and LT are strict comparisons.
+        final Query range = greaterThan
+            ? NumericRangeQuery.newLongRange(fieldName, bound, Long.MAX_VALUE, false, true)
+            : NumericRangeQuery.newLongRange(fieldName, Long.MIN_VALUE, bound, true, false);
+        query.add(range, Occur.MUST);
+    }
+
+    /**
+     * Resolves a literal operand to a long: numbers are used as-is, String literals are parsed
+     * as a date in {@link #DATE_FORMAT} and converted to epoch milliseconds.
+     *
+     * @return the long value, or {@code null} if the operand is not a usable literal
+     */
+    private static Long toLongValue(final OperandEvaluator<?> rhs) {
+        if (rhs.getEvaluatorType() == NUMBER) {
+            return ((LongLiteralEvaluator) rhs).evaluate(null);
+        }
+
+        if (rhs.getEvaluatorType() == STRING_LITERAL) {
+            // SimpleDateFormat is not thread-safe, so a fresh instance is created per call.
+            final SimpleDateFormat sdf = new SimpleDateFormat(DATE_FORMAT);
+            try {
+                return sdf.parse(((StringLiteralEvaluator) rhs).evaluate(null)).getTime();
+            } catch (final Exception ignored) {
+                // Deliberately best-effort: a literal that is not a date simply yields no restriction.
+                return null;
+            }
+        }
+
+        return null;
+    }
+
+    /**
+     * Escapes every character that Lucene's regular expression syntax treats as an operator so
+     * that the given text is matched literally.
+     */
+    private static String escapeLuceneRegex(final String literal) {
+        final StringBuilder escaped = new StringBuilder(literal.length() + 8);
+        for (int i = 0; i < literal.length(); i++) {
+            final char c = literal.charAt(i);
+            if (LUCENE_REGEX_RESERVED.indexOf(c) >= 0) {
+                escaped.append('\\');
+            }
+            escaped.append(c);
+        }
+        return escaped.toString();
+    }
+
+    /** Null-safe lower-casing, applied to the values used in term queries. */
+    private static String toLower(final String value) {
+        return value == null ? null : value.toLowerCase();
+    }
+
+    /**
+     * Maps an operand to the name of the Lucene field that holds its value.
+     *
+     * @return the field name, or {@code null} if the operand has no corresponding field
+     */
+    private static String getFieldName(final OperandEvaluator<?> eval) {
+        switch (eval.getEvaluatorType()) {
+            case TIMESTAMP:
+                return SearchableFields.EventTime.getSearchableFieldName();
+            case FILESIZE:
+                return SearchableFields.FileSize.getSearchableFieldName();
+            case ATTRIBUTE:
+                return ((AttributeEvaluator) eval).getAttributeNameEvaluator().evaluate(null).toLowerCase();
+            case TRANSIT_URI:
+                return SearchableFields.TransitURI.getSearchableFieldName();
+            case RELATIONSHIP:
+                return SearchableFields.Relationship.getSearchableFieldName();
+            case TYPE:
+                return SearchableFields.EventType.getSearchableFieldName();
+            case COMPONENT_ID:
+                return SearchableFields.ComponentID.getSearchableFieldName();
+            case UUID:
+                return SearchableFields.FlowFileUUID.getSearchableFieldName();
+            default:
+                return null;
+        }
+    }
+
+    /**
+     * Adds a regular-expression restriction for {@code lhs MATCHES rhs}. Nothing is added
+     * unless the left-hand side maps to a field and the right-hand side is a String literal.
+     *
+     * NOTE(review): EQUALS lower-cases its value before querying but the regular expression is
+     * used verbatim here - confirm whether indexed values are lower-cased.
+     */
+    private static void addMatches(final OperandEvaluator<?> lhs, final OperandEvaluator<?> rhs, final BooleanQuery query) {
+        String field = null;
+        switch (lhs.getEvaluatorType()) {
+            case ATTRIBUTE:
+                final AttributeEvaluator attr = (AttributeEvaluator) lhs;
+                final OperandEvaluator<?> attrEval = attr.getAttributeNameEvaluator();
+                if (attrEval.getEvaluatorType() == STRING_LITERAL) {
+                    field = (String) attrEval.evaluate(null);
+                }
+                break;
+            case COMPONENT_ID:
+            case TRANSIT_URI:
+            case TYPE:
+                // The Term needs the index field name; evaluating the operand would instead try
+                // to extract the property value from a (null) event.
+                field = getFieldName(lhs);
+                break;
+            default:
+                break;
+        }
+
+        String regex = null;
+        if (rhs.getEvaluatorType() == STRING_LITERAL) {
+            regex = rhs.evaluate(null).toString();
+        }
+
+        if (field != null && regex != null) {
+            query.add(new RegexpQuery(new Term(field, regex)), Occur.MUST);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b512ff12/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/ProvenanceQuery.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/ProvenanceQuery.java
 
b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/ProvenanceQuery.java
new file mode 100644
index 0000000..200a8ee
--- /dev/null
+++ 
b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/ProvenanceQuery.java
@@ -0,0 +1,574 @@
+package org.apache.nifi.pql;
+
+import static org.apache.nifi.pql.ProvenanceQueryParser.AND;
+import static org.apache.nifi.pql.ProvenanceQueryParser.ASC;
+import static org.apache.nifi.pql.ProvenanceQueryParser.ATTRIBUTE;
+import static org.apache.nifi.pql.ProvenanceQueryParser.AVG;
+import static org.apache.nifi.pql.ProvenanceQueryParser.COMPONENT_ID;
+import static org.apache.nifi.pql.ProvenanceQueryParser.COUNT;
+import static org.apache.nifi.pql.ProvenanceQueryParser.DAY;
+import static org.apache.nifi.pql.ProvenanceQueryParser.EQUALS;
+import static org.apache.nifi.pql.ProvenanceQueryParser.EVENT;
+import static org.apache.nifi.pql.ProvenanceQueryParser.EVENT_PROPERTY;
+import static org.apache.nifi.pql.ProvenanceQueryParser.FILESIZE;
+import static org.apache.nifi.pql.ProvenanceQueryParser.FROM;
+import static org.apache.nifi.pql.ProvenanceQueryParser.GROUP_BY;
+import static org.apache.nifi.pql.ProvenanceQueryParser.GT;
+import static org.apache.nifi.pql.ProvenanceQueryParser.HOUR;
+import static org.apache.nifi.pql.ProvenanceQueryParser.IDENTIFIER;
+import static org.apache.nifi.pql.ProvenanceQueryParser.LIMIT;
+import static org.apache.nifi.pql.ProvenanceQueryParser.LT;
+import static org.apache.nifi.pql.ProvenanceQueryParser.MATCHES;
+import static org.apache.nifi.pql.ProvenanceQueryParser.MINUTE;
+import static org.apache.nifi.pql.ProvenanceQueryParser.NOT;
+import static org.apache.nifi.pql.ProvenanceQueryParser.NOT_EQUALS;
+import static org.apache.nifi.pql.ProvenanceQueryParser.NUMBER;
+import static org.apache.nifi.pql.ProvenanceQueryParser.OR;
+import static org.apache.nifi.pql.ProvenanceQueryParser.ORDER_BY;
+import static org.apache.nifi.pql.ProvenanceQueryParser.RELATIONSHIP;
+import static org.apache.nifi.pql.ProvenanceQueryParser.SECOND;
+import static org.apache.nifi.pql.ProvenanceQueryParser.STARTS_WITH;
+import static org.apache.nifi.pql.ProvenanceQueryParser.STRING_LITERAL;
+import static org.apache.nifi.pql.ProvenanceQueryParser.SUM;
+import static org.apache.nifi.pql.ProvenanceQueryParser.TIMESTAMP;
+import static org.apache.nifi.pql.ProvenanceQueryParser.TRANSIT_URI;
+import static org.apache.nifi.pql.ProvenanceQueryParser.TYPE;
+import static org.apache.nifi.pql.ProvenanceQueryParser.UUID;
+import static org.apache.nifi.pql.ProvenanceQueryParser.WHERE;
+import static org.apache.nifi.pql.ProvenanceQueryParser.YEAR;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.antlr.runtime.ANTLRStringStream;
+import org.antlr.runtime.CharStream;
+import org.antlr.runtime.CommonTokenStream;
+import org.antlr.runtime.tree.Tree;
+import org.apache.nifi.pql.evaluation.Accumulator;
+import org.apache.nifi.pql.evaluation.BooleanEvaluator;
+import org.apache.nifi.pql.evaluation.OperandEvaluator;
+import org.apache.nifi.pql.evaluation.RecordEvaluator;
+import org.apache.nifi.pql.evaluation.RepositoryEvaluator;
+import org.apache.nifi.pql.evaluation.accumulation.AverageAccumulator;
+import org.apache.nifi.pql.evaluation.accumulation.CountAccumulator;
+import org.apache.nifi.pql.evaluation.accumulation.EventAccumulator;
+import org.apache.nifi.pql.evaluation.accumulation.SumAccumulator;
+import org.apache.nifi.pql.evaluation.comparison.EqualsEvaluator;
+import org.apache.nifi.pql.evaluation.comparison.GreaterThanEvaluator;
+import org.apache.nifi.pql.evaluation.comparison.LessThanEvaluator;
+import org.apache.nifi.pql.evaluation.comparison.MatchesEvaluator;
+import org.apache.nifi.pql.evaluation.comparison.RecordTypeEvaluator;
+import org.apache.nifi.pql.evaluation.comparison.StartsWithEvaluator;
+import org.apache.nifi.pql.evaluation.extraction.AttributeEvaluator;
+import org.apache.nifi.pql.evaluation.extraction.ComponentIdEvaluator;
+import org.apache.nifi.pql.evaluation.extraction.RelationshipEvaluator;
+import org.apache.nifi.pql.evaluation.extraction.SizeEvaluator;
+import org.apache.nifi.pql.evaluation.extraction.TimestampEvaluator;
+import org.apache.nifi.pql.evaluation.extraction.TransitUriEvaluator;
+import org.apache.nifi.pql.evaluation.extraction.TypeEvaluator;
+import org.apache.nifi.pql.evaluation.extraction.UuidEvaluator;
+import org.apache.nifi.pql.evaluation.function.TimeFieldEvaluator;
+import org.apache.nifi.pql.evaluation.literals.LongLiteralEvaluator;
+import org.apache.nifi.pql.evaluation.literals.StringLiteralEvaluator;
+import org.apache.nifi.pql.evaluation.logic.AndEvaluator;
+import org.apache.nifi.pql.evaluation.logic.OrEvaluator;
+import org.apache.nifi.pql.evaluation.order.FieldSorter;
+import org.apache.nifi.pql.evaluation.order.GroupedSorter;
+import org.apache.nifi.pql.evaluation.order.RowSorter;
+import org.apache.nifi.pql.evaluation.order.SortDirection;
+import org.apache.nifi.pql.evaluation.repository.SelectAllRecords;
+import org.apache.nifi.pql.exception.ProvenanceQueryLanguageParsingException;
+import org.apache.nifi.pql.results.GroupingResultSet;
+import org.apache.nifi.pql.results.StandardOrderedResultSet;
+import org.apache.nifi.pql.results.StandardUnorderedResultSet;
+import org.apache.nifi.provenance.ProvenanceEventRepository;
+import org.apache.nifi.provenance.ProvenanceEventType;
+import org.apache.nifi.provenance.StoredProvenanceEvent;
+import org.apache.nifi.provenance.query.ProvenanceResultSet;
+
+public class ProvenanceQuery {
+       private final Tree tree;
+       private final String pql;
+       private final List<Accumulator<?>> selectAccumulators;
+       private final List<RecordEvaluator<?>> groupEvaluators;
+       private final RecordEvaluator<Boolean> sourceEvaluator;
+       private final RecordEvaluator<Boolean> conditionEvaluator;
+       private final RowSorter sorter;
+       private final Long limit;
+       
+       private long accumulatorIdGenerator = 0L;
+       
+       /**
+        * Parses the given Provenance Query Language text and builds a query from it.
+        *
+        * @param pql the query text
+        * @return the compiled query
+        * @throws ProvenanceQueryLanguageParsingException if lexing, parsing or building the query fails
+        */
+       public static ProvenanceQuery compile(final String pql) {
+           try {
+               final ProvenanceQueryParser parser = new ProvenanceQueryParser(createTokenStream(pql));
+               final Tree parsed = (Tree) parser.pql().getTree();
+
+               // The query itself is the first child of the tree returned by the parser.
+               return new ProvenanceQuery(parsed.getChild(0), pql);
+           } catch (final Exception failure) {
+               // Parsing exceptions pass through untouched; anything else is wrapped.
+               if (failure instanceof ProvenanceQueryLanguageParsingException) {
+                   throw (ProvenanceQueryLanguageParsingException) failure;
+               }
+               throw new ProvenanceQueryLanguageParsingException(failure);
+           }
+       }
+       
+       /**
+        * Wraps the query text in a lexer-backed token stream for the parser.
+        *
+        * @param expression the query text
+        * @return a token stream over the lexed text
+        */
+       private static CommonTokenStream createTokenStream(final String expression) throws ProvenanceQueryLanguageParsingException {
+           final CharStream characters = new ANTLRStringStream(expression);
+           return new CommonTokenStream(new ProvenanceQueryLexer(characters));
+       }
+       
+       /**
+        * Builds the evaluators, accumulators, sorter and limit for a parsed query.
+        *
+        * @param tree the root of the query; child 0 is used as the SELECT tree and the
+        *             remaining children are inspected for the optional clauses
+        * @param pql the original query text
+        */
+       private ProvenanceQuery(final Tree tree, final String pql) {
+           this.tree = tree;
+           this.pql = pql;
+
+           Tree from = null;
+           Tree where = null;
+           Tree groupBy = null;
+           Tree limitClause = null;
+           Tree orderBy = null;
+
+           // Child 0 is skipped here; it is passed to buildAccumulators further down.
+           final int childCount = tree.getChildCount();
+           for (int index = 1; index < childCount; index++) {
+               final Tree clause = tree.getChild(index);
+               final int clauseType = clause.getType();
+
+               if (clauseType == FROM) {
+                   from = clause;
+               } else if (clauseType == WHERE) {
+                   where = clause;
+               } else if (clauseType == GROUP_BY) {
+                   groupBy = clause;
+               } else if (clauseType == LIMIT) {
+                   limitClause = clause;
+               } else if (clauseType == ORDER_BY) {
+                   orderBy = clause;
+               }
+               // TODO: Handle other types!
+           }
+
+           if (from == null) {
+               sourceEvaluator = null;
+           } else {
+               sourceEvaluator = buildSourceEvaluator(from);
+           }
+
+           // The condition is built directly from the first child of the WHERE clause.
+           if (where == null) {
+               conditionEvaluator = null;
+           } else {
+               conditionEvaluator = buildConditionEvaluator(where.getChild(0));
+           }
+
+           if (groupBy == null) {
+               groupEvaluators = null;
+           } else {
+               groupEvaluators = buildGroupEvaluators(groupBy);
+           }
+
+           if (limitClause == null) {
+               limit = null;
+           } else {
+               limit = Long.parseLong(limitClause.getChild(0).getText());
+           }
+
+           if (orderBy == null) {
+               sorter = null;
+           } else {
+               sorter = buildSorter(orderBy, groupBy != null);
+           }
+
+           // Grouped queries always aggregate. Otherwise the accumulators are built once to find
+           // out whether any of them is an aggregate function and, if so, built again as aggregating.
+           final Tree selectTree = tree.getChild(0);
+           final boolean grouped = groupEvaluators != null && !groupEvaluators.isEmpty();
+
+           List<Accumulator<?>> chosen = buildAccumulators(selectTree, grouped);
+           if (!grouped) {
+               boolean aggregateFound = false;
+               final Iterator<Accumulator<?>> candidates = chosen.iterator();
+               while (!aggregateFound && candidates.hasNext()) {
+                   aggregateFound = candidates.next().isAggregateFunction();
+               }
+
+               if (aggregateFound) {
+                   chosen = buildAccumulators(selectTree, true);
+               }
+           }
+           selectAccumulators = chosen;
+
+           //repoEvaluator = new SelectAllRecords();
+           //repoEvaluator = new SelectFromLucene(LuceneTranslator.toLuceneQuery(where));
+       }
+       
+       /**
+        * Renders the parsed query tree, one node per line, indented by depth.
+        */
+       @Override
+       public String toString() {
+           final StringBuilder rendered = new StringBuilder();
+           printTree(tree, 0, rendered);
+           return rendered.toString();
+       }
+       
+       /**
+        * @return the query text this instance was created with
+        */
+       public String getQuery() {
+           return this.pql;
+       }
+       
+       /**
+        * Renders the given tree and all of its descendants as indented text.
+        *
+        * @param tree the root of the tree to render
+        * @return one line per node, starting with no indentation at the root
+        */
+       private String printTree(final Tree tree) {
+           final StringBuilder rendered = new StringBuilder();
+           final int rootIndent = 0;
+           printTree(tree, rootIndent, rendered);
+           return rendered.toString();
+       }
+    
+    private void printTree(final Tree tree, final int spaces, final 
StringBuilder sb) {
+        for (int i=0; i < spaces; i++) {
+            sb.append(" ");
+        }
+        
+        if ( tree.getText().trim().isEmpty() ) {
+            sb.append(tree.toString()).append("\n");
+        } else {
+            sb.append(tree.getText()).append("\n");
+        }
+        
+        for (int i=0; i < tree.getChildCount(); i++) {
+            printTree(tree.getChild(i), spaces + 2, sb);
+        }
+    }
+    
+    /**
+     * Creates one accumulator for every child of the SELECT tree, in order.
+     *
+     * @param selectTree the SELECT tree
+     * @param distinct passed through to each accumulator that is created
+     * @return the accumulators, in the order of the selected terms
+     * @throws IllegalArgumentException if the given tree is not a SELECT tree
+     */
+    private List<Accumulator<?>> buildAccumulators(final Tree selectTree, final boolean distinct) {
+        final boolean isSelect = selectTree.getType() == ProvenanceQueryParser.SELECT;
+        if ( !isSelect ) {
+            throw new IllegalArgumentException("Cannot build accumulators for a non-SELECT tree");
+        }
+
+        final List<Accumulator<?>> built = new ArrayList<>();
+        int index = 0;
+        while (index < selectTree.getChildCount()) {
+            built.add(buildAccumulator(selectTree.getChild(index), distinct));
+            index++;
+        }
+
+        return built;
+    }
+    
+    private Accumulator<?> buildAccumulator(final Tree tree, final boolean 
distinct) {
+       switch (tree.getType()) {
+               case SUM:
+                       return new SumAccumulator(accumulatorIdGenerator++, 
toLongEvaluator(buildOperandEvaluator(tree.getChild(0)), tree), "SUM(" + 
getLabel(tree.getChild(0)) + ")");
+               case AVG:
+                       return new AverageAccumulator(accumulatorIdGenerator++, 
toLongEvaluator(buildOperandEvaluator(tree.getChild(0)), tree), "AVG(" + 
getLabel(tree.getChild(0)) + ")");
+               case EVENT:
+                       return new EventAccumulator(accumulatorIdGenerator++, 
getLabel(tree), distinct);
+               case IDENTIFIER:
+                       return new EventAccumulator(accumulatorIdGenerator++, 
getLabel(tree.getChild(0)), distinct);
+               case EVENT_PROPERTY:
+               case ATTRIBUTE:
+                       return new EventAccumulator(accumulatorIdGenerator++, 
getLabel(tree.getChild(0)), buildOperandEvaluator(tree), distinct);
+               case YEAR:
+               case DAY:
+               case HOUR:
+               case MINUTE:
+               case SECOND:
+                       return new EventAccumulator(accumulatorIdGenerator++, 
getLabel(tree), buildOperandEvaluator(tree), distinct);
+               case COUNT:
+                       if ( 
"Event".equalsIgnoreCase(tree.getChild(0).getText() ) ) {
+                               return new 
CountAccumulator(accumulatorIdGenerator++, null, "COUNT(" + 
getLabel(tree.getChild(0)) + ")");
+                       }
+                       return new CountAccumulator(accumulatorIdGenerator++, 
buildOperandEvaluator(tree.getChild(0)), "COUNT(" + getLabel(tree.getChild(0)) 
+ ")");
+               default:
+                               throw new 
UnsupportedOperationException("Haven't implemented accumulators yet for " + 
tree);
+       }
+    }
+    
+    /**
+     * Derives the display label for a term: the text of the first child for event properties
+     * and attributes, {@code NAME(label-of-first-child)} for the time functions, and the
+     * node's own text for everything else.
+     *
+     * @param tree the term to label
+     * @return the label
+     */
+    private String getLabel(final Tree tree) {
+        final int kind = tree.getType();
+
+        if (kind == EVENT_PROPERTY || kind == ATTRIBUTE) {
+            return tree.getChild(0).getText();
+        }
+
+        final boolean timeFunction = kind == YEAR || kind == DAY || kind == HOUR || kind == MINUTE || kind == SECOND;
+        if (timeFunction) {
+            final StringBuilder label = new StringBuilder();
+            label.append(tree.getText());
+            label.append("(").append(getLabel(tree.getChild(0))).append(")");
+            return label.toString();
+        }
+
+        return tree.getText();
+    }
+    
+    private OperandEvaluator<?> buildOperandEvaluator(final Tree tree) {
+       switch (tree.getType()) {
+               case EVENT_PROPERTY:
+                       switch (tree.getChild(0).getType()) {
+                               case FILESIZE:
+                                       return new SizeEvaluator();
+                               case TRANSIT_URI:
+                                       return new TransitUriEvaluator();
+                               case TIMESTAMP:
+                                       return new TimestampEvaluator();
+                               case TYPE:
+                                       return new TypeEvaluator();
+                               case COMPONENT_ID:
+                                       return new ComponentIdEvaluator();
+                               case RELATIONSHIP:
+                                       return new RelationshipEvaluator();
+                               case UUID:
+                                   return new UuidEvaluator();
+                               default:
+                                       // TODO: IMPLEMENT
+                                       throw new 
UnsupportedOperationException("Haven't implemented extraction of property " + 
tree.getChild(0).getText());
+                       }
+               case ATTRIBUTE:
+                       return new 
AttributeEvaluator(toStringEvaluator(buildOperandEvaluator(tree.getChild(0)), 
tree));
+               case STRING_LITERAL:
+                       return new StringLiteralEvaluator(tree.getText());
+               case NUMBER:
+                       return new 
LongLiteralEvaluator(Long.valueOf(tree.getText()));
+               case HOUR:
+                       return new 
TimeFieldEvaluator(toLongEvaluator(buildOperandEvaluator(tree.getChild(0)), 
tree), Calendar.HOUR_OF_DAY, HOUR);
+               case DAY:
+                       return new 
TimeFieldEvaluator(toLongEvaluator(buildOperandEvaluator(tree.getChild(0)), 
tree), Calendar.DAY_OF_YEAR, DAY);
+               case YEAR:
+                       return new 
TimeFieldEvaluator(toLongEvaluator(buildOperandEvaluator(tree.getChild(0)), 
tree), Calendar.YEAR, YEAR);
+               case MINUTE:
+                       return new 
TimeFieldEvaluator(toLongEvaluator(buildOperandEvaluator(tree.getChild(0)), 
tree), Calendar.MINUTE, MINUTE);
+               case SECOND:
+                       return new 
TimeFieldEvaluator(toLongEvaluator(buildOperandEvaluator(tree.getChild(0)), 
tree), Calendar.SECOND, SECOND);
+               default:
+                       throw new 
ProvenanceQueryLanguageParsingException("Unable to extract value '" + 
tree.toString() + "' from event because it is not a valid ");
+       }
+    }
+    
+    
+    /**
+     * Creates the evaluator that restricts events to the types listed in the FROM clause.
+     *
+     * @param fromTree the FROM tree; each child names an event type or is {@code *}
+     * @return an evaluator accepting the listed event types, or {@code null} if {@code *} is
+     *         listed, meaning that no restriction applies
+     * @throws NullPointerException if {@code fromTree} is {@code null}
+     * @throws IllegalArgumentException if the tree is not a FROM tree or names an unknown event type
+     */
+    private RecordEvaluator<Boolean> buildSourceEvaluator(final Tree fromTree) {
+        if ( fromTree == null ) {
+            throw new NullPointerException("fromTree must not be null");
+        }
+        if ( fromTree.getType() != FROM ) {
+            throw new IllegalArgumentException("Cannot build Source Evaluator from a Tree that is not a FROM-tree");
+        }
+
+        final Set<ProvenanceEventType> types = new HashSet<>();
+        for ( int i=0; i < fromTree.getChildCount(); i++ ) {
+            final String typeName = fromTree.getChild(i).getText();
+            if ( "*".equals(typeName) ) {
+                return null;
+            }
+
+            // Upper-case independently of the default locale; under a Turkish locale, for
+            // example, "i" would otherwise become a dotted capital I and never match a constant.
+            types.add(ProvenanceEventType.valueOf(typeName.toUpperCase(java.util.Locale.ROOT)));
+        }
+
+        return new RecordTypeEvaluator(types);
+    }
+    
+    
+    /**
+     * Creates the boolean evaluator for a condition term.
+     *
+     * @param tree the condition term
+     * @return the evaluator for the condition
+     * @throws ProvenanceQueryLanguageParsingException if the right-hand side of MATCHES or
+     *         STARTS WITH is not of type String
+     * @throws UnsupportedOperationException if the condition type is not implemented
+     */
+    private BooleanEvaluator buildConditionEvaluator(final Tree tree) {
+        final int conditionType = tree.getType();
+
+        switch (conditionType) {
+            case AND: {
+                final BooleanEvaluator left = buildConditionEvaluator(tree.getChild(0));
+                final BooleanEvaluator right = buildConditionEvaluator(tree.getChild(1));
+                return new AndEvaluator(left, right);
+            }
+            case OR: {
+                final BooleanEvaluator left = buildConditionEvaluator(tree.getChild(0));
+                final BooleanEvaluator right = buildConditionEvaluator(tree.getChild(1));
+                return new OrEvaluator(left, right);
+            }
+            case EQUALS: {
+                final OperandEvaluator<?> left = buildOperandEvaluator(tree.getChild(0));
+                final OperandEvaluator<?> right = buildOperandEvaluator(tree.getChild(1));
+                return new EqualsEvaluator(left, right);
+            }
+            case NOT_EQUALS: {
+                // Same evaluator as EQUALS, constructed with the extra flag set to true.
+                final OperandEvaluator<?> left = buildOperandEvaluator(tree.getChild(0));
+                final OperandEvaluator<?> right = buildOperandEvaluator(tree.getChild(1));
+                return new EqualsEvaluator(left, right, true);
+            }
+            case GT: {
+                final OperandEvaluator<?> left = buildOperandEvaluator(tree.getChild(0));
+                final OperandEvaluator<?> right = buildOperandEvaluator(tree.getChild(1));
+                return new GreaterThanEvaluator(left, right);
+            }
+            case LT: {
+                final OperandEvaluator<?> left = buildOperandEvaluator(tree.getChild(0));
+                final OperandEvaluator<?> right = buildOperandEvaluator(tree.getChild(1));
+                return new LessThanEvaluator(left, right);
+            }
+            case NOT: {
+                final BooleanEvaluator inner = buildConditionEvaluator(tree.getChild(0));
+                return inner.negate();
+            }
+            case MATCHES: {
+                // The right-hand side is built and validated before the left-hand side.
+                final OperandEvaluator<?> pattern = buildOperandEvaluator(tree.getChild(1));
+                final boolean isString = String.class.equals( pattern.getType() );
+                if ( !isString ) {
+                    throw new ProvenanceQueryLanguageParsingException("Right-hand side of MATCHES operator must be a Regular Expression but found " + pattern);
+                }
+                return new MatchesEvaluator(buildOperandEvaluator(tree.getChild(0)), pattern);
+            }
+            case STARTS_WITH: {
+                // The right-hand side is built and validated before the left-hand side.
+                final OperandEvaluator<?> prefix = buildOperandEvaluator(tree.getChild(1));
+                final boolean isString = String.class.equals( prefix.getType() );
+                if ( !isString ) {
+                    throw new ProvenanceQueryLanguageParsingException("Right-hand side of STARTS WITH operator must be a String but found " + prefix);
+                }
+                return new StartsWithEvaluator(buildOperandEvaluator(tree.getChild(0)), prefix);
+            }
+            default:
+                // TODO: Implement
+                throw new UnsupportedOperationException("Have not yet implemented condition evaluator for " + tree);
+        }
+    }
+    
+    
+    /**
+     * Narrows an evaluator to the expected value type after checking the type it reports.
+     *
+     * @param eval the evaluator to narrow
+     * @param tree the term the evaluator was built for; only used in the error message
+     * @param expectedType the value type the evaluator must report
+     * @return the same evaluator, typed to {@code expectedType}
+     * @throws ProvenanceQueryLanguageParsingException if the evaluator reports a different type
+     */
+    private <T> OperandEvaluator<T> castEvaluator(final OperandEvaluator<?> eval, final Tree tree, final Class<T> expectedType) {
+        if ( eval.getType() == expectedType ) {
+            // Safe: the evaluator has just reported exactly the expected value type.
+            @SuppressWarnings("unchecked")
+            final OperandEvaluator<T> narrowed = (OperandEvaluator<T>) eval;
+            return narrowed;
+        }
+
+        throw new ProvenanceQueryLanguageParsingException("Expected type " + expectedType.getSimpleName() + " but found type " + eval.getType() + " for term: " + tree);
+    }
+    
+    /**
+     * Views the given evaluator as a String evaluator.
+     *
+     * @param eval the evaluator to narrow
+     * @param tree the parse-tree term the evaluator belongs to; used for error reporting
+     * @return the evaluator typed as {@code OperandEvaluator<String>}
+     * @throws ProvenanceQueryLanguageParsingException if the evaluator does not report type String
+     */
+    private OperandEvaluator<String> toStringEvaluator(final OperandEvaluator<?> eval, final Tree tree) {
+        final OperandEvaluator<String> stringEvaluator = castEvaluator(eval, tree, String.class);
+        return stringEvaluator;
+    }
+    
+    /**
+     * Views the given evaluator as a Long evaluator.
+     *
+     * @param eval the evaluator to narrow
+     * @param tree the parse-tree term the evaluator belongs to; used for error reporting
+     * @return the evaluator typed as {@code OperandEvaluator<Long>}
+     * @throws ProvenanceQueryLanguageParsingException if the evaluator does not report type Long
+     */
+    private OperandEvaluator<Long> toLongEvaluator(final OperandEvaluator<?> eval, final Tree tree) {
+        final OperandEvaluator<Long> longEvaluator = castEvaluator(eval, tree, Long.class);
+        return longEvaluator;
+    }
+    
+    
+    /**
+     * Builds one evaluator per term of a GROUP BY clause.
+     *
+     * @param groupByTree the GROUP_BY node of the parse tree, or null when the query has no such clause
+     * @return the evaluators in clause order, or null when {@code groupByTree} is null
+     * @throws IllegalArgumentException if the given tree is not a GROUP_BY node
+     */
+    private List<RecordEvaluator<?>> buildGroupEvaluators(final Tree groupByTree) {
+        // No GROUP BY clause: callers get null rather than an empty list.
+        if ( groupByTree == null ) {
+            return null;
+        }
+
+        if ( groupByTree.getType() != GROUP_BY ) {
+            throw new IllegalArgumentException("Expected GroupBy tree but got " + groupByTree);
+        }
+
+        final int termCount = groupByTree.getChildCount();
+        final List<RecordEvaluator<?>> groupings = new ArrayList<>(termCount);
+        for (int idx = 0; idx < termCount; idx++) {
+            final Tree term = groupByTree.getChild(idx);
+            final int termType = term.getType();
+
+            // Plain values and time-unit functions are operands; anything else is treated as a condition.
+            final boolean operandTerm = termType == EVENT_PROPERTY
+                || termType == STRING_LITERAL
+                || termType == ATTRIBUTE
+                || termType == YEAR
+                || termType == DAY
+                || termType == HOUR
+                || termType == MINUTE
+                || termType == SECOND;
+
+            if ( operandTerm ) {
+                groupings.add(buildOperandEvaluator(term));
+            } else {
+                groupings.add(buildConditionEvaluator(term));
+            }
+        }
+
+        return groupings;
+    }
+    
+    /**
+     * Builds the sorter for an ORDER BY clause.
+     *
+     * @param orderByTree the ORDER_BY node of the parse tree; each child is one sort term whose first
+     *                    child is the value to sort on and whose optional second child is the direction
+     * @param grouped whether the query is grouped; grouped queries sort on accumulators, others on operands
+     * @return a {@code GroupedSorter} when {@code grouped} is true, otherwise a {@code FieldSorter}
+     * @throws IllegalArgumentException if the given tree is not an ORDER_BY node
+     */
+    private RowSorter buildSorter(final Tree orderByTree, final boolean grouped) {
+        if ( orderByTree.getType() != ORDER_BY ) {
+            throw new IllegalArgumentException("Expected OrderBy tree but got " + orderByTree);
+        }
+
+        final int termCount = orderByTree.getChildCount();
+
+        if ( grouped ) {
+            // LinkedHashMap keeps the sort keys in the order they were written in the query.
+            final Map<Accumulator<?>, SortDirection> accumulators = new LinkedHashMap<>(termCount);
+            for (int i=0; i < termCount; i++) {
+                final Tree orderTree = orderByTree.getChild(i);
+                final Accumulator<?> accumulator = buildAccumulator(orderTree.getChild(0), true);
+                accumulators.put(accumulator, resolveSortDirection(orderTree));
+            }
+
+            return new GroupedSorter(accumulators);
+        }
+
+        // TODO: Allow ORDER BY of aggregate values
+        final Map<OperandEvaluator<?>, SortDirection> evaluators = new LinkedHashMap<>(termCount);
+        for (int i=0; i < termCount; i++) {
+            final Tree orderTree = orderByTree.getChild(i);
+            final OperandEvaluator<?> evaluator = buildOperandEvaluator(orderTree.getChild(0));
+            evaluators.put(evaluator, resolveSortDirection(orderTree));
+        }
+
+        return new FieldSorter(evaluators);
+    }
+
+    /**
+     * Reads the sort direction of a single ORDER BY term: ascending when no direction child is
+     * present or when it is ASC, descending for any other direction token.
+     */
+    private SortDirection resolveSortDirection(final Tree orderTree) {
+        if ( orderTree.getChildCount() > 1 && orderTree.getChild(1).getType() != ASC ) {
+            return SortDirection.DESC;
+        }
+        return SortDirection.ASC;
+    }
+    
+    
+    /**
+     * Compiles the given query text and runs it against the given repository.
+     *
+     * @param query the query text to compile
+     * @param repo the repository to run the compiled query against
+     * @return the result set produced by the compiled query
+     * @throws IOException if running the query against the repository fails
+     */
+    public static ProvenanceResultSet execute(final String query, final ProvenanceEventRepository repo) throws IOException {
+        final ProvenanceQuery compiled = ProvenanceQuery.compile(query);
+        return compiled.execute(repo);
+    }
+    
+    
+    /**
+     * Wraps the given events in the result set that matches the shape of this query.
+     *
+     * @param matchedEvents the candidate events to run the query over
+     * @return a {@code GroupingResultSet} when aggregation is required; otherwise a
+     *         {@code StandardUnorderedResultSet} when no sorter is configured, or a
+     *         {@code StandardOrderedResultSet} when one is
+     */
+    public ProvenanceResultSet evaluate(final Iterator<? extends StoredProvenanceEvent> matchedEvents) {
+        // One label and one return type per selected column, in selection order.
+        final List<String> labels = new ArrayList<>();
+        final List<Class<?>> returnTypes = new ArrayList<>(selectAccumulators.size());
+        for ( final Accumulator<?> column : selectAccumulators ) {
+            labels.add(column.getLabel());
+            returnTypes.add(column.getReturnType());
+        }
+
+        if ( isAggregateRequired() ) {
+            return new GroupingResultSet(matchedEvents,
+                selectAccumulators, sourceEvaluator, conditionEvaluator,
+                labels, returnTypes, groupEvaluators, sorter, limit);
+        }
+
+        if ( sorter == null ) {
+            return new StandardUnorderedResultSet(matchedEvents,
+                selectAccumulators, sourceEvaluator, conditionEvaluator, labels, returnTypes, limit);
+        }
+
+        return new StandardOrderedResultSet(matchedEvents,
+            selectAccumulators, sourceEvaluator, conditionEvaluator, labels, returnTypes, sorter, limit);
+    }
+    
+    /**
+     * Runs this query against the given repository.
+     *
+     * Every record in the repository is fetched as a candidate; the result set returned by
+     * {@link #evaluate(Iterator)} receives the query's condition evaluator along with the candidates.
+     *
+     * @param repo the repository to read events from
+     * @return the result set for this query over the repository's events
+     * @throws IOException if the repository cannot be read
+     */
+    public ProvenanceResultSet execute(final ProvenanceEventRepository repo) throws IOException {
+        final RepositoryEvaluator repoEvaluator = new SelectAllRecords();
+        final Iterator<StoredProvenanceEvent> potentialMatches = repoEvaluator.evaluate(repo);
+
+        // Delegate rather than duplicate the result-set selection logic.
+        return evaluate(potentialMatches);
+    }
+    
+    /**
+     * Tells whether this query needs grouping/aggregation: true when at least one group
+     * evaluator is present or any selected accumulator is an aggregate function.
+     */
+    private boolean isAggregateRequired() {
+        final boolean hasGrouping = groupEvaluators != null && !groupEvaluators.isEmpty();
+        if ( hasGrouping ) {
+            return true;
+        }
+
+        boolean aggregateSelected = false;
+        for ( final Accumulator<?> selected : selectAccumulators ) {
+            if ( selected.isAggregateFunction() ) {
+                aggregateSelected = true;
+                break;
+            }
+        }
+
+        return aggregateSelected;
+    }
+    
+    
+    /**
+     * Returns the condition evaluator of this query, i.e. the same evaluator that is handed
+     * to the result sets for filtering records.
+     * NOTE(review): presumably null when the query has no WHERE clause; confirm against the parsing code.
+     */
+    public RecordEvaluator<Boolean> getWhereClause() {
+       return conditionEvaluator;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b512ff12/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/Accumulator.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/Accumulator.java
 
b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/Accumulator.java
new file mode 100644
index 0000000..db49091
--- /dev/null
+++ 
b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/Accumulator.java
@@ -0,0 +1,28 @@
+package org.apache.nifi.pql.evaluation;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.nifi.pql.groups.Group;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+
+/**
+ * Collects values of type {@code T} from provenance records, kept separately per {@link Group}.
+ * One accumulator backs one selected column of a query: its label and return type are what
+ * the query reports for that column.
+ */
+public interface Accumulator<T> extends Cloneable {
+
+       /**
+        * Feeds one record into this accumulator on behalf of the given group.
+        *
+        * @return an implementation-specific value; the aggregate implementations return the
+        *         group's running value, and some return null when the record contributes nothing
+        */
+       T accumulate(ProvenanceEventRecord record, Group group);
+       
+       /** Returns the label reported for this accumulator's column. */
+       String getLabel();
+       
+       /** Returns true when this accumulator is an aggregate function, which forces grouped evaluation of the query. */
+       boolean isAggregateFunction();
+       
+       /** Returns the class of the values this accumulator reports for its column. */
+       Class<? extends T> getReturnType();
+       
+       /**
+        * Returns a new accumulator. The implementations shipped with this interface copy the
+        * configuration (id, evaluator, label) but none of the accumulated values.
+        */
+       Accumulator<T> clone();
+       
+       /** Returns the numeric identifier of this accumulator; clones keep the same id. */
+       long getId();
+       
+       /** Discards everything accumulated so far. */
+       void reset();
+       
+       /** Returns the accumulated values of every group seen so far, keyed by group. */
+       Map<Group, List<T>> getValues();
+       
+       /**
+        * Returns the accumulated values of a single group; the implementations shipped with this
+        * interface return an empty list for a group that has nothing accumulated.
+        */
+       List<T> getValues(Group group);
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b512ff12/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/BooleanEvaluator.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/BooleanEvaluator.java
 
b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/BooleanEvaluator.java
new file mode 100644
index 0000000..2c1549a
--- /dev/null
+++ 
b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/BooleanEvaluator.java
@@ -0,0 +1,5 @@
+package org.apache.nifi.pql.evaluation;
+
+/**
+ * A {@link RecordEvaluator} that yields a Boolean for each record, i.e. a query condition.
+ */
+public interface BooleanEvaluator extends RecordEvaluator<Boolean> {
+       /**
+        * Returns the evaluator used in place of this one when the condition is wrapped in a NOT operator.
+        * NOTE(review): presumably the logical negation of this condition; confirm in the implementations.
+        */
+       BooleanEvaluator negate();
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b512ff12/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/OperandEvaluator.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/OperandEvaluator.java
 
b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/OperandEvaluator.java
new file mode 100644
index 0000000..0daace6
--- /dev/null
+++ 
b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/OperandEvaluator.java
@@ -0,0 +1,7 @@
+package org.apache.nifi.pql.evaluation;
+
+/**
+ * A {@link RecordEvaluator} that additionally reports the class of the values it yields,
+ * so that operands can be type-checked while a query is being built.
+ */
+public interface OperandEvaluator<T> extends RecordEvaluator<T> {
+
+       /** Returns the class of the values produced by this evaluator. */
+       Class<T> getType();
+       
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b512ff12/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/RecordEvaluator.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/RecordEvaluator.java
 
b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/RecordEvaluator.java
new file mode 100644
index 0000000..98f3b04
--- /dev/null
+++ 
b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/RecordEvaluator.java
@@ -0,0 +1,11 @@
+package org.apache.nifi.pql.evaluation;
+
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+
+/**
+ * Computes a value of type {@code T} from a single provenance event record.
+ */
+public interface RecordEvaluator<T> {
+
+       /**
+        * Evaluates this expression against the given record.
+        * Callers in this module treat a null result as "no value".
+        */
+       T evaluate(ProvenanceEventRecord record);
+       
+       /**
+        * Returns an integer code identifying the kind of evaluator.
+        * NOTE(review): presumably a token type from the generated parser; confirm in the implementations.
+        */
+       int getEvaluatorType();
+       
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b512ff12/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/RepositoryEvaluator.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/RepositoryEvaluator.java
 
b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/RepositoryEvaluator.java
new file mode 100644
index 0000000..eb55f75
--- /dev/null
+++ 
b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/RepositoryEvaluator.java
@@ -0,0 +1,13 @@
+package org.apache.nifi.pql.evaluation;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.nifi.provenance.ProvenanceEventRepository;
+import org.apache.nifi.provenance.StoredProvenanceEvent;
+
+/**
+ * Selects the candidate events of a query from a provenance repository.
+ */
+public interface RepositoryEvaluator {
+
+       /**
+        * Returns an iterator over the stored events selected from the given repository.
+        *
+        * @throws IOException declared for implementations that read from the repository
+        */
+       Iterator<StoredProvenanceEvent> evaluate(ProvenanceEventRepository 
repository) throws IOException;
+       
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b512ff12/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/accumulation/AverageAccumulator.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/accumulation/AverageAccumulator.java
 
b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/accumulation/AverageAccumulator.java
new file mode 100644
index 0000000..9b04308
--- /dev/null
+++ 
b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/accumulation/AverageAccumulator.java
@@ -0,0 +1,112 @@
+package org.apache.nifi.pql.evaluation.accumulation;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.nifi.pql.evaluation.Accumulator;
+import org.apache.nifi.pql.evaluation.RecordEvaluator;
+import org.apache.nifi.pql.groups.Group;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+
+/**
+ * Accumulator that keeps, per {@link Group}, the running sum and count of the Long values
+ * extracted from records and reports their arithmetic mean as a Double.
+ */
+public class AverageAccumulator implements Accumulator<Double> {
+
+    private final long id;
+    private final RecordEvaluator<Long> evaluator;
+    private final String label;
+    // Insertion-ordered so groups are kept in the order they were first seen.
+    private final Map<Group, Values> values = new LinkedHashMap<>();
+
+    /**
+     * @param id identifier of this accumulator; carried over to clones
+     * @param extractor evaluator that yields the value to average for each record
+     * @param label label reported for this accumulator's column
+     */
+    public AverageAccumulator(final long id, final RecordEvaluator<Long> extractor, final String label) {
+        this.id = id;
+        this.evaluator = extractor;
+        this.label = label;
+    }
+
+    /**
+     * Returns a new map holding, for every group seen so far, a single-element list with that group's average.
+     */
+    @Override
+    public Map<Group, List<Double>> getValues() {
+        final Map<Group, List<Double>> avgs = new HashMap<>(values.size());
+        for ( final Map.Entry<Group, Values> entry : values.entrySet() ) {
+            final double avg = entry.getValue().getAverage();
+            avgs.put(entry.getKey(), Collections.singletonList(avg));
+        }
+        return avgs;
+    }
+
+    /**
+     * Returns a single-element list with the given group's average, or an empty list when
+     * nothing has been accumulated for that group.
+     */
+    @Override
+    public List<Double> getValues(final Group group) {
+        final Values v = values.get(group);
+        if ( v == null ) {
+            return Collections.emptyList();
+        }
+
+        // Use floating-point division; dividing the two longs directly would truncate the average.
+        final double avg = v.getAverage();
+        return Collections.singletonList(avg);
+    }
+
+    /**
+     * Adds the record's extracted value to the group's running totals.
+     *
+     * @return the group's average after the update, or null when the record yields no value
+     *         (in which case nothing is accumulated)
+     */
+    @Override
+    public Double accumulate(final ProvenanceEventRecord record, final Group group) {
+        final Long val = evaluator.evaluate(record);
+        if ( val == null ) {
+            return null;
+        }
+
+        Values v = values.get(group);
+        if ( v == null ) {
+            v = new Values();
+            values.put(group, v);
+        }
+
+        v.increment(val.longValue());
+        return v.getAverage();
+    }
+
+    @Override
+    public String getLabel() {
+        return label;
+    }
+
+    @Override
+    public boolean isAggregateFunction() {
+        return true;
+    }
+
+    @Override
+    public Class<Double> getReturnType() {
+        return Double.class;
+    }
+
+    /** Returns a new accumulator with the same id, evaluator and label but no accumulated values. */
+    @Override
+    public AverageAccumulator clone() {
+        return new AverageAccumulator(id, evaluator, label);
+    }
+
+    @Override
+    public long getId() {
+        return id;
+    }
+
+    @Override
+    public void reset() {
+        values.clear();
+    }
+
+    /** Running sum and count for one group. */
+    private static class Values {
+        private long count;
+        private long sum;
+
+        /** Adds one value to the totals. */
+        public void increment(final long value) {
+            count++;
+            sum += value;
+        }
+
+        /**
+         * Returns sum / count as a double. Instances are only created immediately before
+         * the first increment, so count is at least 1 whenever this is called.
+         */
+        public double getAverage() {
+            return (double) sum / (double) count;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b512ff12/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/accumulation/CountAccumulator.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/accumulation/CountAccumulator.java
 
b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/accumulation/CountAccumulator.java
new file mode 100644
index 0000000..d4af4bf
--- /dev/null
+++ 
b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/accumulation/CountAccumulator.java
@@ -0,0 +1,91 @@
+package org.apache.nifi.pql.evaluation.accumulation;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.nifi.pql.evaluation.Accumulator;
+import org.apache.nifi.pql.evaluation.RecordEvaluator;
+import org.apache.nifi.pql.groups.Group;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+
+/**
+ * Accumulator that counts, per {@link Group}, the records whose evaluated value is non-null.
+ */
+public class CountAccumulator implements Accumulator<Long> {
+
+    private final long id;
+    private final RecordEvaluator<?> evaluator;
+    private final String label;
+    private final Map<Group, Long> counts = new LinkedHashMap<>();
+
+    /**
+     * @param id identifier of this accumulator; carried over to clones
+     * @param extractor evaluator whose non-null results are counted; when null, the record itself is the value
+     * @param label label reported for this accumulator's column
+     */
+    public CountAccumulator(final long id, final RecordEvaluator<?> extractor, final String label) {
+        this.id = id;
+        this.evaluator = extractor;
+        this.label = label;
+    }
+
+    @Override
+    public long getId() {
+        return id;
+    }
+
+    @Override
+    public String getLabel() {
+        return label;
+    }
+
+    @Override
+    public boolean isAggregateFunction() {
+        return true;
+    }
+
+    @Override
+    public Class<? extends Long> getReturnType() {
+        return Long.class;
+    }
+
+    /**
+     * Counts the record for the given group when its value is non-null.
+     *
+     * @return the group's count after the call; null when the value is null and the group has no count yet
+     */
+    @Override
+    public Long accumulate(final ProvenanceEventRecord record, final Group group) {
+        final Object counted = (evaluator == null) ? record : evaluator.evaluate(record);
+        final Long previous = counts.get(group);
+        if ( counted == null ) {
+            return previous;
+        }
+
+        final long updated = (previous == null) ? 1L : previous + 1L;
+        counts.put(group, updated);
+        return updated;
+    }
+
+    /** Returns a new map holding, for every counted group, a single-element list with its count. */
+    @Override
+    public Map<Group, List<Long>> getValues() {
+        final Map<Group, List<Long>> snapshot = new HashMap<>();
+        for ( final Map.Entry<Group, Long> tally : counts.entrySet() ) {
+            final List<Long> single = Collections.singletonList(tally.getValue());
+            snapshot.put(tally.getKey(), single);
+        }
+        return snapshot;
+    }
+
+    /** Returns a single-element list with the group's count, or an empty list when the group has none. */
+    @Override
+    public List<Long> getValues(final Group group) {
+        final Long tally = counts.get(group);
+        return (tally == null) ? Collections.<Long>emptyList() : Collections.singletonList(tally);
+    }
+
+    @Override
+    public void reset() {
+        counts.clear();
+    }
+
+    /** Returns a new accumulator with the same id, evaluator and label but no counts. */
+    @Override
+    public CountAccumulator clone() {
+        return new CountAccumulator(id, evaluator, label);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b512ff12/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/accumulation/EventAccumulator.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/accumulation/EventAccumulator.java
 
b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/accumulation/EventAccumulator.java
new file mode 100644
index 0000000..9d4487c
--- /dev/null
+++ 
b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/accumulation/EventAccumulator.java
@@ -0,0 +1,98 @@
+package org.apache.nifi.pql.evaluation.accumulation;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.nifi.pql.evaluation.Accumulator;
+import org.apache.nifi.pql.evaluation.OperandEvaluator;
+import org.apache.nifi.pql.groups.Group;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+
+/**
+ * Non-aggregating accumulator: collects, per {@link Group}, either the records themselves or
+ * a value extracted from each record, optionally skipping values already collected for the group.
+ */
+public class EventAccumulator implements Accumulator<Object>, Cloneable {
+
+    private final long id;
+    private final Map<Group, List<Object>> records = new LinkedHashMap<>();
+    private final String label;
+    private final OperandEvaluator<?> valueExtractor;
+    private final boolean distinct;
+
+    /**
+     * Creates an accumulator that collects the records themselves.
+     */
+    public EventAccumulator(final long id, final String label, final boolean distinct) {
+        this(id, label, null, distinct);
+    }
+
+    /**
+     * @param id identifier of this accumulator; carried over to clones
+     * @param label label reported for this accumulator's column
+     * @param valueExtractor evaluator producing the value to collect, or null to collect the record itself
+     * @param distinct when true, a value already present in a group's list is not added again
+     */
+    public EventAccumulator(final long id, final String label, final OperandEvaluator<?> valueExtractor, final boolean distinct) {
+        this.id = id;
+        this.label = label;
+        this.valueExtractor = valueExtractor;
+        this.distinct = distinct;
+    }
+
+    @Override
+    public long getId() {
+        return id;
+    }
+
+    @Override
+    public String getLabel() {
+        return label;
+    }
+
+    @Override
+    public boolean isAggregateFunction() {
+        return false;
+    }
+
+    /** Returns the extractor's value type, or the record class when no extractor is configured. */
+    @Override
+    public Class<? extends Object> getReturnType() {
+        if ( valueExtractor == null ) {
+            return ProvenanceEventRecord.class;
+        }
+        return valueExtractor.getType();
+    }
+
+    /**
+     * Collects the record (or its extracted value) for the given group.
+     *
+     * @return the bare value when {@code group} is null (nothing is stored in that case);
+     *         otherwise an unmodifiable view of everything collected for the group so far
+     */
+    @Override
+    public Object accumulate(final ProvenanceEventRecord record, final Group group) {
+        final Object extracted = valueExtractor == null ? record : valueExtractor.evaluate(record);
+
+        if ( group != null ) {
+            List<Object> bucket = records.get(group);
+            if ( bucket == null ) {
+                bucket = new ArrayList<>();
+                records.put(group, bucket);
+            }
+
+            final boolean duplicate = distinct && bucket.contains(extracted);
+            if ( !duplicate ) {
+                bucket.add( extracted );
+            }
+
+            return Collections.unmodifiableList(bucket);
+        }
+
+        return extracted;
+    }
+
+    /** Returns an unmodifiable view of the collected values of every group. */
+    @Override
+    public Map<Group, List<Object>> getValues() {
+        return Collections.unmodifiableMap(records);
+    }
+
+    /** Returns an unmodifiable view of the group's collected values, or an empty list when it has none. */
+    @Override
+    public List<Object> getValues(final Group group) {
+        final List<Object> bucket = records.get(group);
+        return (bucket == null) ? Collections.emptyList() : Collections.unmodifiableList(bucket);
+    }
+
+    @Override
+    public void reset() {
+        records.clear();
+    }
+
+    /** Returns a new accumulator with the same configuration but nothing collected. */
+    @Override
+    public EventAccumulator clone() {
+        return new EventAccumulator(id, label, valueExtractor, distinct);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b512ff12/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/accumulation/SumAccumulator.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/accumulation/SumAccumulator.java
 
b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/accumulation/SumAccumulator.java
new file mode 100644
index 0000000..5eacc6e
--- /dev/null
+++ 
b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/accumulation/SumAccumulator.java
@@ -0,0 +1,91 @@
+package org.apache.nifi.pql.evaluation.accumulation;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.nifi.pql.evaluation.Accumulator;
+import org.apache.nifi.pql.evaluation.RecordEvaluator;
+import org.apache.nifi.pql.groups.Group;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+
+/**
+ * Accumulator that keeps, per {@link Group}, the running sum of the Long values extracted from records.
+ */
+public class SumAccumulator implements Accumulator<Long>, Cloneable {
+
+    private final long id;
+    private final RecordEvaluator<Long> evaluator;
+    private final String label;
+    private final Map<Group, Long> sums = new LinkedHashMap<>();
+
+    /**
+     * @param id identifier of this accumulator; carried over to clones
+     * @param extractor evaluator that yields the value to add for each record
+     * @param label label reported for this accumulator's column
+     */
+    public SumAccumulator(final long id, final RecordEvaluator<Long> extractor, final String label) {
+        this.id = id;
+        this.evaluator = extractor;
+        this.label = label;
+    }
+
+    @Override
+    public long getId() {
+        return id;
+    }
+
+    @Override
+    public String getLabel() {
+        return label;
+    }
+
+    @Override
+    public boolean isAggregateFunction() {
+        return true;
+    }
+
+    @Override
+    public Class<Long> getReturnType() {
+        return Long.class;
+    }
+
+    /**
+     * Adds the record's extracted value to the group's running sum.
+     *
+     * @return the group's sum after the update, or null when the record yields no value
+     *         (in which case nothing is accumulated)
+     */
+    @Override
+    public Long accumulate(final ProvenanceEventRecord record, final Group group) {
+        final Long addend = evaluator.evaluate(record);
+        if ( addend == null ) {
+            return null;
+        }
+
+        final Long running = sums.get(group);
+        final long total = (running == null ? 0L : running) + addend;
+        sums.put(group, total);
+        return total;
+    }
+
+    /** Returns a new map holding, for every summed group, a single-element list with its sum. */
+    @Override
+    public Map<Group, List<Long>> getValues() {
+        final Map<Group, List<Long>> snapshot = new HashMap<>();
+        for ( final Map.Entry<Group, Long> total : sums.entrySet() ) {
+            final List<Long> single = Collections.singletonList(total.getValue());
+            snapshot.put(total.getKey(), single);
+        }
+        return snapshot;
+    }
+
+    /** Returns a single-element list with the group's sum, or an empty list when the group has none. */
+    @Override
+    public List<Long> getValues(final Group group) {
+        final Long total = sums.get(group);
+        return (total == null) ? Collections.<Long>emptyList() : Collections.singletonList(total);
+    }
+
+    @Override
+    public void reset() {
+        sums.clear();
+    }
+
+    /** Returns a new accumulator with the same id, evaluator and label but no sums. */
+    @Override
+    public SumAccumulator clone() {
+        return new SumAccumulator(id, evaluator, label);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b512ff12/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/comparison/ConversionUtils.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/comparison/ConversionUtils.java
 
b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/comparison/ConversionUtils.java
new file mode 100644
index 0000000..75b7a3b
--- /dev/null
+++ 
b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/comparison/ConversionUtils.java
@@ -0,0 +1,97 @@
+package org.apache.nifi.pql.evaluation.comparison;
+
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Calendar;
+import java.util.Date;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * Helpers for coercing loosely-typed values (numbers, dates, strings) into a Long so they can
+ * be compared numerically. Dates are represented as milliseconds since the epoch. Every
+ * conversion returns null rather than throwing when the input cannot be interpreted.
+ */
+public class ConversionUtils {
+
+    /** The format produced by {@link Date#toString()}. */
+    public static final String DATE_TO_STRING_FORMAT = "EEE MMM dd HH:mm:ss zzz yyyy";
+    public static final Pattern DATE_TO_STRING_PATTERN =
+        Pattern.compile("(?:[a-zA-Z]{3} ){2}\\d{2} \\d{2}\\:\\d{2}\\:\\d{2} (?:.*?) \\d{4}");
+
+    public static final String ALTERNATE_FORMAT_WITHOUT_MILLIS = "yyyy/MM/dd HH:mm:ss";
+    public static final String ALTERNATE_FORMAT_WITH_MILLIS = "yyyy/MM/dd HH:mm:ss.SSS";
+    public static final Pattern ALTERNATE_PATTERN =
+        Pattern.compile("\\d{4}/\\d{2}/\\d{2} \\d{2}\\:\\d{2}\\:\\d{2}(\\.\\d{3})?");
+
+    public static final Pattern NUMBER_PATTERN = Pattern.compile("-?\\d+");
+
+
+    /**
+     * Converts an arbitrary value to a Long.
+     *
+     * @param o a Number (narrowed with {@code longValue()}), a Date or Calendar (epoch millis),
+     *          a String (see {@link #convertToLong(String)}), or null
+     * @return the converted value, or null when {@code o} is null, of an unsupported type,
+     *         or a String that cannot be parsed
+     */
+    public static Long convertToLong(final Object o) {
+        if ( o == null ) {
+            return null;
+        }
+
+        if ( o instanceof Long ) {
+            return (Long) o;
+        }
+
+        if ( o instanceof Number ) {
+            return ((Number) o).longValue();
+        }
+
+        if ( o instanceof Date ) {
+            return ((Date) o).getTime();
+        }
+
+        if ( o instanceof Calendar ) {
+            return ((Calendar) o).getTimeInMillis();
+        }
+
+        if ( o instanceof String ) {
+            return convertToLong((String) o);
+        }
+
+        return null;
+    }
+
+    /**
+     * Converts text to a Long. After trimming, the text may be an integer literal, a date in
+     * the {@link Date#toString()} format, or a date in {@code yyyy/MM/dd HH:mm:ss[.SSS]} format
+     * (interpreted in the default time zone).
+     *
+     * @param value the text to convert; may be null
+     * @return the number, or the date as epoch millis; null when the text is null, blank,
+     *         in none of the recognized formats, or a number outside the range of a long
+     */
+    public static Long convertToLong(final String value) {
+        if ( value == null ) {
+            return null;
+        }
+
+        final String trimmed = value.trim();
+        if ( trimmed.isEmpty() ) {
+            return null;
+        }
+
+        if ( DATE_TO_STRING_PATTERN.matcher(trimmed).matches() ) {
+            // Date.toString() always emits English day and month names, so parse with a fixed
+            // locale instead of the platform default. A new SimpleDateFormat is created per call
+            // because the class is not thread-safe.
+            return parseDate(trimmed, new SimpleDateFormat(DATE_TO_STRING_FORMAT, java.util.Locale.US));
+        }
+
+        if ( NUMBER_PATTERN.matcher(trimmed).matches() ) {
+            try {
+                return Long.valueOf(trimmed);
+            } catch (final NumberFormatException nfe) {
+                // All digits but outside the long range: unparseable, like any other bad input.
+                return null;
+            }
+        }
+
+        final Matcher altMatcher = ALTERNATE_PATTERN.matcher(trimmed);
+        if ( !altMatcher.matches() ) {
+            return null;
+        }
+
+        // Group 1 is the optional ".SSS" suffix; its presence selects the format.
+        final String format = (altMatcher.group(1) == null)
+            ? ALTERNATE_FORMAT_WITHOUT_MILLIS
+            : ALTERNATE_FORMAT_WITH_MILLIS;
+        return parseDate(trimmed, new SimpleDateFormat(format));
+    }
+
+    /** Parses the text with the given format, returning epoch millis or null when parsing fails. */
+    private static Long parseDate(final String text, final SimpleDateFormat format) {
+        try {
+            return format.parse(text).getTime();
+        } catch (final ParseException pe) {
+            return null;
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b512ff12/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/comparison/EqualsEvaluator.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/comparison/EqualsEvaluator.java
 
b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/comparison/EqualsEvaluator.java
new file mode 100644
index 0000000..70158fb
--- /dev/null
+++ 
b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/comparison/EqualsEvaluator.java
@@ -0,0 +1,74 @@
+package org.apache.nifi.pql.evaluation.comparison;
+
+import org.apache.nifi.pql.evaluation.BooleanEvaluator;
+import org.apache.nifi.pql.evaluation.OperandEvaluator;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.ProvenanceEventType;
+
+public class EqualsEvaluator implements BooleanEvaluator {
+
+       private final OperandEvaluator<?> lhs;
+       private final OperandEvaluator<?> rhs;
+       private final boolean negated;
+       private final String alias;
+       
+       public EqualsEvaluator(final OperandEvaluator<?> lhs, final 
OperandEvaluator<?> rhs) {
+               this(lhs, rhs, false, null);
+       }
+
+       public EqualsEvaluator(final OperandEvaluator<?> lhs, final 
OperandEvaluator<?> rhs, final String alias) {
+               this(lhs, rhs, false, alias);
+       }
+
+       public EqualsEvaluator(final OperandEvaluator<?> lhs, final 
OperandEvaluator<?> rhs, final boolean negated) {
+               this(lhs, rhs, negated, null);
+       }
+       
+       public EqualsEvaluator(final OperandEvaluator<?> lhs, final 
OperandEvaluator<?> rhs, final boolean negated, final String alias) {
+               this.lhs = lhs;
+               this.rhs = rhs;
+               this.negated = negated;
+               this.alias = alias;
+       }
+       
+       public OperandEvaluator<?> getLHS() {
+               return lhs;
+       }
+       
+       public OperandEvaluator<?> getRHS() {
+               return rhs;
+       }
+       
+       public Boolean evaluate(final ProvenanceEventRecord record) {
+               Object lhsValue = lhs.evaluate(record);
+               Object rhsValue = rhs.evaluate(record);
+               
+               if ( lhsValue == null || rhsValue == null ) {
+                       return false;
+               }
+               
+               if ( lhsValue instanceof ProvenanceEventType ) {
+                       lhsValue = ((ProvenanceEventType) lhsValue).name();
+               }
+               if ( rhsValue instanceof ProvenanceEventType ) {
+                       rhsValue = ((ProvenanceEventType) rhsValue).name();
+               }
+               
+               final boolean equal = lhsValue.equals(rhsValue);
+               return negated ? !equal : equal;
+       }
+
+       public BooleanEvaluator negate() {
+               return new EqualsEvaluator(lhs, rhs, !negated, alias);
+       }
+
+       @Override
+       public String toString() {
+               return alias == null ? lhs.toString() + "=" + rhs.toString() : 
alias;
+       }
+       
+       @Override
+       public int getEvaluatorType() {
+               return org.apache.nifi.pql.ProvenanceQueryParser.EQUALS;
+       }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b512ff12/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/comparison/GreaterThanEvaluator.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/comparison/GreaterThanEvaluator.java
 
b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/comparison/GreaterThanEvaluator.java
new file mode 100644
index 0000000..327fc9f
--- /dev/null
+++ 
b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/comparison/GreaterThanEvaluator.java
@@ -0,0 +1,51 @@
+package org.apache.nifi.pql.evaluation.comparison;
+
+import org.apache.nifi.pql.evaluation.BooleanEvaluator;
+import org.apache.nifi.pql.evaluation.OperandEvaluator;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+
+public class GreaterThanEvaluator implements BooleanEvaluator {
+       private final OperandEvaluator<?> lhs;
+       private final OperandEvaluator<?> rhs;
+       private final boolean negated;
+       
+       public GreaterThanEvaluator(final OperandEvaluator<?> lhs, final 
OperandEvaluator<?> rhs) {
+               this(lhs, rhs, false);
+       }
+       
+       public GreaterThanEvaluator(final OperandEvaluator<?> lhs, final 
OperandEvaluator<?> rhs, final boolean negate) {
+               this.lhs = lhs;
+               this.rhs = rhs;
+               this.negated = negate;
+       }
+       
+       public OperandEvaluator<?> getLHS() {
+               return lhs;
+       }
+       
+       public OperandEvaluator<?> getRHS() {
+               return rhs;
+       }
+       
+       public Boolean evaluate(final ProvenanceEventRecord record) {
+               final Long lhsValue = 
ConversionUtils.convertToLong(lhs.evaluate(record));
+               final Long rhsValue = 
ConversionUtils.convertToLong(rhs.evaluate(record));
+               
+               if ( lhsValue == null || rhsValue == null ) {
+                       return false;
+               }
+               
+               final boolean greaterThan = lhsValue.longValue() > 
rhsValue.longValue();
+               return negated ? !greaterThan : greaterThan;
+       }
+
+       public BooleanEvaluator negate() {
+               return new GreaterThanEvaluator(lhs, rhs, !negated);
+       }
+
+       @Override
+       public int getEvaluatorType() {
+               return org.apache.nifi.pql.ProvenanceQueryParser.GT;
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b512ff12/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/comparison/LessThanEvaluator.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/comparison/LessThanEvaluator.java
 
b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/comparison/LessThanEvaluator.java
new file mode 100644
index 0000000..5940509
--- /dev/null
+++ 
b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/comparison/LessThanEvaluator.java
@@ -0,0 +1,51 @@
+package org.apache.nifi.pql.evaluation.comparison;
+
+import org.apache.nifi.pql.evaluation.BooleanEvaluator;
+import org.apache.nifi.pql.evaluation.OperandEvaluator;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+
+public class LessThanEvaluator implements BooleanEvaluator {
+       private final OperandEvaluator<?> lhs;
+       private final OperandEvaluator<?> rhs;
+       private final boolean negated;
+       
+       public LessThanEvaluator(final OperandEvaluator<?> lhs, final 
OperandEvaluator<?> rhs) {
+               this(lhs, rhs, false);
+       }
+       
+       public LessThanEvaluator(final OperandEvaluator<?> lhs, final 
OperandEvaluator<?> rhs, final boolean negate) {
+               this.lhs = lhs;
+               this.rhs = rhs;
+               this.negated = negate;
+       }
+       
+       public OperandEvaluator<?> getLHS() {
+               return lhs;
+       }
+       
+       public OperandEvaluator<?> getRHS() {
+               return rhs;
+       }
+       
+       public Boolean evaluate(final ProvenanceEventRecord record) {
+               final Long lhsValue = 
ConversionUtils.convertToLong(lhs.evaluate(record));
+               final Long rhsValue = 
ConversionUtils.convertToLong(rhs.evaluate(record));
+               
+               if ( lhsValue == null || rhsValue == null ) {
+                       return false;
+               }
+               
+               final boolean lessThan = lhsValue.longValue() < 
rhsValue.longValue();
+               return negated ? !lessThan : lessThan;
+       }
+
+       public BooleanEvaluator negate() {
+               return new GreaterThanEvaluator(lhs, rhs, !negated);
+       }
+
+       @Override
+       public int getEvaluatorType() {
+               return org.apache.nifi.pql.ProvenanceQueryParser.LT;
+       }
+
+}

Reply via email to