http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b512ff12/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/comparison/MatchesEvaluator.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/comparison/MatchesEvaluator.java
 
b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/comparison/MatchesEvaluator.java
new file mode 100644
index 0000000..b646129
--- /dev/null
+++ 
b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/comparison/MatchesEvaluator.java
@@ -0,0 +1,72 @@
+package org.apache.nifi.pql.evaluation.comparison;
+
+import java.util.regex.Pattern;
+
+import org.apache.nifi.pql.evaluation.BooleanEvaluator;
+import org.apache.nifi.pql.evaluation.OperandEvaluator;
+import org.apache.nifi.pql.evaluation.literals.StringLiteralEvaluator;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+
+/**
+ * Boolean evaluator that reports whether the string form of the left-hand
+ * operand matches the regular expression supplied by the right-hand operand.
+ * The entire left-hand string has to match, because the check is done with
+ * {@link java.util.regex.Matcher#matches()}.
+ *
+ * If either operand evaluates to null the result is false, whether or not
+ * the evaluator is negated.
+ */
+public class MatchesEvaluator implements BooleanEvaluator {
+
+       private final OperandEvaluator<?> lhs;
+       private final OperandEvaluator<?> rhs;
+       private final boolean negated;
+       // Regex compiled once up front when rhs is a string literal; null when the
+       // expression has to be compiled from the evaluated rhs value on each call.
+       private final Pattern pattern;
+
+       /**
+        * Creates a non-negated evaluator.
+        *
+        * @param lhs operand whose string form is tested
+        * @param rhs operand whose string form is the regular expression
+        */
+       public MatchesEvaluator(final OperandEvaluator<?> lhs, final OperandEvaluator<?> rhs) {
+               this(lhs, rhs, false);
+       }
+
+       /**
+        * @param lhs operand whose string form is tested
+        * @param rhs operand whose string form is the regular expression
+        * @param negated whether the match result is inverted
+        * @throws java.util.regex.PatternSyntaxException if rhs is a string literal
+        *         holding an invalid regular expression
+        */
+       public MatchesEvaluator(final OperandEvaluator<?> lhs, final OperandEvaluator<?> rhs, final boolean negated) {
+               this.lhs = lhs;
+               this.rhs = rhs;
+               this.negated = negated;
+               this.pattern = precompile(rhs);
+       }
+
+       /**
+        * Compiles the regex ahead of time when rhs is a string literal. Returns
+        * null when rhs is not a literal or when the literal evaluates to null;
+        * a null literal is then handled by the null check in evaluate instead
+        * of failing here with a NullPointerException.
+        */
+       private static Pattern precompile(final OperandEvaluator<?> rhs) {
+               if ( !(rhs instanceof StringLiteralEvaluator) ) {
+                       return null;
+               }
+
+               final Object literal = rhs.evaluate(null);
+               return (literal == null) ? null : Pattern.compile(literal.toString());
+       }
+
+       public OperandEvaluator<?> getLHS() {
+               return lhs;
+       }
+
+       public OperandEvaluator<?> getRHS() {
+               return rhs;
+       }
+
+       /**
+        * @param record the event handed to both operand evaluators
+        * @return false if either operand evaluates to null; otherwise whether the
+        *         left-hand string fully matches the regex, inverted when negated
+        */
+       @Override
+       public Boolean evaluate(final ProvenanceEventRecord record) {
+               final Object lhsValue = lhs.evaluate(record);
+               final Object rhsValue = rhs.evaluate(record);
+
+               if ( lhsValue == null || rhsValue == null ) {
+                       return false;
+               }
+
+               final String lhsString = lhsValue.toString();
+
+               // Use the pre-compiled literal pattern when there is one
+               final Pattern compiled = (pattern == null) ? Pattern.compile(rhsValue.toString()) : pattern;
+
+               final boolean matches = compiled.matcher(lhsString).matches();
+               return negated ? !matches : matches;
+       }
+
+       /**
+        * @return a new evaluator over the same operands with the negation flag flipped
+        */
+       @Override
+       public MatchesEvaluator negate() {
+               return new MatchesEvaluator(lhs, rhs, !negated);
+       }
+
+       @Override
+       public int getEvaluatorType() {
+               return org.apache.nifi.pql.ProvenanceQueryParser.MATCHES;
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b512ff12/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/comparison/RecordTypeEvaluator.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/comparison/RecordTypeEvaluator.java
 
b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/comparison/RecordTypeEvaluator.java
new file mode 100644
index 0000000..d267371
--- /dev/null
+++ 
b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/comparison/RecordTypeEvaluator.java
@@ -0,0 +1,34 @@
+package org.apache.nifi.pql.evaluation.comparison;
+
+import java.util.EnumSet;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.nifi.pql.evaluation.BooleanEvaluator;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.ProvenanceEventType;
+
+/**
+ * Boolean evaluator that is true when a record's event type is one of a
+ * fixed set of {@link ProvenanceEventType} values.
+ */
+public class RecordTypeEvaluator implements BooleanEvaluator {
+       private final Set<ProvenanceEventType> types;
+
+       /**
+        * @param types the accepted event types; copied, so later changes to the
+        *        caller's set do not affect this evaluator
+        */
+       public RecordTypeEvaluator(final Set<ProvenanceEventType> types) {
+               this.types = new HashSet<>(types);
+       }
+
+       /**
+        * @param record the event to test
+        * @return whether the record's event type is in the accepted set
+        */
+       @Override
+       public Boolean evaluate(final ProvenanceEventRecord record) {
+               return types.contains(record.getEventType());
+       }
+
+       /**
+        * @return an evaluator that accepts exactly the event types this one rejects
+        */
+       @Override
+       public BooleanEvaluator negate() {
+               final Set<ProvenanceEventType> negatedTypes;
+               if ( types.isEmpty() ) {
+                       // EnumSet.copyOf(Collection) throws IllegalArgumentException for an
+                       // empty collection that is not an EnumSet, so the complement of
+                       // "nothing" is built directly.
+                       negatedTypes = EnumSet.allOf(ProvenanceEventType.class);
+               } else {
+                       negatedTypes = EnumSet.complementOf(EnumSet.copyOf(types));
+               }
+               return new RecordTypeEvaluator(negatedTypes);
+       }
+
+       @Override
+       public int getEvaluatorType() {
+               return org.apache.nifi.pql.ProvenanceQueryParser.TYPE;
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b512ff12/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/comparison/StartsWithEvaluator.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/comparison/StartsWithEvaluator.java
 
b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/comparison/StartsWithEvaluator.java
new file mode 100644
index 0000000..7c11416
--- /dev/null
+++ 
b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/comparison/StartsWithEvaluator.java
@@ -0,0 +1,57 @@
+package org.apache.nifi.pql.evaluation.comparison;
+
+import org.apache.nifi.pql.evaluation.BooleanEvaluator;
+import org.apache.nifi.pql.evaluation.OperandEvaluator;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+
+/**
+ * Boolean evaluator that checks whether the string form of the left-hand
+ * operand begins with the string form of the right-hand operand. When either
+ * operand evaluates to null the result is false, negated or not.
+ */
+public class StartsWithEvaluator implements BooleanEvaluator {
+
+       private final boolean negated;
+       private final OperandEvaluator<?> lhs;
+       private final OperandEvaluator<?> rhs;
+
+       /**
+        * Builds a non-negated evaluator over the two operands.
+        */
+       public StartsWithEvaluator(final OperandEvaluator<?> lhs, final OperandEvaluator<?> rhs) {
+               this(lhs, rhs, false);
+       }
+
+       /**
+        * @param lhs operand providing the string to inspect
+        * @param rhs operand providing the expected prefix
+        * @param negated whether the prefix test result is inverted
+        */
+       public StartsWithEvaluator(final OperandEvaluator<?> lhs, final OperandEvaluator<?> rhs, final boolean negated) {
+               this.lhs = lhs;
+               this.rhs = rhs;
+               this.negated = negated;
+       }
+
+       /**
+        * @param record the event passed to both operand evaluators
+        * @return false when either operand is null; otherwise the prefix test,
+        *         inverted when this evaluator is negated
+        */
+       @Override
+       public Boolean evaluate(final ProvenanceEventRecord record) {
+               final Object subject = lhs.evaluate(record);
+               final Object prefix = rhs.evaluate(record);
+
+               if ( subject == null || prefix == null ) {
+                       return false;
+               }
+
+               final boolean hasPrefix = subject.toString().startsWith(prefix.toString());
+               // Differing flags mean "matched and not negated" or "unmatched and negated"
+               return hasPrefix != negated;
+       }
+
+       public OperandEvaluator<?> getLHS() {
+               return lhs;
+       }
+
+       public OperandEvaluator<?> getRHS() {
+               return rhs;
+       }
+
+       /**
+        * @return a new evaluator over the same operands with the negation flag flipped
+        */
+       @Override
+       public BooleanEvaluator negate() {
+               return new StartsWithEvaluator(lhs, rhs, !negated);
+       }
+
+       @Override
+       public int getEvaluatorType() {
+               return org.apache.nifi.pql.ProvenanceQueryParser.STARTS_WITH;
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b512ff12/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/conversion/DateToLongEvaluator.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/conversion/DateToLongEvaluator.java
 
b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/conversion/DateToLongEvaluator.java
new file mode 100644
index 0000000..6832e14
--- /dev/null
+++ 
b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/conversion/DateToLongEvaluator.java
@@ -0,0 +1,27 @@
+package org.apache.nifi.pql.evaluation.conversion;
+
+import org.apache.nifi.pql.evaluation.OperandEvaluator;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+
+/**
+ * Placeholder for a date-to-long conversion evaluator. No conversion logic
+ * is implemented yet: evaluate always yields null and the evaluator type is 0.
+ */
+public class DateToLongEvaluator implements OperandEvaluator<Long> {
+
+       /**
+        * Not implemented.
+        *
+        * @param record ignored
+        * @return always null
+        */
+       @Override
+       public Long evaluate(ProvenanceEventRecord record) {
+               // TODO implement the conversion; there is no source operand to convert yet
+               return null;
+       }
+
+       /**
+        * @return always 0; no parser constant has been assigned to this evaluator
+        */
+       @Override
+       public int getEvaluatorType() {
+               // TODO return the matching ProvenanceQueryParser constant once one exists
+               return 0;
+       }
+
+       /**
+        * @return {@code Long.class}, matching the declared operand type, so that
+        *         callers inspecting the type do not receive null
+        */
+       @Override
+       public Class<Long> getType() {
+               return Long.class;
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b512ff12/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/extraction/AttributeEvaluator.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/extraction/AttributeEvaluator.java
 
b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/extraction/AttributeEvaluator.java
new file mode 100644
index 0000000..c143c4a
--- /dev/null
+++ 
b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/extraction/AttributeEvaluator.java
@@ -0,0 +1,42 @@
+package org.apache.nifi.pql.evaluation.extraction;
+
+import org.apache.nifi.pql.evaluation.OperandEvaluator;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+
+/**
+ * Operand evaluator that looks up an attribute on a record. The attribute
+ * name is itself obtained from another evaluator.
+ */
+public class AttributeEvaluator implements OperandEvaluator<String> {
+       private final OperandEvaluator<String> attributeNameEvaluator;
+
+       /**
+        * @param attributeNameEvaluator evaluator that yields the name of the attribute to read
+        */
+       public AttributeEvaluator(final OperandEvaluator<String> attributeNameEvaluator) {
+               this.attributeNameEvaluator = attributeNameEvaluator;
+       }
+
+       public OperandEvaluator<String> getAttributeNameEvaluator() {
+               return attributeNameEvaluator;
+       }
+
+       /**
+        * @param record the event to read the attribute from
+        * @return null when the name evaluator yields null; otherwise whatever
+        *         {@code record.getAttribute(name)} returns
+        */
+       @Override
+       public String evaluate(final ProvenanceEventRecord record) {
+               final String name = attributeNameEvaluator.evaluate(record);
+               return (name == null) ? null : record.getAttribute(name);
+       }
+
+       @Override
+       public Class<String> getType() {
+               return String.class;
+       }
+
+       /**
+        * @return the string form of the attribute-name evaluator
+        */
+       @Override
+       public String toString() {
+               return attributeNameEvaluator.toString();
+       }
+
+       @Override
+       public int getEvaluatorType() {
+               return org.apache.nifi.pql.ProvenanceQueryParser.ATTRIBUTE;
+       }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b512ff12/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/extraction/ComponentIdEvaluator.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/extraction/ComponentIdEvaluator.java
 
b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/extraction/ComponentIdEvaluator.java
new file mode 100644
index 0000000..50850c0
--- /dev/null
+++ 
b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/extraction/ComponentIdEvaluator.java
@@ -0,0 +1,23 @@
+package org.apache.nifi.pql.evaluation.extraction;
+
+import org.apache.nifi.pql.evaluation.OperandEvaluator;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+
+/**
+ * Operand evaluator that extracts the component id from a record.
+ */
+public class ComponentIdEvaluator implements OperandEvaluator<String> {
+
+       /**
+        * @param record the event to read; must not be null
+        * @return the value of {@code record.getComponentId()}
+        */
+       @Override
+       public String evaluate(final ProvenanceEventRecord record) {
+               final String componentId = record.getComponentId();
+               return componentId;
+       }
+
+       @Override
+       public Class<String> getType() {
+               return String.class;
+       }
+
+       /**
+        * @return the {@code ProvenanceQueryParser.COMPONENT_ID} constant
+        */
+       @Override
+       public int getEvaluatorType() {
+               return org.apache.nifi.pql.ProvenanceQueryParser.COMPONENT_ID;
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b512ff12/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/extraction/RelationshipEvaluator.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/extraction/RelationshipEvaluator.java
 
b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/extraction/RelationshipEvaluator.java
new file mode 100644
index 0000000..a6ff59d
--- /dev/null
+++ 
b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/extraction/RelationshipEvaluator.java
@@ -0,0 +1,23 @@
+package org.apache.nifi.pql.evaluation.extraction;
+
+import org.apache.nifi.pql.evaluation.OperandEvaluator;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+
+/**
+ * Operand evaluator that extracts the relationship from a record.
+ */
+public class RelationshipEvaluator implements OperandEvaluator<String> {
+
+       /**
+        * @param record the event to read; must not be null
+        * @return the value of {@code record.getRelationship()}
+        */
+       @Override
+       public String evaluate(final ProvenanceEventRecord record) {
+               final String relationship = record.getRelationship();
+               return relationship;
+       }
+
+       @Override
+       public Class<String> getType() {
+               return String.class;
+       }
+
+       /**
+        * @return the {@code ProvenanceQueryParser.RELATIONSHIP} constant
+        */
+       @Override
+       public int getEvaluatorType() {
+               return org.apache.nifi.pql.ProvenanceQueryParser.RELATIONSHIP;
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b512ff12/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/extraction/SizeEvaluator.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/extraction/SizeEvaluator.java
 
b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/extraction/SizeEvaluator.java
new file mode 100644
index 0000000..052b88e
--- /dev/null
+++ 
b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/extraction/SizeEvaluator.java
@@ -0,0 +1,23 @@
+package org.apache.nifi.pql.evaluation.extraction;
+
+import org.apache.nifi.pql.evaluation.OperandEvaluator;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+
+/**
+ * Operand evaluator that extracts the file size from a record.
+ */
+public class SizeEvaluator implements OperandEvaluator<Long> {
+
+       /**
+        * @param record the event to read; must not be null
+        * @return the value of {@code record.getFileSize()}
+        */
+       @Override
+       public Long evaluate(final ProvenanceEventRecord record) {
+               final Long fileSize = record.getFileSize();
+               return fileSize;
+       }
+
+       @Override
+       public Class<Long> getType() {
+               return Long.class;
+       }
+
+       /**
+        * @return the {@code ProvenanceQueryParser.FILESIZE} constant
+        */
+       @Override
+       public int getEvaluatorType() {
+               return org.apache.nifi.pql.ProvenanceQueryParser.FILESIZE;
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b512ff12/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/extraction/TimestampEvaluator.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/extraction/TimestampEvaluator.java
 
b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/extraction/TimestampEvaluator.java
new file mode 100644
index 0000000..fcc1ea1
--- /dev/null
+++ 
b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/extraction/TimestampEvaluator.java
@@ -0,0 +1,26 @@
+package org.apache.nifi.pql.evaluation.extraction;
+
+import org.apache.nifi.pql.evaluation.OperandEvaluator;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+
+/**
+ * Operand evaluator that extracts the event time from a record.
+ */
+public class TimestampEvaluator implements OperandEvaluator<Long> {
+
+       /**
+        * @param record the event to read; may be null
+        * @return null when no record is supplied, otherwise the value of
+        *         {@code record.getEventTime()}
+        */
+       @Override
+       public Long evaluate(final ProvenanceEventRecord record) {
+               if ( record != null ) {
+                       return record.getEventTime();
+               }
+               return null;
+       }
+
+       @Override
+       public Class<Long> getType() {
+               return Long.class;
+       }
+
+       /**
+        * @return the {@code ProvenanceQueryParser.TIMESTAMP} constant
+        */
+       @Override
+       public int getEvaluatorType() {
+               return org.apache.nifi.pql.ProvenanceQueryParser.TIMESTAMP;
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b512ff12/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/extraction/TransitUriEvaluator.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/extraction/TransitUriEvaluator.java
 
b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/extraction/TransitUriEvaluator.java
new file mode 100644
index 0000000..35e4a39
--- /dev/null
+++ 
b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/extraction/TransitUriEvaluator.java
@@ -0,0 +1,23 @@
+package org.apache.nifi.pql.evaluation.extraction;
+
+import org.apache.nifi.pql.evaluation.OperandEvaluator;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+
+/**
+ * Operand evaluator that extracts the transit URI from a record.
+ */
+public class TransitUriEvaluator implements OperandEvaluator<String> {
+
+       /**
+        * @param record the event to read; must not be null
+        * @return the value of {@code record.getTransitUri()}
+        */
+       @Override
+       public String evaluate(final ProvenanceEventRecord record) {
+               final String transitUri = record.getTransitUri();
+               return transitUri;
+       }
+
+       @Override
+       public Class<String> getType() {
+               return String.class;
+       }
+
+       /**
+        * @return the {@code ProvenanceQueryParser.TRANSIT_URI} constant
+        */
+       @Override
+       public int getEvaluatorType() {
+               return org.apache.nifi.pql.ProvenanceQueryParser.TRANSIT_URI;
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b512ff12/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/extraction/TypeEvaluator.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/extraction/TypeEvaluator.java
 
b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/extraction/TypeEvaluator.java
new file mode 100644
index 0000000..2550bc3
--- /dev/null
+++ 
b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/extraction/TypeEvaluator.java
@@ -0,0 +1,24 @@
+package org.apache.nifi.pql.evaluation.extraction;
+
+import org.apache.nifi.pql.evaluation.OperandEvaluator;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.ProvenanceEventType;
+
+/**
+ * Operand evaluator that extracts the event type from a record.
+ */
+public class TypeEvaluator implements OperandEvaluator<ProvenanceEventType> {
+
+       /**
+        * @param record the event to read; must not be null
+        * @return the value of {@code record.getEventType()}
+        */
+       @Override
+       public ProvenanceEventType evaluate(final ProvenanceEventRecord record) {
+               final ProvenanceEventType eventType = record.getEventType();
+               return eventType;
+       }
+
+       @Override
+       public Class<ProvenanceEventType> getType() {
+               return ProvenanceEventType.class;
+       }
+
+       /**
+        * @return the {@code ProvenanceQueryParser.TYPE} constant
+        */
+       @Override
+       public int getEvaluatorType() {
+               return org.apache.nifi.pql.ProvenanceQueryParser.TYPE;
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b512ff12/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/extraction/UuidEvaluator.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/extraction/UuidEvaluator.java
 
b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/extraction/UuidEvaluator.java
new file mode 100644
index 0000000..39d8243
--- /dev/null
+++ 
b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/extraction/UuidEvaluator.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.pql.evaluation.extraction;
+
+import org.apache.nifi.pql.evaluation.OperandEvaluator;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+
+/**
+ * Operand evaluator that extracts the FlowFile UUID from a record.
+ */
+public class UuidEvaluator implements OperandEvaluator<String> {
+
+    /**
+     * @param record the event to read; must not be null
+     * @return the value of {@code record.getFlowFileUuid()}
+     */
+    @Override
+    public String evaluate(final ProvenanceEventRecord record) {
+        final String flowFileUuid = record.getFlowFileUuid();
+        return flowFileUuid;
+    }
+
+    @Override
+    public Class<String> getType() {
+        return String.class;
+    }
+
+    /**
+     * @return the {@code ProvenanceQueryParser.UUID} constant
+     */
+    @Override
+    public int getEvaluatorType() {
+        return org.apache.nifi.pql.ProvenanceQueryParser.UUID;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b512ff12/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/function/TimeFieldEvaluator.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/function/TimeFieldEvaluator.java
 
b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/function/TimeFieldEvaluator.java
new file mode 100644
index 0000000..44450c9
--- /dev/null
+++ 
b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/function/TimeFieldEvaluator.java
@@ -0,0 +1,60 @@
+package org.apache.nifi.pql.evaluation.function;
+
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.List;
+import java.util.TimeZone;
+
+import org.apache.nifi.pql.evaluation.OperandEvaluator;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+
+/**
+ * Operand evaluator that truncates an epoch-millisecond timestamp to a given
+ * calendar field, in UTC. Every field finer than the requested one is reset
+ * to its starting value, so truncating to {@link Calendar#HOUR_OF_DAY} yields
+ * the start of the hour and truncating to {@link Calendar#YEAR} yields
+ * January 1st 00:00:00.000 of that year.
+ */
+public class TimeFieldEvaluator implements OperandEvaluator<Long> {
+       private final OperandEvaluator<Long> timeExtractor;
+       private final int evaluatorType;
+
+       // Calendar fields that are reset to their starting value on evaluation
+       private final List<Integer> fieldsToClear = new ArrayList<>();
+
+       /**
+        * @param timeExtractor evaluator that yields the timestamp in epoch milliseconds
+        * @param timeField the {@link Calendar} field to truncate to: YEAR, MONTH,
+        *        DAY_OF_MONTH, HOUR or HOUR_OF_DAY, or MINUTE; any other value
+        *        clears only the milliseconds
+        * @param evaluatorType the value reported by {@link #getEvaluatorType()}
+        */
+       public TimeFieldEvaluator(final OperandEvaluator<Long> timeExtractor, final int timeField, final int evaluatorType) {
+               this.timeExtractor = timeExtractor;
+               this.evaluatorType = evaluatorType;
+
+               // Each case deliberately falls through so that a coarser field also
+               // clears everything a finer field would clear.
+               switch (timeField) {
+                       case Calendar.YEAR:
+                               fieldsToClear.add(Calendar.MONTH);
+                               // fall through
+                       case Calendar.MONTH:
+                               fieldsToClear.add(Calendar.DAY_OF_MONTH);
+                               // fall through
+                       case Calendar.DAY_OF_MONTH:
+                               // HOUR_OF_DAY rather than HOUR: clearing the 12-hour field
+                               // leaves AM/PM untouched, turning afternoon times into noon.
+                               fieldsToClear.add(Calendar.HOUR_OF_DAY);
+                               // fall through
+                       case Calendar.HOUR:
+                       case Calendar.HOUR_OF_DAY:
+                               fieldsToClear.add(Calendar.MINUTE);
+                               // fall through
+                       case Calendar.MINUTE:
+                               fieldsToClear.add(Calendar.SECOND);
+                               // fall through
+                       default:
+                               fieldsToClear.add(Calendar.MILLISECOND);
+               }
+       }
+
+       /**
+        * @param record the event handed to the time extractor
+        * @return null when the extractor yields null; otherwise the truncated
+        *         timestamp in epoch milliseconds
+        */
+       @Override
+       public Long evaluate(ProvenanceEventRecord record) {
+               final Long epochMillis = timeExtractor.evaluate(record);
+               if ( epochMillis == null ) {
+                       return null;
+               }
+
+               final Calendar cal = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
+               cal.setTimeInMillis(epochMillis);
+               for ( final Integer field : fieldsToClear ) {
+                       // DAY_OF_MONTH is 1-based; setting it to 0 would roll back into the
+                       // previous month. All other cleared fields start at 0.
+                       final int startValue = (field == Calendar.DAY_OF_MONTH) ? 1 : 0;
+                       cal.set(field, startValue);
+               }
+               return Long.valueOf(cal.getTimeInMillis());
+       }
+
+       @Override
+       public Class<Long> getType() {
+               return Long.class;
+       }
+
+       /**
+        * @return the evaluator type supplied to the constructor
+        */
+       @Override
+       public int getEvaluatorType() {
+               return evaluatorType;
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b512ff12/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/literals/LongLiteralEvaluator.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/literals/LongLiteralEvaluator.java
 
b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/literals/LongLiteralEvaluator.java
new file mode 100644
index 0000000..ee63c48
--- /dev/null
+++ 
b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/literals/LongLiteralEvaluator.java
@@ -0,0 +1,27 @@
+package org.apache.nifi.pql.evaluation.literals;
+
+import org.apache.nifi.pql.evaluation.OperandEvaluator;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+
+/**
+ * Operand evaluator for a constant Long: the value given at construction is
+ * returned for every record.
+ */
+public class LongLiteralEvaluator implements OperandEvaluator<Long> {
+       private final Long value;
+
+       /**
+        * @param value the constant this evaluator yields
+        */
+       public LongLiteralEvaluator(final Long value) {
+               this.value = value;
+       }
+
+       /**
+        * @param record ignored
+        * @return the constant supplied to the constructor
+        */
+       @Override
+       public Long evaluate(final ProvenanceEventRecord record) {
+               return this.value;
+       }
+
+       @Override
+       public Class<Long> getType() {
+               return Long.class;
+       }
+
+       /**
+        * @return the {@code ProvenanceQueryParser.NUMBER} constant
+        */
+       @Override
+       public int getEvaluatorType() {
+               return org.apache.nifi.pql.ProvenanceQueryParser.NUMBER;
+       }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b512ff12/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/literals/StringLiteralEvaluator.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/literals/StringLiteralEvaluator.java
 
b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/literals/StringLiteralEvaluator.java
new file mode 100644
index 0000000..d6a09f1
--- /dev/null
+++ 
b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/literals/StringLiteralEvaluator.java
@@ -0,0 +1,33 @@
+package org.apache.nifi.pql.evaluation.literals;
+
+import org.apache.nifi.pql.evaluation.OperandEvaluator;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+
+/**
+ * Operand evaluator for a constant String: the value given at construction
+ * is returned for every record.
+ */
+public class StringLiteralEvaluator implements OperandEvaluator<String> {
+       private final String value;
+
+       /**
+        * @param value the constant this evaluator yields
+        */
+       public StringLiteralEvaluator(final String value) {
+               this.value = value;
+       }
+
+       /**
+        * @param record ignored
+        * @return the constant supplied to the constructor
+        */
+       @Override
+       public String evaluate(final ProvenanceEventRecord record) {
+               return this.value;
+       }
+
+       @Override
+       public Class<String> getType() {
+               return String.class;
+       }
+
+       /**
+        * @return the literal value itself
+        */
+       @Override
+       public String toString() {
+               return this.value;
+       }
+
+       /**
+        * @return the {@code ProvenanceQueryParser.STRING_LITERAL} constant
+        */
+       @Override
+       public int getEvaluatorType() {
+               return org.apache.nifi.pql.ProvenanceQueryParser.STRING_LITERAL;
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b512ff12/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/logic/AndEvaluator.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/logic/AndEvaluator.java
 
b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/logic/AndEvaluator.java
new file mode 100644
index 0000000..fb9b408
--- /dev/null
+++ 
b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/logic/AndEvaluator.java
@@ -0,0 +1,114 @@
+package org.apache.nifi.pql.evaluation.logic;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.nifi.pql.evaluation.BooleanEvaluator;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+
+public class AndEvaluator implements BooleanEvaluator {
+
+       private final BooleanEvaluator lhs;
+       private final BooleanEvaluator rhs;
+       
+       
+       public AndEvaluator(final BooleanEvaluator lhs, final BooleanEvaluator 
rhs) {
+               this.lhs = lhs;
+               this.rhs = rhs;
+       }
+       
+       @Override
+       public Boolean evaluate(final ProvenanceEventRecord record) {
+               return lhs.evaluate(record) && rhs.evaluate(record);
+       }
+
+       @Override
+       public BooleanEvaluator negate() {
+               return new OrEvaluator(lhs.negate(), rhs.negate());
+       }
+
+       public BooleanEvaluator getLHS() {
+               return lhs;
+       }
+       
+       public BooleanEvaluator getRHS() {
+               return rhs;
+       }
+       
+       /**
+        * Converts this AND tree to Disjunctive Normal Form (OR's of AND's)
+        * @return
+        */
+       public BooleanEvaluator toDNF() {
+               final List<BooleanEvaluator> rhsEvaluators = new ArrayList<>();
+               final List<BooleanEvaluator> lhsEvaluators = new ArrayList<>();
+               
+               if ( rhs instanceof OrEvaluator ) {
+                       final OrEvaluator or = (OrEvaluator) rhs;
+                       rhsEvaluators.add(or.getLHS());
+                       rhsEvaluators.add(or.getRHS());
+               } else if ( rhs instanceof AndEvaluator ) {
+                       rhsEvaluators.add( ((AndEvaluator) rhs).toDNF() );
+               } else {
+                       rhsEvaluators.add(rhs);
+               }
+               
+               if ( lhs instanceof OrEvaluator ) {
+                       final OrEvaluator or = (OrEvaluator) lhs;
+                       lhsEvaluators.add(or.getLHS());
+                       lhsEvaluators.add(or.getRHS());
+               } else if ( lhs instanceof AndEvaluator ) {
+                       lhsEvaluators.add( ((AndEvaluator) lhs).toDNF() );
+               } else {
+                       lhsEvaluators.add(lhs);
+               }
+               
+               if ( rhsEvaluators.size() == 1 && lhsEvaluators.size() == 1 ) {
+                       return this;
+               }
+               
+               final List<AndEvaluator> ands = new ArrayList<>();
+               for ( final BooleanEvaluator l : lhsEvaluators ) {
+                       for ( final BooleanEvaluator r : rhsEvaluators ) {
+                               final AndEvaluator and = new AndEvaluator(l, r);
+                               ands.add(and);
+                       }
+               }
+               
+               final AndEvaluator and1 = ands.get(0);
+               final AndEvaluator and2 = ands.get(1);
+               OrEvaluator or = new OrEvaluator(and1, and2);
+               
+               for (int i=2; i < ands.size(); i++) {
+                       final AndEvaluator ae = ands.get(i);
+                       or = new OrEvaluator(or, ae);
+               }
+               
+               return or;
+       }
+       
+       public String toString() {
+               final StringBuilder sb = new StringBuilder();
+               if ( lhs instanceof AndEvaluator || lhs instanceof OrEvaluator 
) {
+                       sb.append("(").append(lhs.toString()).append(")");
+               } else {
+                       sb.append(lhs.toString());
+               }
+               
+               sb.append(" & ");
+               
+               if ( rhs instanceof AndEvaluator || rhs instanceof OrEvaluator 
) {
+                       sb.append("(").append(rhs.toString()).append(")");
+               } else {
+                       sb.append(rhs.toString());
+               }
+
+               return sb.toString();
+       }
+
+       @Override
+       public int getEvaluatorType() {
+               return org.apache.nifi.pql.ProvenanceQueryParser.AND;
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b512ff12/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/logic/OrEvaluator.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/logic/OrEvaluator.java
 
b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/logic/OrEvaluator.java
new file mode 100644
index 0000000..c93462a
--- /dev/null
+++ 
b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/logic/OrEvaluator.java
@@ -0,0 +1,58 @@
+package org.apache.nifi.pql.evaluation.logic;
+
+import org.apache.nifi.pql.evaluation.BooleanEvaluator;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+
+/**
+ * Boolean evaluator that is satisfied when either of its two operands is satisfied.
+ */
+public class OrEvaluator implements BooleanEvaluator {
+       private final BooleanEvaluator lhs;
+       private final BooleanEvaluator rhs;
+
+       /**
+        * @param lhs the left-hand operand, evaluated first
+        * @param rhs the right-hand operand, evaluated only when {@code lhs} is false
+        */
+       public OrEvaluator(final BooleanEvaluator lhs, final BooleanEvaluator rhs) {
+               this.lhs = lhs;
+               this.rhs = rhs;
+       }
+
+       /**
+        * Evaluates the left operand and, only if it is false, the right operand.
+        */
+       @Override
+       public Boolean evaluate(final ProvenanceEventRecord record) {
+               final boolean left = lhs.evaluate(record);
+               if ( left ) {
+                       return true;
+               }
+
+               final boolean right = rhs.evaluate(record);
+               return right;
+       }
+
+       /**
+        * Negates this expression by De Morgan's law: {@code !(a | b)} becomes {@code !a & !b}.
+        */
+       @Override
+       public BooleanEvaluator negate() {
+               final BooleanEvaluator notLeft = lhs.negate();
+               final BooleanEvaluator notRight = rhs.negate();
+               return new AndEvaluator(notLeft, notRight);
+       }
+
+       public BooleanEvaluator getLHS() {
+               return lhs;
+       }
+
+       public BooleanEvaluator getRHS() {
+               return rhs;
+       }
+
+       /**
+        * Renders this disjunction as {@code lhs | rhs}, parenthesizing nested AND/OR operands.
+        */
+       @Override
+       public String toString() {
+               return describe(lhs) + " | " + describe(rhs);
+       }
+
+       /**
+        * Returns the operand's string form, wrapped in parentheses when the operand
+        * is itself an AND or OR expression.
+        */
+       private static String describe(final BooleanEvaluator operand) {
+               final boolean compound = operand instanceof AndEvaluator || operand instanceof OrEvaluator;
+               return compound ? "(" + operand.toString() + ")" : operand.toString();
+       }
+
+       @Override
+       public int getEvaluatorType() {
+               return org.apache.nifi.pql.ProvenanceQueryParser.OR;
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b512ff12/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/order/CellValue.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/order/CellValue.java
 
b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/order/CellValue.java
new file mode 100644
index 0000000..724e340
--- /dev/null
+++ 
b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/order/CellValue.java
@@ -0,0 +1,69 @@
+package org.apache.nifi.pql.evaluation.order;
+
+import java.util.Comparator;
+
+/**
+ * Pairs a value with the id of the row it came from so that rows can be ordered
+ * by value while remembering which row each value belongs to.
+ *
+ * <p>Ordering is delegated to the supplied comparator, whereas {@link #equals(Object)}
+ * and {@link #hashCode()} look only at the value (the row id is ignored). The
+ * natural ordering is therefore not guaranteed to be consistent with equals.</p>
+ *
+ * @param <T> the type of the wrapped value
+ */
+public class CellValue<T> implements Comparable<CellValue<T>> {
+       private final T value;
+       private final int rowId;
+       private final Comparator<T> valueComparator;
+
+       /**
+        * @param value the wrapped value; may be {@code null}
+        * @param rowId the id of the row that produced the value
+        * @param valueComparator the comparator used by {@link #compareTo(CellValue)}
+        */
+       public CellValue(final T value, final int rowId, final Comparator<T> valueComparator) {
+               this.value = value;
+               this.rowId = rowId;
+               this.valueComparator = valueComparator;
+       }
+
+       public T getValue() {
+               return value;
+       }
+
+       public int getRowId() {
+               return rowId;
+       }
+
+       /**
+        * Compares the wrapped values using the comparator given at construction.
+        * A {@code null} argument sorts before this object, and an object is always
+        * equal to itself without consulting the comparator.
+        */
+       @Override
+       public int compareTo(final CellValue<T> other) {
+               if ( other == null ) {
+                       return 1;
+               }
+
+               if ( this == other ) {
+                       return 0;
+               }
+
+               return valueComparator.compare(value, other.value);
+       }
+
+       /**
+        * @return the string form of the wrapped value, or {@code "null"} when the value is null
+        */
+       @Override
+       public String toString() {
+               // equals() and hashCode() accept a null value, so toString() must not throw on one.
+               return String.valueOf(value);
+       }
+
+       @Override
+       public int hashCode() {
+               return 31 + ((value == null) ? 0 : value.hashCode());
+       }
+
+       @Override
+       public boolean equals(final Object obj) {
+               if (this == obj) {
+                       return true;
+               }
+               if (obj == null || getClass() != obj.getClass()) {
+                       return false;
+               }
+
+               final CellValue<?> other = (CellValue<?>) obj;
+               return (value == null) ? other.value == null : value.equals(other.value);
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b512ff12/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/order/FieldSorter.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/order/FieldSorter.java
 
b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/order/FieldSorter.java
new file mode 100644
index 0000000..b306b53
--- /dev/null
+++ 
b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/order/FieldSorter.java
@@ -0,0 +1,87 @@
+package org.apache.nifi.pql.evaluation.order;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+import org.apache.nifi.pql.evaluation.OperandEvaluator;
+import org.apache.nifi.pql.groups.Group;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+
+public class FieldSorter implements RowSorter {
+
+       private final SortedSet<CellValue<ProvenanceEventRecord>> values = new 
TreeSet<>();
+       private final MultiFieldComparator comparator;
+       
+       public FieldSorter(final Map<OperandEvaluator<?>, SortDirection> 
fieldEvaluators) {
+               comparator = new MultiFieldComparator(fieldEvaluators);
+       }
+
+
+       @Override
+       public void add(final ProvenanceEventRecord record, final Group group, 
final int rowId) {
+               values.add(new CellValue<ProvenanceEventRecord>(record, rowId, 
comparator));
+       }
+
+       @Override
+       public List<Integer> sort() {
+               final List<Integer> rowIds = new ArrayList<>();
+               for ( final CellValue<?> value : values ) {
+                       rowIds.add( value.getRowId() );
+               }
+               return rowIds;
+       }
+       
+
+       
+       private static class MultiFieldComparator implements 
Comparator<ProvenanceEventRecord> {
+               private final Map<OperandEvaluator<?>, SortDirection> evals;
+               private final Comparator<Number> numberComparator = 
Sorters.newNumberComparator();
+               private final Comparator<Object> objectComparator = 
Sorters.newObjectComparator();
+               
+               public MultiFieldComparator(final Map<OperandEvaluator<?>, 
SortDirection> evals) {
+                       this.evals = evals;
+               }
+               
+               @Override
+               public int compare(final ProvenanceEventRecord r1, final 
ProvenanceEventRecord r2) {
+                       if ( r1 == r2 ) {
+                               return 0;
+                       }
+                       if (r1 == null && r2 == null) {
+                               return 0;
+                       }
+                       if (r1 == null) {
+                               return -1;
+                       }
+                       if (r2 == null) {
+                               return 1;
+                       }
+                       
+                       for ( final Map.Entry<OperandEvaluator<?>, 
SortDirection> entry : evals.entrySet() ) {
+                               final OperandEvaluator<?> eval = entry.getKey();
+                               final SortDirection dir = entry.getValue();
+                               
+                               int comparisonResult;
+                               
+                               final Object v1 = eval.evaluate(r1);
+                               final Object v2 = eval.evaluate(r2);
+                               
+                               if ( 
Number.class.isAssignableFrom(eval.getType()) ) {
+                                       comparisonResult = 
numberComparator.compare((Number) v1, (Number) v2);
+                               } else {
+                                       comparisonResult = 
objectComparator.compare(v1, v2);
+                               }
+                               
+                               if ( comparisonResult != 0 ) {
+                                       return dir == SortDirection.ASC ? 
comparisonResult : -comparisonResult;
+                               }
+                       }
+                       
+                       return 0;
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b512ff12/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/order/GroupedSorter.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/order/GroupedSorter.java
 
b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/order/GroupedSorter.java
new file mode 100644
index 0000000..b54416a
--- /dev/null
+++ 
b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/order/GroupedSorter.java
@@ -0,0 +1,123 @@
+package org.apache.nifi.pql.evaluation.order;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.nifi.pql.evaluation.Accumulator;
+import org.apache.nifi.pql.groups.Group;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+
+public class GroupedSorter implements RowSorter {
+       private final Map<Accumulator<?>, SortDirection> accumulators;
+       private final Map<Group, Integer> firstGroupOccurrence = new 
HashMap<>();
+       private final Comparator<Group> comparator;
+
+       private final Set<CellValue<Group>> values = new HashSet<>();
+       
+       public GroupedSorter(final Map<Accumulator<?>, SortDirection> 
accumulators) {
+               this.accumulators = accumulators;
+               comparator = new GroupedComparator(accumulators);
+       }
+
+       @Override
+       public void add(final ProvenanceEventRecord record, final Group group, 
final int rowId) {
+               if ( !firstGroupOccurrence.containsKey(group) ) {
+                       firstGroupOccurrence.put(group, 
firstGroupOccurrence.size());
+               }
+               
+               for ( final Accumulator<?> accum : accumulators.keySet() ) {
+                       accum.accumulate(record, group);
+               }
+
+               values.add(new CellValue<Group>(group, 
firstGroupOccurrence.get(group), comparator));
+       }
+
+       @Override
+       public List<Integer> sort() {
+               final List<CellValue<Group>> sortedGroups = new ArrayList<>();
+               for ( final CellValue<Group> value : values ) {
+                       sortedGroups.add(value);
+               }
+               
+               Collections.sort(sortedGroups);
+               
+               final List<Integer> sorted = new ArrayList<>(values.size());
+               
+               for ( final CellValue<Group> value : sortedGroups ) {
+                       sorted.add( value.getRowId() );
+               }
+               
+               return sorted;
+       }
+
+       
+       private static class GroupedComparator implements Comparator<Group> {
+               private final Map<Accumulator<?>, SortDirection> map;
+               
+               public GroupedComparator(final Map<Accumulator<?>, 
SortDirection> map) {
+                       this.map = map;
+               }
+               
+               @Override
+               @SuppressWarnings({ "rawtypes", "unchecked" })
+               public int compare(final Group r1, final Group r2) {
+                       if ( r1 == r2 ) {
+                               return 0;
+                       }
+                       if (r1 == null && r2 == null) {
+                               return 0;
+                       }
+                       if (r1 == null) {
+                               return -1;
+                       }
+                       if (r2 == null) {
+                               return 1;
+                       }
+                       if ( r1.equals(r2) ) {
+                               return 0;
+                       }
+                       
+                       for ( final Map.Entry<Accumulator<?>, SortDirection> 
entry : map.entrySet() ) {
+                               final Accumulator<?> accumulator = 
entry.getKey();
+                               final SortDirection dir = entry.getValue();
+                               
+                               final List<Object> rowValues1 = (List<Object>) 
accumulator.getValues(r1);
+                               final List<Object> rowValues2 = (List<Object>) 
accumulator.getValues(r2);
+                               
+                               if ( rowValues1.size() > rowValues2.size() ) {
+                                       return -1;
+                               } else if ( rowValues2.size() > 
rowValues1.size() ) {
+                                       return 1;
+                               }
+                               
+                               for (int i=0; i < rowValues1.size(); i++) {
+                                       final Object v1 = rowValues1.get(i);
+                                       final Object v2 = rowValues2.get(i);
+                                       
+                                       int comparisonResult;
+                                       
+                                       if ( 
Number.class.isAssignableFrom(v1.getClass()) ) {
+                                               final Comparator comparator = 
Sorters.newNumberComparator();
+                                               comparisonResult = 
comparator.compare((Number) v1, (Number) v2);
+                                       } else {
+                                               final Comparator comparator = 
Sorters.newObjectComparator();
+                                               comparisonResult = 
comparator.compare(v1, v2);
+                                       }
+                                       
+                                       if ( comparisonResult != 0 ) {
+                                               return dir == SortDirection.ASC 
? comparisonResult : -comparisonResult;
+                                       }
+                               }
+                       }
+                       
+                       return 0;
+               }
+               
+       }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b512ff12/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/order/RowSorter.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/order/RowSorter.java
 
b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/order/RowSorter.java
new file mode 100644
index 0000000..a44d2bf
--- /dev/null
+++ 
b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/order/RowSorter.java
@@ -0,0 +1,13 @@
+package org.apache.nifi.pql.evaluation.order;
+
+import java.util.List;
+
+import org.apache.nifi.pql.groups.Group;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+
+/**
+ * Collects the rows of a query result and determines the order in which they
+ * are to be returned.
+ */
+public interface RowSorter {
+
+       /**
+        * Offers a row to the sorter.
+        *
+        * @param record the record that makes up the row
+        * @param group the group that the record belongs to
+        * @param rowId the identifier of the row
+        */
+       void add(ProvenanceEventRecord record, Group group, int rowId);
+
+       /**
+        * @return the identifiers of the added rows (or, for grouping implementations,
+        *         of their groups) in sorted order
+        */
+       List<Integer> sort();
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b512ff12/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/order/SortDirection.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/order/SortDirection.java
 
b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/order/SortDirection.java
new file mode 100644
index 0000000..bf17bcc
--- /dev/null
+++ 
b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/order/SortDirection.java
@@ -0,0 +1,6 @@
+package org.apache.nifi.pql.evaluation.order;
+
+public enum SortDirection {
+       ASC,
+       DESC;
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b512ff12/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/order/Sorters.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/order/Sorters.java
 
b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/order/Sorters.java
new file mode 100644
index 0000000..63cd6bc
--- /dev/null
+++ 
b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/order/Sorters.java
@@ -0,0 +1,74 @@
+package org.apache.nifi.pql.evaluation.order;
+
+import java.text.Collator;
+import java.util.Comparator;
+
+public class Sorters {
+
+       public static Comparator<Number> newNumberComparator() {
+               return new NumberComparator();
+       }
+       
+       public static Comparator<Object> newObjectComparator() {
+               return new ObjectComparator();
+       }
+       
+       
+       private static class NumberComparator implements Comparator<Number> {
+               @Override
+               public int compare(Number o1, Number o2) {
+                       if (o1 == o2) {
+                               return 0;
+                       }
+                       
+                       if (o1 == null && o2 == null) {
+                               return 0;
+                       }
+                       
+                       if (o1 == null) {
+                               return -1;
+                       }
+                       
+                       if (o2 == null) {
+                               return 1;
+                       }
+                       
+                       if (o1.doubleValue() < o2.doubleValue()) {
+                               return -1;
+                       }
+                       
+                       if (o1.doubleValue() > o2.doubleValue()) {
+                               return 1;
+                       }
+                       
+                       return 0;
+               }
+               
+       }
+       
+       private static class ObjectComparator implements Comparator<Object> {
+               private final Collator collator = Collator.getInstance();
+               
+               @Override
+               public int compare(final Object o1, final Object o2) {
+                       if ( o1 == o2 ) {
+                               return 0;
+                       }
+                       
+                       if (o1 == null && o2 == null) {
+                               return 0;
+                       }
+                       
+                       if (o1 == null) {
+                               return -1;
+                       }
+                       
+                       if (o2 == null) {
+                               return 1;
+                       }
+                       
+                       return collator.compare(o1.toString(), o2.toString());
+               }
+       }
+       
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b512ff12/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/repository/SelectAllRecords.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/repository/SelectAllRecords.java
 
b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/repository/SelectAllRecords.java
new file mode 100644
index 0000000..e1f8df8
--- /dev/null
+++ 
b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/repository/SelectAllRecords.java
@@ -0,0 +1,61 @@
+package org.apache.nifi.pql.evaluation.repository;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+import org.apache.nifi.pql.evaluation.RepositoryEvaluator;
+import org.apache.nifi.provenance.ProvenanceEventRepository;
+import org.apache.nifi.provenance.StoredProvenanceEvent;
+
+/**
+ * {@link RepositoryEvaluator} that walks over the events of a repository,
+ * fetching them in batches rather than all at once.
+ */
+public class SelectAllRecords implements RepositoryEvaluator {
+
+       /**
+        * Returns an iterator over the repository's events. Events are fetched lazily,
+        * up to 10,000 per call to the repository, each time the current batch is
+        * exhausted. Iteration ends when the repository returns an empty batch.
+        *
+        * <p>An {@link IOException} raised while fetching a batch during iteration is
+        * rethrown wrapped in a {@link RuntimeException}, because {@link Iterator}
+        * methods cannot throw checked exceptions.</p>
+        *
+        * @param repository the repository to read events from
+        * @return an iterator over the events; {@code remove()} is not supported
+        */
+       public Iterator<StoredProvenanceEvent> evaluate(final ProvenanceEventRepository repository) throws IOException {
+               final int maxRecords = 10000;
+
+               return new Iterator<StoredProvenanceEvent>() {
+                       // Total number of events fetched so far; passed to the repository as the
+                       // starting point of the next batch.
+                       // NOTE(review): this assumes the first argument of getEvents() lines up
+                       // with a running count of events - confirm against the repository contract.
+                       long fetched = 0;
+
+                       Iterator<StoredProvenanceEvent> listItr = null;
+
+                       /** Fetches the next batch when there is no current batch or it is exhausted. */
+                       private void ensureIterator() {
+                               if ( listItr == null || !listItr.hasNext() ) {
+                                       final List<StoredProvenanceEvent> records;
+                                       try {
+                                               records = repository.getEvents(fetched, maxRecords);
+                                       } catch (final IOException ioe) {
+                                               throw new RuntimeException(ioe);
+                                       }
+
+                                       listItr = records.iterator();
+                                       fetched += records.size();
+                               }
+                       }
+
+                       @Override
+                       public boolean hasNext() {
+                               ensureIterator();
+                               return listItr.hasNext();
+                       }
+
+                       @Override
+                       public StoredProvenanceEvent next() {
+                               // hasNext() loads the next batch if needed, so on success the
+                               // current batch iterator is guaranteed to have an element.
+                               if ( !hasNext() ) {
+                                       throw new NoSuchElementException();
+                               }
+
+                               return listItr.next();
+                       }
+
+                       @Override
+                       public void remove() {
+                               throw new UnsupportedOperationException();
+                       }
+               };
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b512ff12/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/exception/ProvenanceQueryLanguageException.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/exception/ProvenanceQueryLanguageException.java
 
b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/exception/ProvenanceQueryLanguageException.java
new file mode 100644
index 0000000..87ca2c1
--- /dev/null
+++ 
b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/exception/ProvenanceQueryLanguageException.java
@@ -0,0 +1,23 @@
+package org.apache.nifi.pql.exception;
+
+/**
+ * Unchecked exception type of the Provenance Query Language.
+ */
+public class ProvenanceQueryLanguageException extends RuntimeException {
+
+       private static final long serialVersionUID = 1L;
+
+       /**
+        * Creates an exception with neither a message nor a cause.
+        */
+       public ProvenanceQueryLanguageException() {
+               super();
+       }
+
+       /**
+        * @param message description of the problem
+        */
+       public ProvenanceQueryLanguageException(final String message) {
+               super(message);
+       }
+
+       /**
+        * @param message description of the problem
+        * @param cause the underlying failure
+        */
+       public ProvenanceQueryLanguageException(final String message, final Throwable cause) {
+               super(message, cause);
+       }
+
+       /**
+        * @param cause the underlying failure
+        */
+       public ProvenanceQueryLanguageException(final Throwable cause) {
+               super(cause);
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b512ff12/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/exception/ProvenanceQueryLanguageParsingException.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/exception/ProvenanceQueryLanguageParsingException.java
 
b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/exception/ProvenanceQueryLanguageParsingException.java
new file mode 100644
index 0000000..88426d3
--- /dev/null
+++ 
b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/exception/ProvenanceQueryLanguageParsingException.java
@@ -0,0 +1,23 @@
+package org.apache.nifi.pql.exception;
+
+/**
+ * {@link ProvenanceQueryLanguageException} subtype for parsing failures.
+ */
+public class ProvenanceQueryLanguageParsingException extends ProvenanceQueryLanguageException {
+
+       private static final long serialVersionUID = 1L;
+
+       /**
+        * Creates an exception with neither a message nor a cause.
+        */
+       public ProvenanceQueryLanguageParsingException() {
+               super();
+       }
+
+       /**
+        * @param message description of the problem
+        */
+       public ProvenanceQueryLanguageParsingException(final String message) {
+               super(message);
+       }
+
+       /**
+        * @param message description of the problem
+        * @param cause the underlying failure
+        */
+       public ProvenanceQueryLanguageParsingException(final String message, final Throwable cause) {
+               super(message, cause);
+       }
+
+       /**
+        * @param cause the underlying failure
+        */
+       public ProvenanceQueryLanguageParsingException(final Throwable cause) {
+               super(cause);
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b512ff12/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/groups/Group.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/groups/Group.java
 
b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/groups/Group.java
new file mode 100644
index 0000000..a3e87c1
--- /dev/null
+++ 
b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/groups/Group.java
@@ -0,0 +1,57 @@
+package org.apache.nifi.pql.groups;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Immutable key made of the values that records are grouped by. Two groups are
+ * equal when they hold equal values in the same order. Individual values may be
+ * {@code null}.
+ */
+public class Group {
+
+       private final List<Object> values;
+       // Computed once at construction; the values are copied and never change afterwards.
+       private final int hashCode;
+
+       /**
+        * @param values the grouping values, in order; elements may be {@code null}
+        */
+       public Group(final Object... values) {
+               this(Arrays.asList(values));
+       }
+
+       /**
+        * @param values the grouping values, in order. The list is copied and its
+        *            elements may be {@code null}, but the list itself must not be.
+        */
+       public Group(final List<Object> values) {
+               this.values = new ArrayList<>(values);
+
+               final int prime = 23497;
+               int hc = 1;
+               for ( final Object o : this.values ) {
+                       // A grouping value can be absent; a null contributes 0 rather than failing.
+                       hc = prime * hc + (o == null ? 0 : o.hashCode());
+               }
+
+               this.hashCode = hc;
+       }
+
+       @Override
+       public int hashCode() {
+               return hashCode;
+       }
+
+       @Override
+       public boolean equals(final Object obj) {
+               if (this == obj) {
+                       return true;
+               }
+               if (obj == null || getClass() != obj.getClass()) {
+                       return false;
+               }
+
+               final Group other = (Group) obj;
+               // Comparing the cached hashes first rejects most unequal groups cheaply.
+               return hashCode == other.hashCode && values.equals(other.values);
+       }
+
+       /**
+        * @return the list form of the grouping values, e.g. {@code [a, b]}
+        */
+       @Override
+       public String toString() {
+               return values.toString();
+       }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b512ff12/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/groups/Grouper.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/groups/Grouper.java
 
b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/groups/Grouper.java
new file mode 100644
index 0000000..3ce667b
--- /dev/null
+++ 
b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/groups/Grouper.java
@@ -0,0 +1,20 @@
+package org.apache.nifi.pql.groups;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.nifi.pql.evaluation.RecordEvaluator;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+
+/**
+ * Builds the {@link Group} that a record belongs to.
+ */
+public class Grouper {
+
+       /**
+        * Evaluates every grouping expression against the record and bundles the
+        * results, in evaluator order, into a {@link Group}.
+        *
+        * @param record the record to classify
+        * @param evaluators the grouping expressions
+        * @return the group identified by the evaluated values
+        */
+       public static Group group(final ProvenanceEventRecord record, final List<RecordEvaluator<?>> evaluators) {
+               final List<Object> groupValues = new ArrayList<>(evaluators.size());
+
+               final Iterator<RecordEvaluator<?>> itr = evaluators.iterator();
+               while ( itr.hasNext() ) {
+                       final Object evaluated = itr.next().evaluate(record);
+                       groupValues.add(evaluated);
+               }
+
+               return new Group(groupValues);
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b512ff12/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/results/GroupingResultSet.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/results/GroupingResultSet.java
 
b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/results/GroupingResultSet.java
new file mode 100644
index 0000000..53c4dca
--- /dev/null
+++ 
b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/results/GroupingResultSet.java
@@ -0,0 +1,161 @@
+package org.apache.nifi.pql.results;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+
+import org.apache.nifi.pql.evaluation.Accumulator;
+import org.apache.nifi.pql.evaluation.RecordEvaluator;
+import org.apache.nifi.pql.evaluation.order.RowSorter;
+import org.apache.nifi.pql.groups.Group;
+import org.apache.nifi.pql.groups.Grouper;
+import org.apache.nifi.provenance.StoredProvenanceEvent;
+import org.apache.nifi.provenance.query.ProvenanceResultSet;
+
+/**
+ * A {@link ProvenanceResultSet} for queries whose selected values are accumulated per group.
+ * <p>
+ * The source iterator is consumed in full the first time {@link #hasNext()} or
+ * {@link #next()} is called: every event that passes the source and condition
+ * evaluators is handed to each select accumulator, and one output row is then built
+ * for each group the accumulators report.
+ */
+public class GroupingResultSet implements ProvenanceResultSet {
+    private final List<String> labels;
+    private final List<Class<?>> returnTypes;
+    private final Iterator<? extends StoredProvenanceEvent> recordItr;
+    private final List<Accumulator<?>> selectAccumulators;
+    private final RecordEvaluator<Boolean> sourceEvaluator;
+    private final RecordEvaluator<Boolean> conditionEvaluator;
+    private final List<RecordEvaluator<?>> groupEvaluators;
+    private final RowSorter sorter;
+    private final Long limit;
+
+    /** Number of rows already handed out by {@link #next()}; compared against {@link #limit}. */
+    private long recordsReturned = 0L;
+
+    /** Lazily created by {@link #createRowItr()}; {@code null} until the first hasNext()/next(). */
+    private Iterator<List<Object>> rowItr;
+
+    /**
+     * @param recordItr the events to query
+     * @param selectAccumulators accumulators producing the selected column values
+     * @param sourceEvaluator filter applied first to each event; {@code null} accepts every event
+     * @param conditionEvaluator filter applied second to each event; {@code null} accepts every event
+     * @param labels the column labels reported by {@link #getLabels()}
+     * @param returnTypes the column types reported by {@link #getReturnType()}
+     * @param groupEvaluators evaluators that determine an event's group; {@code null} means no grouping
+     * @param sorter orders the resulting rows; {@code null} keeps the order in which groups were first seen
+     * @param limit maximum number of rows to return; {@code null} means unlimited
+     */
+    public GroupingResultSet(
+            final Iterator<? extends StoredProvenanceEvent> recordItr,
+            final List<Accumulator<?>> selectAccumulators,
+            final RecordEvaluator<Boolean> sourceEvaluator,
+            final RecordEvaluator<Boolean> conditionEvaluator,
+            final List<String> labels,
+            final List<Class<?>> returnTypes,
+            final List<RecordEvaluator<?>> groupEvaluators,
+            final RowSorter sorter,
+            final Long limit) {
+
+        this.labels = labels;
+        this.returnTypes = returnTypes;
+        this.recordItr = recordItr;
+        this.selectAccumulators = selectAccumulators;
+        this.sourceEvaluator = sourceEvaluator;
+        this.conditionEvaluator = conditionEvaluator;
+        this.groupEvaluators = groupEvaluators;
+        this.sorter = sorter;
+        this.limit = limit;
+    }
+
+    @Override
+    public List<String> getLabels() {
+        return labels;
+    }
+
+    @Override
+    public List<Class<?>> getReturnType() {
+        return returnTypes;
+    }
+
+    /**
+     * Consumes the source iterator, builds one row per group and stores the
+     * (optionally sorted) rows in {@link #rowItr}.
+     */
+    private void createRowItr() {
+        accumulateRecords();
+        rowItr = sortRows(buildRows()).iterator();
+    }
+
+    /**
+     * Feeds every event that passes both filters to each select accumulator and,
+     * when a sorter is present, registers the event with it.
+     */
+    private void accumulateRecords() {
+        int recordIdx = 0;
+        while (recordItr.hasNext()) {
+            final StoredProvenanceEvent record = recordItr.next();
+
+            if (sourceEvaluator != null && !sourceEvaluator.evaluate(record)) {
+                continue;
+            }
+            if (conditionEvaluator != null && !conditionEvaluator.evaluate(record)) {
+                continue;
+            }
+
+            final Group group = groupEvaluators == null ? null : Grouper.group(record, groupEvaluators);
+            for (final Accumulator<?> accumulator : selectAccumulators) {
+                accumulator.accumulate(record, group);
+            }
+
+            if (sorter != null) {
+                // The index counts only the events that passed both filters.
+                sorter.add(record, group, recordIdx++);
+            }
+        }
+    }
+
+    /**
+     * Collects the accumulated values into one row per group.
+     *
+     * @return the rows, in the order in which their groups were first reported by the accumulators
+     */
+    @SuppressWarnings({ "rawtypes", "unchecked" })
+    private List<List<Object>> buildRows() {
+        // Key = Group
+        // Value = Map
+        //          Key = Accumulator
+        //          Value = Column values for a row
+        final Map<Group, Map<Accumulator, List<Object>>> groupedMap = new LinkedHashMap<>();
+        for (final Accumulator<?> accumulator : selectAccumulators) {
+            final Map<Group, List<Object>> accumulatedValues = (Map) accumulator.getValues();
+
+            // for each row returned by this accumulator...
+            for (final Map.Entry<Group, List<Object>> entry : accumulatedValues.entrySet()) {
+                final Group group = entry.getKey();
+
+                Map<Accumulator, List<Object>> accumulatorRows = groupedMap.get(group);
+                if (accumulatorRows == null) {
+                    accumulatorRows = new LinkedHashMap<>();
+                    groupedMap.put(group, accumulatorRows);
+                }
+                accumulatorRows.put(accumulator, entry.getValue());
+            }
+        }
+
+        final List<List<Object>> rows = new ArrayList<>(groupedMap.size());
+        for (final Map<Accumulator, List<Object>> columnsByAccumulator : groupedMap.values()) {
+            final List<Object> columnValues = new ArrayList<>(columnsByAccumulator.size());
+
+            // Each accumulator contributes its first value for the group as one column.
+            // As before, the row is cut short at the first accumulator that has no value.
+            for (final List<Object> accumulatorValues : columnsByAccumulator.values()) {
+                if (accumulatorValues.isEmpty()) {
+                    break;
+                }
+                columnValues.add(accumulatorValues.get(0));
+            }
+
+            rows.add(columnValues);
+        }
+        return rows;
+    }
+
+    /**
+     * Reorders the rows according to the sorter, or returns them unchanged when there is no sorter.
+     * <p>
+     * NOTE(review): the ids returned by {@code sorter.sort()} are used as indexes into
+     * {@code rows} (one row per group), while the sorter was fed one index per accepted
+     * event. Confirm that RowSorter maps these to row positions.
+     */
+    private List<List<Object>> sortRows(final List<List<Object>> rows) {
+        if (sorter == null) {
+            return rows;
+        }
+
+        final List<List<Object>> sortedRows = new ArrayList<>(rows.size());
+        for (final Integer rowId : sorter.sort()) {
+            sortedRows.add(rows.get(rowId));
+        }
+        return sortedRows;
+    }
+
+    /**
+     * @return {@code true} if the limit (if any) has not been reached and another row is available
+     */
+    @Override
+    public boolean hasNext() {
+        if (rowItr == null) {
+            createRowItr();
+        }
+        // Strictly less than: once 'limit' rows have been returned there is nothing more to hand out.
+        return (limit == null || recordsReturned < limit.longValue()) && rowItr.hasNext();
+    }
+
+    /**
+     * @return the next row of column values
+     * @throws NoSuchElementException if the rows are exhausted or the limit has been reached
+     */
+    @Override
+    public List<?> next() {
+        if (!hasNext()) {
+            throw new NoSuchElementException();
+        }
+
+        // Count the row so that hasNext() can enforce the limit.
+        recordsReturned++;
+        return rowItr.next();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b512ff12/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/results/OrderedResultSet.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/results/OrderedResultSet.java
 
b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/results/OrderedResultSet.java
new file mode 100644
index 0000000..c5417c5
--- /dev/null
+++ 
b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/results/OrderedResultSet.java
@@ -0,0 +1,50 @@
+package org.apache.nifi.pql.results;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.nifi.pql.evaluation.order.RowSorter;
+import org.apache.nifi.provenance.query.ProvenanceResultSet;
+
+/**
+ * Wraps another {@link ProvenanceResultSet} and returns its rows in the order
+ * dictated by a {@link RowSorter}.
+ * <p>
+ * The wrapped result set is drained completely in the constructor. Labels and
+ * return types are delegated to the wrapped result set.
+ */
+public class OrderedResultSet implements ProvenanceResultSet {
+    private final Iterator<List<?>> sortedRowItr;
+    private final ProvenanceResultSet rs;
+
+    /**
+     * @param rs the result set whose rows are to be reordered
+     * @param sorter supplies, via {@code sort()}, the indexes of the rows in their final order
+     */
+    public OrderedResultSet(final ProvenanceResultSet rs, final RowSorter sorter) {
+        this.rs = rs;
+
+        // Pull every row out of the underlying result set, remembering arrival order.
+        final List<List<?>> unsortedRows = new ArrayList<>();
+        while (rs.hasNext()) {
+            unsortedRows.add(rs.next());
+        }
+
+        // Re-emit the rows in the index order chosen by the sorter.
+        final List<List<?>> orderedRows = new ArrayList<>(unsortedRows.size());
+        for (final Integer originalIndex : sorter.sort()) {
+            final List<?> row = unsortedRows.get(originalIndex.intValue());
+            orderedRows.add(row);
+        }
+
+        this.sortedRowItr = orderedRows.iterator();
+    }
+
+    @Override
+    public List<String> getLabels() {
+        return rs.getLabels();
+    }
+
+    @Override
+    public List<Class<?>> getReturnType() {
+        return rs.getReturnType();
+    }
+
+    @Override
+    public boolean hasNext() {
+        return sortedRowItr.hasNext();
+    }
+
+    @Override
+    public List<?> next() {
+        return sortedRowItr.next();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b512ff12/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/results/ResultRow.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/results/ResultRow.java
 
b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/results/ResultRow.java
new file mode 100644
index 0000000..f02a992
--- /dev/null
+++ 
b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/results/ResultRow.java
@@ -0,0 +1,20 @@
+package org.apache.nifi.pql.results;
+
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Holds the column values of a single result row.
+ */
+public class ResultRow {
+    private final List<Object> values;
+
+    /**
+     * Creates a row backed by the given list; the list is stored as-is, not copied.
+     *
+     * @param values the column values of the row
+     */
+    public ResultRow(final List<Object> values) {
+        this.values = values;
+    }
+
+    /**
+     * Creates a row from individual column values, viewed as a list via {@link Arrays#asList}.
+     *
+     * @param values the column values of the row
+     */
+    public ResultRow(final Object... values) {
+        this(Arrays.asList(values));
+    }
+
+    /**
+     * @return the list of column values this row was created with
+     */
+    public List<Object> getValues() {
+        return this.values;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b512ff12/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/results/RowIterator.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/results/RowIterator.java
 
b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/results/RowIterator.java
new file mode 100644
index 0000000..a9202dc
--- /dev/null
+++ 
b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/results/RowIterator.java
@@ -0,0 +1,69 @@
+package org.apache.nifi.pql.results;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * Iterates over a list of column-value iterators, turning each one into a {@link ResultRow}.
+ * <p>
+ * Each call to {@link #next()} takes the next inner iterator, drains it completely and
+ * returns its elements as the column values of one row.
+ */
+public class RowIterator implements Iterator<ResultRow> {
+    private final Iterator<Iterator<?>> itrs;
+
+    /**
+     * @param itrs one iterator per row, each yielding that row's column values in order
+     */
+    public RowIterator(final List<Iterator<?>> itrs) {
+        this.itrs = itrs.iterator();
+    }
+
+    /**
+     * @return {@code true} if at least one more inner iterator (row) remains
+     */
+    @Override
+    public boolean hasNext() {
+        return itrs.hasNext();
+    }
+
+    /**
+     * Drains the next inner iterator into a new row.
+     *
+     * @return a row holding every value produced by the next inner iterator
+     * @throws java.util.NoSuchElementException if no inner iterator remains (propagated from the outer iterator)
+     */
+    @Override
+    public ResultRow next() {
+        final Iterator<?> columnValueItr = itrs.next();
+        final List<Object> colValues = new ArrayList<>();
+        while (columnValueItr.hasNext()) {
+            colValues.add(columnValueItr.next());
+        }
+        return new ResultRow(colValues);
+    }
+
+    /**
+     * Removal is not supported.
+     *
+     * @throws UnsupportedOperationException always
+     */
+    @Override
+    public void remove() {
+        throw new UnsupportedOperationException();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b512ff12/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/results/StandardOrderedResultSet.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/results/StandardOrderedResultSet.java
 
b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/results/StandardOrderedResultSet.java
new file mode 100644
index 0000000..4bec4c0
--- /dev/null
+++ 
b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/results/StandardOrderedResultSet.java
@@ -0,0 +1,111 @@
+package org.apache.nifi.pql.results;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.nifi.pql.evaluation.Accumulator;
+import org.apache.nifi.pql.evaluation.RecordEvaluator;
+import org.apache.nifi.pql.evaluation.order.RowSorter;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.StoredProvenanceEvent;
+import org.apache.nifi.provenance.query.ProvenanceResultSet;
+
+public class StandardOrderedResultSet implements ProvenanceResultSet {
+       private final List<String> labels;
+       private final List<Class<?>> returnTypes;
+       
+       private final Iterator<? extends StoredProvenanceEvent> recordItr;
+       private final List<Accumulator<?>> selectAccumulators;
+       private final RecordEvaluator<Boolean> sourceEvaluator;
+       private final RecordEvaluator<Boolean> conditionEvaluator;
+       private final RowSorter sorter;
+       private final Long limit;
+       
+       private Iterator<ResultRow> resultRowItr;
+
+       public StandardOrderedResultSet(final Iterator<? extends 
StoredProvenanceEvent> recordItr,
+                       final List<Accumulator<?>> selectAccumulators,
+                       final RecordEvaluator<Boolean> sourceEvaluator, 
+                       final RecordEvaluator<Boolean> conditionEvaluator, 
+                       final List<String> labels, 
+                       final List<Class<?>> returnTypes,
+                       final RowSorter sorter,
+                       final Long limit)
+       {
+               this.labels = labels;
+               this.returnTypes = returnTypes;
+               
+               this.recordItr = recordItr;
+               this.selectAccumulators = selectAccumulators;
+               this.sourceEvaluator = sourceEvaluator;
+               this.conditionEvaluator = conditionEvaluator;
+               this.sorter = sorter;
+               this.limit = limit;
+       }
+       
+       
+       @Override
+       public List<String> getLabels() {
+               return labels;
+       }
+
+       @Override
+       public List<Class<?>> getReturnType() {
+               return returnTypes;
+       }
+
+       private void createResultRowItr() {
+           final List<ResultRow> rows = new ArrayList<>();
+        int idx = 0;
+        while (recordItr.hasNext()) {
+            final ProvenanceEventRecord record = recordItr.next();
+            if ( sourceEvaluator != null && !sourceEvaluator.evaluate(record) 
) {
+                continue;
+            }
+            
+            final boolean meetsConditions = conditionEvaluator == null ? true 
: conditionEvaluator.evaluate(record);
+            if ( meetsConditions ) {
+                final List<Object> values = new 
ArrayList<>(selectAccumulators.size());
+                for ( final Accumulator<?> accumulator : selectAccumulators ) {
+                    final Object value = accumulator.accumulate(record, null);
+                    accumulator.reset();
+                    values.add(value);
+                }
+                rows.add(new ResultRow(values));
+                sorter.add(record, null, idx++);
+            }
+        }
+        
+        final List<ResultRow> sortedRows = new ArrayList<>();
+        for ( final Integer unsortedIndex : sorter.sort() ) {
+            final ResultRow row = rows.get(unsortedIndex.intValue());
+            sortedRows.add(row);
+            
+            if ( limit != null && sortedRows.size() >= limit.intValue() ) {
+                break;
+            }
+        }
+        
+        resultRowItr = sortedRows.iterator();
+       }
+       
+       @Override
+       public boolean hasNext() {
+           if ( resultRowItr == null ) {
+               createResultRowItr();
+           }
+           
+               return resultRowItr.hasNext();
+       }
+
+       @Override
+       public List<?> next() {
+           if ( resultRowItr == null ) {
+            createResultRowItr();
+        }
+           
+               return resultRowItr.next().getValues();
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b512ff12/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/results/StandardUnorderedResultSet.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/results/StandardUnorderedResultSet.java
 
b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/results/StandardUnorderedResultSet.java
new file mode 100644
index 0000000..019ab5c
--- /dev/null
+++ 
b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/results/StandardUnorderedResultSet.java
@@ -0,0 +1,104 @@
+package org.apache.nifi.pql.results;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+import org.apache.nifi.pql.evaluation.Accumulator;
+import org.apache.nifi.pql.evaluation.RecordEvaluator;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.StoredProvenanceEvent;
+import org.apache.nifi.provenance.query.ProvenanceResultSet;
+
+public class StandardUnorderedResultSet implements ProvenanceResultSet {
+
+       private final List<String> labels;
+       private final List<Class<?>> returnTypes;
+       private final Iterator<? extends StoredProvenanceEvent> recordItr;
+       private final RecordEvaluator<Boolean> sourceEvaluator;
+       private final RecordEvaluator<Boolean> conditionEvaluator;
+       private final List<Accumulator<?>> selectAccumulators;
+       private final Long limit;
+       
+       private ResultRow nextRecord;
+       private long recordsReturned = 0L;
+       
+       public StandardUnorderedResultSet(final Iterator<? extends 
StoredProvenanceEvent> recordItr,
+                       final List<Accumulator<?>> selectAccumulators,
+                       final RecordEvaluator<Boolean> sourceEvaluator, 
+                       final RecordEvaluator<Boolean> conditionEvaluator, 
+                       final List<String> labels, 
+                       final List<Class<?>> returnTypes,
+                       final Long limit)
+       {
+               this.selectAccumulators = selectAccumulators;
+               this.labels = labels;
+               this.returnTypes = returnTypes;
+               this.recordItr = recordItr;
+               this.sourceEvaluator = sourceEvaluator;
+               this.conditionEvaluator = conditionEvaluator;
+               this.limit = limit;
+       }
+       
+       
+       @Override
+       public List<String> getLabels() {
+               return labels;
+       }
+
+       @Override
+       public List<Class<?>> getReturnType() {
+               return returnTypes;
+       }
+
+       private boolean findNextRecord() {
+               if ( limit != null && recordsReturned >= limit.longValue() ) {
+                       return false;
+               }
+               
+               while (recordItr.hasNext()) {
+               final ProvenanceEventRecord record = recordItr.next();
+               
+               if ( sourceEvaluator != null && 
!sourceEvaluator.evaluate(record) ) {
+                       continue;
+               }
+               
+               final boolean meetsConditions = conditionEvaluator == null ? 
true : conditionEvaluator.evaluate(record);
+               if ( meetsConditions ) {
+                       final List<Object> values = new 
ArrayList<>(selectAccumulators.size());
+                       for ( final Accumulator<?> accumulator : 
selectAccumulators ) {
+                               final Object value = 
accumulator.accumulate(record, null);
+                               accumulator.reset();
+                               values.add(value);
+                       }
+                       this.nextRecord = new ResultRow(values);
+                       recordsReturned++;
+                       return true;
+               }
+       }
+               
+               return false;
+       }
+       
+       @Override
+       public boolean hasNext() {
+               if ( nextRecord != null ) {
+                       return true;
+               }
+               
+               return findNextRecord();
+       }
+
+       @Override
+       public List<?> next() {
+               if ( hasNext() ) {
+                       final List<?> value = nextRecord.getValues();
+                       nextRecord = null;
+                       return value;
+               }
+               
+               throw new NoSuchElementException();
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b512ff12/nifi/nifi-commons/nifi-provenance-query-language/src/main/resources/docs/examples.pql
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-commons/nifi-provenance-query-language/src/main/resources/docs/examples.pql
 
b/nifi/nifi-commons/nifi-provenance-query-language/src/main/resources/docs/examples.pql
new file mode 100644
index 0000000..1f0b12c
--- /dev/null
+++ 
b/nifi/nifi-commons/nifi-provenance-query-language/src/main/resources/docs/examples.pql
@@ -0,0 +1,48 @@
+# Select all events that occurred in the last hour
+SELECT Event
+FROM *
+WHERE
+       Event.Time > 1 HOUR AGO
+
+
+# Count the number of RECEIVE events from myHost.myDomain in the last hour
+SELECT COUNT(*)
+FROM RECEIVE
+WHERE
+       Event.Time WITHIN 1 HOUR
+       AND
+       Event.TransitUri MATCHES '.*myHost\.myDomain.*'
+       
+       
+# Count the number of bytes received from myHost.myDomain in the last hour
+SELECT SUM(FileSize)
+FROM RECEIVE
+WHERE
+       Event.Time WITHIN 1 HOUR
+       AND
+       Event.TransitUri MATCHES '.*myHost\.myDomain.*'
+
+
+# Count the number of objects and number of bytes sent to each host on August 
1, 2014, but only for the 10 hosts that we send to the most
+SELECT TransitUri, COUNT(*) as numFiles, SUM(FileSize) as fileSize
+FROM SEND
+WHERE
+       Event.Time BETWEEN '08/01/2014 00:00:00.000' AND '08/01/2014 
23:59:59.999'
+GROUP BY
+       Event.TransitUri
+ORDER BY
+       fileSize
+LIMIT 10
+
+
+
+
+# Select the filename and transit URI of any SEND event that occurred as the 
result of receiving a filename that ends with '.txt' in the last hour:
+DEFINE LINEAGE L:
+       RECEIVE R{
+               R['filename'] MATCHES '.*\.txt'
+               AND
+               R.Time > 1 HOUR AGO
+       } --> SEND S 
+SELECT S.Time, S['filename'], S.TransitUri 
+FROM LINEAGE L

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b512ff12/nifi/nifi-commons/nifi-provenance-query-language/src/main/resources/docs/implementation-notes.txt
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-commons/nifi-provenance-query-language/src/main/resources/docs/implementation-notes.txt
 
b/nifi/nifi-commons/nifi-provenance-query-language/src/main/resources/docs/implementation-notes.txt
new file mode 100644
index 0000000..e75786f
--- /dev/null
+++ 
b/nifi/nifi-commons/nifi-provenance-query-language/src/main/resources/docs/implementation-notes.txt
@@ -0,0 +1,52 @@
+ORDER BY
+       - Very Difficult.
+       - Does not have anything to do with what is selected
+       - Need to do an External Sort:
+               - Break records into groups of 10,000.
+               - Sort each group of 10,000 records.
+               - Write out those 10,000 records.
+               - Read in the first X number of records where X = 10,000 / 
number of files
+               - Merge the heads of the records.
+               - Algorithm is on Wikipedia. "External Sort"
+               - Similar to what we do in WALI when we order the Transactions 
by Transaction ID across multiple partitions.
+               
+
+GROUP BY
+       - Implement in the Accumulator. May make sense to break Accumulator 
into two Interfaces:
+       
+       GroupingAccumulator:
+               T accumulate(ProvenanceEventRecord record, Group group)
+       UngroupedAccumulator:
+               T accumulate(ProvenanceEventRecord record)
+               
+       Then the GroupingAccumulator will simply map a group to the appropriate 
UngroupedAccumulator and then call #accumulate. 
+       UngroupedAccumulator will never be used except for the 
GroupingAccumulator delegating to the appropriate UngroupedAccumulator.
+
+
+WHERE
+       - All functions must be able to be done in Lucene.
+       
+EventAccumulator
+       - Should store provenance event location instead of event. Regardless 
of whether a field was selected or the entire event.
+       
+
+       
+       
+       
+       
+       
+       
+Prov Repo:
+       - Allow ANDs and ORs in queries?
+       - ProvenanceEventRecord should return a Location object. Location is a 
marker interface, and the specific implementation
+         to be used will depend on the repo. For example, 
VolatileProvenanceRepository would return something like "int getIndex()" and 
"long getId()"
+         so that we can get the event at the specified index and return null 
unless that event's id is equal to the result of calling 'getId()'.
+         Persistent Prov Repo would return a Location that includes filename & 
offset. Perhaps also a record index so that we can add multiple
+         records to a single repo update (byte offset of 'transaction' is 1000 
and record offset into transaction is 4). This would be used
+         so that if we do an update with 100 records and all have similar 
fields (component type, component id, most previous attributes?), then we
+         can write that out once. This should probably be a new data structure 
that wraps a ProvenanceEventRecord:
+               StoredProvenanceEvent
+                       ProvenanceEventRecord getEventRecord()
+                       Location getLocation()
+       - Index all attributes and properties always? At least allow property 
value to be "*" to indicate all.
+       
\ No newline at end of file

Reply via email to