http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b512ff12/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/comparison/MatchesEvaluator.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/comparison/MatchesEvaluator.java b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/comparison/MatchesEvaluator.java new file mode 100644 index 0000000..b646129 --- /dev/null +++ b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/comparison/MatchesEvaluator.java @@ -0,0 +1,72 @@ +package org.apache.nifi.pql.evaluation.comparison; + +import java.util.regex.Pattern; + +import org.apache.nifi.pql.evaluation.BooleanEvaluator; +import org.apache.nifi.pql.evaluation.OperandEvaluator; +import org.apache.nifi.pql.evaluation.literals.StringLiteralEvaluator; +import org.apache.nifi.provenance.ProvenanceEventRecord; + +public class MatchesEvaluator implements BooleanEvaluator { + + private final OperandEvaluator<?> lhs; + private final OperandEvaluator<?> rhs; + private final boolean negated; + private final Pattern pattern; + + public MatchesEvaluator(final OperandEvaluator<?> lhs, final OperandEvaluator<?> rhs) { + this(lhs, rhs, false); + } + + + public MatchesEvaluator(final OperandEvaluator<?> lhs, final OperandEvaluator<?> rhs, final boolean negated) { + this.lhs = lhs; + this.rhs = rhs; + this.negated = negated; + + if ( rhs instanceof StringLiteralEvaluator ) { + pattern = Pattern.compile(rhs.evaluate(null).toString()); + } else { + pattern = null; + } + } + + public OperandEvaluator<?> getLHS() { + return lhs; + } + + public OperandEvaluator<?> getRHS() { + return rhs; + } + + public Boolean evaluate(final ProvenanceEventRecord record) { + Object lhsValue = lhs.evaluate(record); + Object rhsValue = rhs.evaluate(record); + + if ( lhsValue == null || rhsValue == null ) { + return false; + } + + final String lhsString = lhsValue.toString(); + + final Pattern compiled; + if ( pattern == null ) { + compiled = Pattern.compile(rhsValue.toString()); + } else { + compiled = pattern; + } + + final boolean matches = compiled.matcher(lhsString).matches(); + return negated ? !matches : matches; + } + + public MatchesEvaluator negate() { + return new MatchesEvaluator(lhs, rhs, !negated); + } + + @Override + public int getEvaluatorType() { + return org.apache.nifi.pql.ProvenanceQueryParser.MATCHES; + } + +}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b512ff12/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/comparison/RecordTypeEvaluator.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/comparison/RecordTypeEvaluator.java b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/comparison/RecordTypeEvaluator.java new file mode 100644 index 0000000..d267371 --- /dev/null +++ b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/comparison/RecordTypeEvaluator.java @@ -0,0 +1,34 @@ +package org.apache.nifi.pql.evaluation.comparison; + +import java.util.EnumSet; +import java.util.HashSet; +import java.util.Set; + +import org.apache.nifi.pql.evaluation.BooleanEvaluator; +import org.apache.nifi.provenance.ProvenanceEventRecord; +import org.apache.nifi.provenance.ProvenanceEventType; + +public class RecordTypeEvaluator implements BooleanEvaluator { + private final Set<ProvenanceEventType> types; + + public RecordTypeEvaluator(final Set<ProvenanceEventType> types) { + this.types = new HashSet<>(types); + } + + @Override + public Boolean evaluate(final ProvenanceEventRecord record) { + return types.contains(record.getEventType()); + } + + @Override + public BooleanEvaluator negate() { + final Set<ProvenanceEventType> negatedTypes = EnumSet.complementOf(EnumSet.copyOf(types)); + return new RecordTypeEvaluator(negatedTypes); + } + + @Override + public int getEvaluatorType() { + return org.apache.nifi.pql.ProvenanceQueryParser.TYPE; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b512ff12/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/comparison/StartsWithEvaluator.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/comparison/StartsWithEvaluator.java b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/comparison/StartsWithEvaluator.java new file mode 100644 index 0000000..7c11416 --- /dev/null +++ b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/comparison/StartsWithEvaluator.java @@ -0,0 +1,57 @@ +package org.apache.nifi.pql.evaluation.comparison; + +import org.apache.nifi.pql.evaluation.BooleanEvaluator; +import org.apache.nifi.pql.evaluation.OperandEvaluator; +import org.apache.nifi.provenance.ProvenanceEventRecord; + +public class StartsWithEvaluator implements BooleanEvaluator { + + private final boolean negated; + private final OperandEvaluator<?> lhs; + private final OperandEvaluator<?> rhs; + + public StartsWithEvaluator(final OperandEvaluator<?> lhs, final OperandEvaluator<?> rhs) { + this(lhs, rhs, false); + } + + public StartsWithEvaluator(final OperandEvaluator<?> lhs, final OperandEvaluator<?> rhs, final boolean negated) { + this.lhs = lhs; + this.rhs = rhs; + this.negated = negated; + } + + @Override + public Boolean evaluate(final ProvenanceEventRecord record) { + final Object lhsValue = lhs.evaluate(record); + final Object rhsValue = rhs.evaluate(record); + + if ( lhsValue == null || rhsValue == null ) { + return false; + } + + final String lhsString = lhsValue.toString(); + final String rhsString = rhsValue.toString(); + + final boolean startsWith = lhsString.startsWith(rhsString); + return negated ? !startsWith : startsWith; + } + + public OperandEvaluator<?> getLHS() { + return lhs; + } + + public OperandEvaluator<?> getRHS() { + return rhs; + } + + @Override + public BooleanEvaluator negate() { + return new StartsWithEvaluator(lhs, rhs, !negated); + } + + @Override + public int getEvaluatorType() { + return org.apache.nifi.pql.ProvenanceQueryParser.STARTS_WITH; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b512ff12/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/conversion/DateToLongEvaluator.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/conversion/DateToLongEvaluator.java b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/conversion/DateToLongEvaluator.java new file mode 100644 index 0000000..6832e14 --- /dev/null +++ b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/conversion/DateToLongEvaluator.java @@ -0,0 +1,27 @@ +package org.apache.nifi.pql.evaluation.conversion; + +import org.apache.nifi.pql.evaluation.OperandEvaluator; +import org.apache.nifi.provenance.ProvenanceEventRecord; + +public class DateToLongEvaluator implements OperandEvaluator<Long> { + + + @Override + public Long evaluate(ProvenanceEventRecord record) { + // TODO Auto-generated method stub + return null; + } + + @Override + public int getEvaluatorType() { + // TODO Auto-generated method stub + return 0; + } + + @Override + public Class<Long> getType() { + // TODO Auto-generated method stub + return null; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b512ff12/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/extraction/AttributeEvaluator.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/extraction/AttributeEvaluator.java b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/extraction/AttributeEvaluator.java new file mode 100644 index 0000000..c143c4a --- /dev/null +++ b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/extraction/AttributeEvaluator.java @@ -0,0 +1,42 @@ +package org.apache.nifi.pql.evaluation.extraction; + +import org.apache.nifi.pql.evaluation.OperandEvaluator; +import org.apache.nifi.provenance.ProvenanceEventRecord; + +public class AttributeEvaluator implements OperandEvaluator<String> { + private final OperandEvaluator<String> attributeNameEvaluator; + + public AttributeEvaluator(final OperandEvaluator<String> attributeNameEvaluator) { + this.attributeNameEvaluator = attributeNameEvaluator; + } + + public OperandEvaluator<String> getAttributeNameEvaluator() { + return attributeNameEvaluator; + } + + @Override + public String evaluate(final ProvenanceEventRecord record) { + final String attributeName = attributeNameEvaluator.evaluate(record); + if ( attributeName == null ) { + return null; + } + + return record.getAttribute(attributeName); + } + + + @Override + public Class<String> getType() { + return String.class; + } + + @Override + public String toString() { + return attributeNameEvaluator.toString(); + } + + @Override + public int getEvaluatorType() { + return org.apache.nifi.pql.ProvenanceQueryParser.ATTRIBUTE; + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b512ff12/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/extraction/ComponentIdEvaluator.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/extraction/ComponentIdEvaluator.java b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/extraction/ComponentIdEvaluator.java new file mode 100644 index 0000000..50850c0 --- /dev/null +++ b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/extraction/ComponentIdEvaluator.java @@ -0,0 +1,23 @@ +package org.apache.nifi.pql.evaluation.extraction; + +import org.apache.nifi.pql.evaluation.OperandEvaluator; +import org.apache.nifi.provenance.ProvenanceEventRecord; + +public class ComponentIdEvaluator implements OperandEvaluator<String> { + + @Override + public String evaluate(final ProvenanceEventRecord record) { + return record.getComponentId(); + } + + @Override + public Class<String> getType() { + return String.class; + } + + @Override + public int getEvaluatorType() { + return org.apache.nifi.pql.ProvenanceQueryParser.COMPONENT_ID; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b512ff12/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/extraction/RelationshipEvaluator.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/extraction/RelationshipEvaluator.java b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/extraction/RelationshipEvaluator.java new file mode 100644 index 0000000..a6ff59d --- /dev/null +++ b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/extraction/RelationshipEvaluator.java @@ -0,0 +1,23 @@ +package org.apache.nifi.pql.evaluation.extraction; + +import org.apache.nifi.pql.evaluation.OperandEvaluator; +import org.apache.nifi.provenance.ProvenanceEventRecord; + +public class RelationshipEvaluator implements OperandEvaluator<String> { + + @Override + public String evaluate(final ProvenanceEventRecord record) { + return record.getRelationship(); + } + + @Override + public Class<String> getType() { + return String.class; + } + + @Override + public int getEvaluatorType() { + return org.apache.nifi.pql.ProvenanceQueryParser.RELATIONSHIP; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b512ff12/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/extraction/SizeEvaluator.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/extraction/SizeEvaluator.java b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/extraction/SizeEvaluator.java new file mode 100644 index 0000000..052b88e --- /dev/null +++ b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/extraction/SizeEvaluator.java @@ -0,0 +1,23 @@ +package org.apache.nifi.pql.evaluation.extraction; + +import org.apache.nifi.pql.evaluation.OperandEvaluator; +import org.apache.nifi.provenance.ProvenanceEventRecord; + +public class SizeEvaluator implements OperandEvaluator<Long> { + + @Override + public Long evaluate(final ProvenanceEventRecord record) { + return record.getFileSize(); + } + + @Override + public Class<Long> getType() { + return Long.class; + } + + @Override + public int getEvaluatorType() { + return org.apache.nifi.pql.ProvenanceQueryParser.FILESIZE; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b512ff12/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/extraction/TimestampEvaluator.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/extraction/TimestampEvaluator.java b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/extraction/TimestampEvaluator.java new file mode 100644 index 0000000..fcc1ea1 --- /dev/null +++ b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/extraction/TimestampEvaluator.java @@ -0,0 +1,26 @@ +package org.apache.nifi.pql.evaluation.extraction; + +import org.apache.nifi.pql.evaluation.OperandEvaluator; +import org.apache.nifi.provenance.ProvenanceEventRecord; + +public class TimestampEvaluator implements OperandEvaluator<Long> { + + @Override + public Long evaluate(final ProvenanceEventRecord record) { + if ( record == null ) { + return null; + } + return record.getEventTime(); + } + + @Override + public Class<Long> getType() { + return Long.class; + } + + @Override + public int getEvaluatorType() { + return org.apache.nifi.pql.ProvenanceQueryParser.TIMESTAMP; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b512ff12/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/extraction/TransitUriEvaluator.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/extraction/TransitUriEvaluator.java b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/extraction/TransitUriEvaluator.java new file mode 100644 index 0000000..35e4a39 --- /dev/null +++ b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/extraction/TransitUriEvaluator.java @@ -0,0 +1,23 @@ +package org.apache.nifi.pql.evaluation.extraction; + +import org.apache.nifi.pql.evaluation.OperandEvaluator; +import org.apache.nifi.provenance.ProvenanceEventRecord; + +public class TransitUriEvaluator implements OperandEvaluator<String> { + + @Override + public String evaluate(final ProvenanceEventRecord record) { + return record.getTransitUri(); + } + + @Override + public Class<String> getType() { + return String.class; + } + + @Override + public int getEvaluatorType() { + return org.apache.nifi.pql.ProvenanceQueryParser.TRANSIT_URI; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b512ff12/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/extraction/TypeEvaluator.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/extraction/TypeEvaluator.java b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/extraction/TypeEvaluator.java new file mode 100644 index 0000000..2550bc3 --- /dev/null +++ b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/extraction/TypeEvaluator.java @@ -0,0 +1,24 @@ +package org.apache.nifi.pql.evaluation.extraction; + +import org.apache.nifi.pql.evaluation.OperandEvaluator; +import org.apache.nifi.provenance.ProvenanceEventRecord; +import org.apache.nifi.provenance.ProvenanceEventType; + +public class TypeEvaluator implements OperandEvaluator<ProvenanceEventType> { + + @Override + public ProvenanceEventType evaluate(final ProvenanceEventRecord record) { + return record.getEventType(); + } + + @Override + public Class<ProvenanceEventType> getType() { + return ProvenanceEventType.class; + } + + @Override + public int getEvaluatorType() { + return org.apache.nifi.pql.ProvenanceQueryParser.TYPE; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b512ff12/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/extraction/UuidEvaluator.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/extraction/UuidEvaluator.java b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/extraction/UuidEvaluator.java new file mode 100644 index 0000000..39d8243 --- /dev/null +++ b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/extraction/UuidEvaluator.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.pql.evaluation.extraction; + +import org.apache.nifi.pql.evaluation.OperandEvaluator; +import org.apache.nifi.provenance.ProvenanceEventRecord; + +public class UuidEvaluator implements OperandEvaluator<String> { + + @Override + public String evaluate(final ProvenanceEventRecord record) { + return record.getFlowFileUuid(); + } + + @Override + public Class<String> getType() { + return String.class; + } + + @Override + public int getEvaluatorType() { + return org.apache.nifi.pql.ProvenanceQueryParser.UUID; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b512ff12/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/function/TimeFieldEvaluator.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/function/TimeFieldEvaluator.java b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/function/TimeFieldEvaluator.java new file mode 100644 index 0000000..44450c9 --- /dev/null +++ b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/function/TimeFieldEvaluator.java @@ -0,0 +1,60 @@ +package org.apache.nifi.pql.evaluation.function; + +import java.util.ArrayList; +import java.util.Calendar; +import java.util.List; +import java.util.TimeZone; + +import org.apache.nifi.pql.evaluation.OperandEvaluator; +import org.apache.nifi.provenance.ProvenanceEventRecord; + +public class TimeFieldEvaluator implements OperandEvaluator<Long> { + private final OperandEvaluator<Long> timeExtractor; + private final int evaluatorType; + + private final List<Integer> fieldsToClear = new ArrayList<>(); + + public TimeFieldEvaluator(final OperandEvaluator<Long> timeExtractor, final int timeField, final int evaluatorType) { + this.timeExtractor = timeExtractor; + this.evaluatorType = evaluatorType; + + switch (timeField) { + case Calendar.YEAR: + fieldsToClear.add(Calendar.MONTH); + case Calendar.DAY_OF_MONTH: + fieldsToClear.add(Calendar.HOUR); + case Calendar.HOUR: + fieldsToClear.add(Calendar.MINUTE); + case Calendar.MINUTE: + fieldsToClear.add(Calendar.SECOND); + default: + fieldsToClear.add(Calendar.MILLISECOND); + } + } + + @Override + public Long evaluate(ProvenanceEventRecord record) { + final Long epochMillis = timeExtractor.evaluate(record); + if ( epochMillis == null ) { + return null; + } + + final Calendar cal = Calendar.getInstance(TimeZone.getTimeZone("UTC")); + cal.setTimeInMillis(epochMillis); + for ( final Integer field : fieldsToClear ) { + cal.set(field, 0); + } + return Long.valueOf(cal.getTimeInMillis()); + } + + @Override + public Class<Long> getType() { + return Long.class; + } + + @Override + public int getEvaluatorType() { + return evaluatorType; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b512ff12/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/literals/LongLiteralEvaluator.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/literals/LongLiteralEvaluator.java b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/literals/LongLiteralEvaluator.java new file mode 100644 index 0000000..ee63c48 --- /dev/null +++ b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/literals/LongLiteralEvaluator.java @@ -0,0 +1,27 @@ +package org.apache.nifi.pql.evaluation.literals; + +import org.apache.nifi.pql.evaluation.OperandEvaluator; +import org.apache.nifi.provenance.ProvenanceEventRecord; + +public class LongLiteralEvaluator implements OperandEvaluator<Long> { + private final Long value; + + public LongLiteralEvaluator(final Long value) { + this.value = value; + } + + @Override + public Long evaluate(final ProvenanceEventRecord record) { + return value; + } + + @Override + public Class<Long> getType() { + return Long.class; + } + + @Override + public int getEvaluatorType() { + return org.apache.nifi.pql.ProvenanceQueryParser.NUMBER; + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b512ff12/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/literals/StringLiteralEvaluator.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/literals/StringLiteralEvaluator.java b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/literals/StringLiteralEvaluator.java new file mode 100644 index 0000000..d6a09f1 --- /dev/null +++ b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/literals/StringLiteralEvaluator.java @@ -0,0 +1,33 @@ +package org.apache.nifi.pql.evaluation.literals; + +import org.apache.nifi.pql.evaluation.OperandEvaluator; +import org.apache.nifi.provenance.ProvenanceEventRecord; + +public class StringLiteralEvaluator implements OperandEvaluator<String> { + private final String value; + + public StringLiteralEvaluator(final String value) { + this.value = value; + } + + @Override + public String evaluate(final ProvenanceEventRecord record) { + return value; + } + + @Override + public Class<String> getType() { + return String.class; + } + + @Override + public String toString() { + return value; + } + + @Override + public int getEvaluatorType() { + return org.apache.nifi.pql.ProvenanceQueryParser.STRING_LITERAL; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b512ff12/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/logic/AndEvaluator.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/logic/AndEvaluator.java b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/logic/AndEvaluator.java new file mode 100644 index 0000000..fb9b408 --- /dev/null +++ b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/logic/AndEvaluator.java @@ -0,0 +1,114 @@ +package org.apache.nifi.pql.evaluation.logic; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.nifi.pql.evaluation.BooleanEvaluator; +import org.apache.nifi.provenance.ProvenanceEventRecord; + +public class AndEvaluator implements BooleanEvaluator { + + private final BooleanEvaluator lhs; + private final BooleanEvaluator rhs; + + + public AndEvaluator(final BooleanEvaluator lhs, final BooleanEvaluator rhs) { + this.lhs = lhs; + this.rhs = rhs; + } + + @Override + public Boolean evaluate(final ProvenanceEventRecord record) { + return lhs.evaluate(record) && rhs.evaluate(record); + } + + @Override + public BooleanEvaluator negate() { + return new OrEvaluator(lhs.negate(), rhs.negate()); + } + + public BooleanEvaluator getLHS() { + return lhs; + } + + public BooleanEvaluator getRHS() { + return rhs; + } + + /** + * Converts this AND tree to Disjunctive Normal Form (OR's of AND's) + * @return + */ + public BooleanEvaluator toDNF() { + final List<BooleanEvaluator> rhsEvaluators = new ArrayList<>(); + final List<BooleanEvaluator> lhsEvaluators = new ArrayList<>(); + + if ( rhs instanceof OrEvaluator ) { + final OrEvaluator or = (OrEvaluator) rhs; + rhsEvaluators.add(or.getLHS()); + rhsEvaluators.add(or.getRHS()); + } else if ( rhs instanceof AndEvaluator ) { + rhsEvaluators.add( ((AndEvaluator) rhs).toDNF() ); + } else { + rhsEvaluators.add(rhs); + } + + if ( lhs instanceof OrEvaluator ) { + final OrEvaluator or = (OrEvaluator) lhs; + lhsEvaluators.add(or.getLHS()); + lhsEvaluators.add(or.getRHS()); + } else if ( lhs instanceof AndEvaluator ) { + lhsEvaluators.add( ((AndEvaluator) lhs).toDNF() ); + } else { + lhsEvaluators.add(lhs); + } + + if ( rhsEvaluators.size() == 1 && lhsEvaluators.size() == 1 ) { + return this; + } + + final List<AndEvaluator> ands = new ArrayList<>(); + for ( final BooleanEvaluator l : lhsEvaluators ) { + for ( final BooleanEvaluator r : rhsEvaluators ) { + final AndEvaluator and = new AndEvaluator(l, r); + ands.add(and); + } + } + + final AndEvaluator and1 = ands.get(0); + final AndEvaluator and2 = ands.get(1); + OrEvaluator or = new OrEvaluator(and1, and2); + + for (int i=2; i < ands.size(); i++) { + final AndEvaluator ae = ands.get(i); + or = new OrEvaluator(or, ae); + } + + return or; + } + + public String toString() { + final StringBuilder sb = new StringBuilder(); + if ( lhs instanceof AndEvaluator || lhs instanceof OrEvaluator ) { + sb.append("(").append(lhs.toString()).append(")"); + } else { + sb.append(lhs.toString()); + } + + sb.append(" & "); + + if ( rhs instanceof AndEvaluator || rhs instanceof OrEvaluator ) { + sb.append("(").append(rhs.toString()).append(")"); + } else { + sb.append(rhs.toString()); + } + + return sb.toString(); + } + + @Override + public int getEvaluatorType() { + return org.apache.nifi.pql.ProvenanceQueryParser.AND; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b512ff12/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/logic/OrEvaluator.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/logic/OrEvaluator.java b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/logic/OrEvaluator.java new file mode 100644 index 0000000..c93462a --- /dev/null +++ b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/logic/OrEvaluator.java @@ -0,0 +1,58 @@ +package org.apache.nifi.pql.evaluation.logic; + +import org.apache.nifi.pql.evaluation.BooleanEvaluator; +import org.apache.nifi.provenance.ProvenanceEventRecord; + +public class OrEvaluator implements BooleanEvaluator { + private final BooleanEvaluator lhs; + private final BooleanEvaluator rhs; + + public OrEvaluator(final BooleanEvaluator lhs, final BooleanEvaluator rhs) { + this.lhs = lhs; + this.rhs = rhs; + } + + @Override + public Boolean evaluate(final ProvenanceEventRecord record) { + return lhs.evaluate(record) || rhs.evaluate(record); + } + + @Override + public BooleanEvaluator negate() { + return new AndEvaluator(lhs.negate(), rhs.negate()); + } + + public BooleanEvaluator getLHS() { + return lhs; + } + + public BooleanEvaluator getRHS() { + return rhs; + } + + + public String toString() { + final StringBuilder sb = new StringBuilder(); + if ( lhs instanceof AndEvaluator || lhs instanceof OrEvaluator ) { + sb.append("(").append(lhs.toString()).append(")"); + } else { + sb.append(lhs.toString()); + } + + sb.append(" | "); + + if ( rhs instanceof AndEvaluator || rhs instanceof OrEvaluator ) { + sb.append("(").append(rhs.toString()).append(")"); + } else { + sb.append(rhs.toString()); + } + + return sb.toString(); + } + + @Override + public int getEvaluatorType() { + return org.apache.nifi.pql.ProvenanceQueryParser.OR; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b512ff12/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/order/CellValue.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/order/CellValue.java b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/order/CellValue.java new file mode 100644 index 0000000..724e340 --- /dev/null +++ b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/order/CellValue.java @@ -0,0 +1,69 @@ +package org.apache.nifi.pql.evaluation.order; + +import java.util.Comparator; + +public class CellValue<T> implements Comparable<CellValue<T>> { + private final T value; + private final int rowId; + private final Comparator<T> valueComparator; + + public CellValue(final T value, final int rowId, final Comparator<T> valueComparator) { + this.value = value; + this.rowId = rowId; + this.valueComparator = valueComparator; + } + + public T getValue() { + return value; + } + + public int getRowId() { + return rowId; + } + + @Override + public int compareTo(final CellValue<T> other) { + if ( other == null ) { + return 1; + } + + if ( this == other ) { + return 0; + } + + return valueComparator.compare(value, other.value); + } + + @Override + public String toString() { + return value.toString(); + } + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + ((value == null) ? 0 : value.hashCode()); + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + if (getClass() != obj.getClass()) + return false; + + @SuppressWarnings("rawtypes") + CellValue other = (CellValue) obj; + if (value == null) { + if (other.value != null) + return false; + } else if (!value.equals(other.value)) + return false; + return true; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b512ff12/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/order/FieldSorter.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/order/FieldSorter.java b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/order/FieldSorter.java new file mode 100644 index 0000000..b306b53 --- /dev/null +++ b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/order/FieldSorter.java @@ -0,0 +1,87 @@ +package org.apache.nifi.pql.evaluation.order; + +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; +import java.util.Map; +import java.util.SortedSet; +import java.util.TreeSet; + +import org.apache.nifi.pql.evaluation.OperandEvaluator; +import org.apache.nifi.pql.groups.Group; +import org.apache.nifi.provenance.ProvenanceEventRecord; + +public class FieldSorter implements RowSorter { + + private final SortedSet<CellValue<ProvenanceEventRecord>> values = new TreeSet<>(); + private final MultiFieldComparator comparator; + + public FieldSorter(final Map<OperandEvaluator<?>, SortDirection> fieldEvaluators) { + comparator = new MultiFieldComparator(fieldEvaluators); + } + + + @Override + public void add(final ProvenanceEventRecord record, final Group group, final int rowId) { + values.add(new CellValue<ProvenanceEventRecord>(record, rowId, comparator)); + } + + @Override + public List<Integer> sort() { + final List<Integer> rowIds = new ArrayList<>(); + for ( final CellValue<?> value : values ) { + rowIds.add( value.getRowId() ); + } + return rowIds; + } + + + + private static class MultiFieldComparator implements Comparator<ProvenanceEventRecord> { + private final Map<OperandEvaluator<?>, SortDirection> evals; + private final Comparator<Number> numberComparator = Sorters.newNumberComparator(); + private final Comparator<Object> objectComparator = Sorters.newObjectComparator(); + + public MultiFieldComparator(final Map<OperandEvaluator<?>, SortDirection> evals) { + this.evals = evals; + } + + @Override + public int compare(final ProvenanceEventRecord r1, final ProvenanceEventRecord r2) { + if ( r1 == r2 ) { + return 0; + } + if (r1 == null && r2 == null) { + return 0; + } + if (r1 == null) { + return -1; + } + if (r2 == null) { + return 1; + } + + for ( final Map.Entry<OperandEvaluator<?>, SortDirection> entry : evals.entrySet() ) { + final OperandEvaluator<?> eval = entry.getKey(); + final SortDirection dir = entry.getValue(); + + int comparisonResult; + + final Object v1 = eval.evaluate(r1); + final Object v2 = eval.evaluate(r2); + + if ( Number.class.isAssignableFrom(eval.getType()) ) { + comparisonResult = numberComparator.compare((Number) v1, (Number) v2); + } else { + comparisonResult = objectComparator.compare(v1, v2); + } + + if ( comparisonResult != 0 ) { + return dir == SortDirection.ASC ? comparisonResult : -comparisonResult; + } + } + + return 0; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b512ff12/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/order/GroupedSorter.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/order/GroupedSorter.java b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/order/GroupedSorter.java new file mode 100644 index 0000000..b54416a --- /dev/null +++ b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/order/GroupedSorter.java @@ -0,0 +1,123 @@ +package org.apache.nifi.pql.evaluation.order; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.nifi.pql.evaluation.Accumulator; +import org.apache.nifi.pql.groups.Group; +import org.apache.nifi.provenance.ProvenanceEventRecord; + +public class GroupedSorter implements RowSorter { + private final Map<Accumulator<?>, SortDirection> accumulators; + private final Map<Group, Integer> firstGroupOccurrence = new HashMap<>(); + private final Comparator<Group> comparator; + + private final Set<CellValue<Group>> values = new HashSet<>(); + + public GroupedSorter(final Map<Accumulator<?>, SortDirection> accumulators) { + this.accumulators = accumulators; + comparator = new GroupedComparator(accumulators); + } + + @Override + public void add(final ProvenanceEventRecord record, final Group group, final int rowId) { + if ( !firstGroupOccurrence.containsKey(group) ) { + firstGroupOccurrence.put(group, firstGroupOccurrence.size()); + } + + for ( final Accumulator<?> accum : accumulators.keySet() ) { + accum.accumulate(record, group); + } + + values.add(new CellValue<Group>(group, firstGroupOccurrence.get(group), comparator)); + } + + @Override + public List<Integer> sort() { + final List<CellValue<Group>> sortedGroups = new ArrayList<>(); + for ( final CellValue<Group> value : values ) { + sortedGroups.add(value); + } + + Collections.sort(sortedGroups); + + final List<Integer> sorted = new ArrayList<>(values.size()); + + for ( final CellValue<Group> value : sortedGroups ) { + sorted.add( value.getRowId() ); + } + + return sorted; + } + + + private static class GroupedComparator implements Comparator<Group> { + private final Map<Accumulator<?>, SortDirection> map; + + public GroupedComparator(final Map<Accumulator<?>, SortDirection> map) { + this.map = map; + } + + @Override + @SuppressWarnings({ "rawtypes", "unchecked" }) + public int compare(final Group r1, final Group r2) { + if ( r1 == r2 ) { + return 0; + } + if (r1 == null && r2 == null) { + return 0; + } + if (r1 == null) { + return -1; + } + if (r2 == null) { + return 1; + } + if ( r1.equals(r2) ) { + return 0; + } + + for ( final Map.Entry<Accumulator<?>, SortDirection> entry : map.entrySet() ) { + final Accumulator<?> accumulator = entry.getKey(); + final SortDirection dir = entry.getValue(); + + final List<Object> rowValues1 = (List<Object>) accumulator.getValues(r1); + final List<Object> rowValues2 = (List<Object>) accumulator.getValues(r2); + + if ( rowValues1.size() > rowValues2.size() ) { + return -1; + } else if ( rowValues2.size() > rowValues1.size() ) { + return 1; + } + + for (int i=0; i < rowValues1.size(); i++) { + final Object v1 = rowValues1.get(i); + final Object v2 = rowValues2.get(i); + + int comparisonResult; + + if ( Number.class.isAssignableFrom(v1.getClass()) ) { + final Comparator comparator = Sorters.newNumberComparator(); + comparisonResult = comparator.compare((Number) v1, (Number) v2); + } else { + final Comparator comparator = Sorters.newObjectComparator(); + comparisonResult = comparator.compare(v1, v2); + } + + if ( comparisonResult != 0 ) { + return dir == SortDirection.ASC ? comparisonResult : -comparisonResult; + } + } + } + + return 0; + } + + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b512ff12/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/order/RowSorter.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/order/RowSorter.java b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/order/RowSorter.java new file mode 100644 index 0000000..a44d2bf --- /dev/null +++ b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/order/RowSorter.java @@ -0,0 +1,13 @@ +package org.apache.nifi.pql.evaluation.order; + +import java.util.List; + +import org.apache.nifi.pql.groups.Group; +import org.apache.nifi.provenance.ProvenanceEventRecord; + +public interface RowSorter { + + void add(ProvenanceEventRecord record, Group group, int rowId); + + List<Integer> sort(); +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b512ff12/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/order/SortDirection.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/order/SortDirection.java b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/order/SortDirection.java new file mode 100644 index 0000000..bf17bcc --- /dev/null +++ b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/order/SortDirection.java @@ -0,0 +1,6 @@ +package org.apache.nifi.pql.evaluation.order; + +public enum SortDirection { + ASC, + DESC; +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b512ff12/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/order/Sorters.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/order/Sorters.java b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/order/Sorters.java new file mode 100644 index 0000000..63cd6bc --- /dev/null +++ b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/order/Sorters.java @@ -0,0 +1,74 @@ +package org.apache.nifi.pql.evaluation.order; + +import java.text.Collator; +import java.util.Comparator; + +public class Sorters { + + public static Comparator<Number> newNumberComparator() { + return new NumberComparator(); + } + + public static Comparator<Object> newObjectComparator() { + return new ObjectComparator(); + } + + + private static class NumberComparator implements Comparator<Number> { + @Override + public int compare(Number o1, Number o2) { + if (o1 == o2) { + return 0; + } + + if (o1 == null && o2 == null) { + return 0; + } + + if (o1 == null) { + return -1; + } + + if (o2 == null) { + return 1; + } + + if (o1.doubleValue() < o2.doubleValue()) { + return -1; + } + + if (o1.doubleValue() > o2.doubleValue()) { + return 1; + } + + return 0; + } + + } + + private static class ObjectComparator implements Comparator<Object> { + private final Collator collator = Collator.getInstance(); + + @Override + public int compare(final Object o1, final Object o2) { + if ( o1 == o2 ) { + return 0; + } + + if (o1 == null && o2 == null) { + return 0; + } + + if (o1 == null) { + return -1; + } + + if (o2 == null) { + return 1; + } + + return collator.compare(o1.toString(), o2.toString()); + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b512ff12/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/repository/SelectAllRecords.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/repository/SelectAllRecords.java b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/repository/SelectAllRecords.java new file mode 100644 index 0000000..e1f8df8 --- /dev/null +++ b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/repository/SelectAllRecords.java @@ -0,0 +1,61 @@ +package org.apache.nifi.pql.evaluation.repository; + +import java.io.IOException; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; + +import org.apache.nifi.pql.evaluation.RepositoryEvaluator; +import org.apache.nifi.provenance.ProvenanceEventRepository; +import org.apache.nifi.provenance.StoredProvenanceEvent; + +public class SelectAllRecords implements RepositoryEvaluator { + + public Iterator<StoredProvenanceEvent> evaluate(final ProvenanceEventRepository repository) throws IOException { + final int maxRecords = 10000; + + return new Iterator<StoredProvenanceEvent>() { + long iterated = 0; + long fetched = 0; + + List<StoredProvenanceEvent> records = null; + Iterator<StoredProvenanceEvent> listItr = null; + + private void ensureIterator() { + if ( listItr == null || !listItr.hasNext() ) { + try { + records = repository.getEvents(fetched, maxRecords); + } catch (final IOException ioe) { + throw new RuntimeException(ioe); + } + + listItr = records.iterator(); + fetched += records.size(); + } + } + + public boolean hasNext() { + ensureIterator(); + return listItr.hasNext(); + } + + public StoredProvenanceEvent next() { + if ( !hasNext() ) { + throw new NoSuchElementException(); + } + + if ( iterated++ == fetched ) { + records = null; + listItr = null; + } + + return listItr.next(); + } + + public void remove() { + throw new UnsupportedOperationException(); + } + }; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b512ff12/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/exception/ProvenanceQueryLanguageException.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/exception/ProvenanceQueryLanguageException.java b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/exception/ProvenanceQueryLanguageException.java new file mode 100644 index 0000000..87ca2c1 --- /dev/null +++ b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/exception/ProvenanceQueryLanguageException.java @@ -0,0 +1,23 @@ +package org.apache.nifi.pql.exception; + +public class ProvenanceQueryLanguageException extends RuntimeException { + + private static final long serialVersionUID = 1L; + + public ProvenanceQueryLanguageException() { + super(); + } + + public ProvenanceQueryLanguageException(final String message) { + super(message); + } + + public ProvenanceQueryLanguageException(final Throwable cause) { + super(cause); + } + + public ProvenanceQueryLanguageException(final String message, final Throwable cause) { + super(message, cause); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b512ff12/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/exception/ProvenanceQueryLanguageParsingException.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/exception/ProvenanceQueryLanguageParsingException.java b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/exception/ProvenanceQueryLanguageParsingException.java new file mode 100644 index 0000000..88426d3 --- /dev/null +++ b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/exception/ProvenanceQueryLanguageParsingException.java @@ -0,0 +1,23 @@ +package org.apache.nifi.pql.exception; + +public class ProvenanceQueryLanguageParsingException extends ProvenanceQueryLanguageException { + + private static final long serialVersionUID = 1L; + + public ProvenanceQueryLanguageParsingException() { + super(); + } + + public ProvenanceQueryLanguageParsingException(final String message) { + super(message); + } + + public ProvenanceQueryLanguageParsingException(final Throwable cause) { + super(cause); + } + + public ProvenanceQueryLanguageParsingException(final String message, final Throwable cause) { + super(message, cause); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b512ff12/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/groups/Group.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/groups/Group.java b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/groups/Group.java new file mode 100644 index 0000000..a3e87c1 --- /dev/null +++ b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/groups/Group.java @@ -0,0 +1,57 @@ +package org.apache.nifi.pql.groups; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +public class Group { + + private final List<Object> values; + private final int hashCode; + + public Group(final Object... values) { + this(Arrays.asList(values)); + } + + public Group(final List<Object> values) { + this.values = new ArrayList<>(values); + + int prime = 23497; + int hc = 1; + for ( final Object o : values ) { + hc = prime * hc + o.hashCode(); + } + + this.hashCode = hc; + } + + @Override + public int hashCode() { + return hashCode; + } + + @Override + public boolean equals(final Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + if (getClass() != obj.getClass()) + return false; + + Group other = (Group) obj; + if (hashCode != other.hashCode) + return false; + if (values == null) { + if (other.values != null) + return false; + } else if (!values.equals(other.values)) + return false; + return true; + } + + @Override + public String toString() { + return values == null ? "Default Group" : values.toString(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b512ff12/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/groups/Grouper.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/groups/Grouper.java b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/groups/Grouper.java new file mode 100644 index 0000000..3ce667b --- /dev/null +++ b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/groups/Grouper.java @@ -0,0 +1,20 @@ +package org.apache.nifi.pql.groups; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.nifi.pql.evaluation.RecordEvaluator; +import org.apache.nifi.provenance.ProvenanceEventRecord; + +public class Grouper { + + public static Group group(final ProvenanceEventRecord record, final List<RecordEvaluator<?>> evaluators) { + final List<Object> values = new ArrayList<>(evaluators.size()); + for ( final RecordEvaluator<?> evaluator : evaluators ) { + values.add(evaluator.evaluate(record)); + } + + return new Group(values); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b512ff12/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/results/GroupingResultSet.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/results/GroupingResultSet.java b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/results/GroupingResultSet.java new file mode 100644 index 0000000..53c4dca --- /dev/null +++ b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/results/GroupingResultSet.java @@ -0,0 +1,161 @@ +package org.apache.nifi.pql.results; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.NoSuchElementException; + +import org.apache.nifi.pql.evaluation.Accumulator; +import org.apache.nifi.pql.evaluation.RecordEvaluator; +import org.apache.nifi.pql.evaluation.order.RowSorter; +import org.apache.nifi.pql.groups.Group; +import org.apache.nifi.pql.groups.Grouper; +import org.apache.nifi.provenance.StoredProvenanceEvent; +import org.apache.nifi.provenance.query.ProvenanceResultSet; + +public class GroupingResultSet implements ProvenanceResultSet { + private final List<String> labels; + private final List<Class<?>> returnTypes; + private final Iterator<? extends StoredProvenanceEvent> recordItr; + private final List<Accumulator<?>> selectAccumulators; + private final RecordEvaluator<Boolean> sourceEvaluator; + private final RecordEvaluator<Boolean> conditionEvaluator; + private final List<RecordEvaluator<?>> groupEvaluators; + private final RowSorter sorter; + private final Long limit; + private long recordsReturned = 0L; + + private Iterator<List<Object>> rowItr; + + public GroupingResultSet( + final Iterator<? extends StoredProvenanceEvent> recordItr, + final List<Accumulator<?>> selectAccumulators, + final RecordEvaluator<Boolean> sourceEvaluator, + final RecordEvaluator<Boolean> conditionEvaluator, + final List<String> labels, final List<Class<?>> returnTypes, final List<RecordEvaluator<?>> groupEvaluators, + final RowSorter sorter, + final Long limit) { + + this.labels = labels; + this.returnTypes = returnTypes; + this.recordItr = recordItr; + this.selectAccumulators = selectAccumulators; + this.sourceEvaluator = sourceEvaluator; + this.conditionEvaluator = conditionEvaluator; + this.groupEvaluators = groupEvaluators; + this.sorter = sorter; + this.limit = limit; + } + + + @Override + public List<String> getLabels() { + return labels; + } + + @Override + public List<Class<?>> getReturnType() { + return returnTypes; + } + + @SuppressWarnings({ "rawtypes", "unchecked" }) + private void createRowItr() { + int recordIdx = 0; + while (recordItr.hasNext()) { + final StoredProvenanceEvent record = recordItr.next(); + + if ( sourceEvaluator != null && !sourceEvaluator.evaluate(record) ) { + continue; + } + + final boolean meetsConditions = conditionEvaluator == null ? true : conditionEvaluator.evaluate(record); + if ( meetsConditions ) { + final Group group = groupEvaluators == null ? null : Grouper.group(record, groupEvaluators); + + for ( final Accumulator<?> accumulator : selectAccumulators ) { + accumulator.accumulate(record, group); + } + + if ( sorter != null ) { + sorter.add(record, group, recordIdx++); + } + } + } + + // Key = Group + // Value = Map + // Key = Accumulator + // Value = Column values for a row + final Map<Group, Map<Accumulator, List<Object>>> groupedMap = new LinkedHashMap<>(); + for ( final Accumulator<?> accumulator : selectAccumulators ) { + final Map<Group, List<Object>> accumulatedValues = (Map) accumulator.getValues(); + + // for each row returned by this accumulator... + for ( final Map.Entry<Group, List<Object>> entry : accumulatedValues.entrySet() ) { + final Group group = entry.getKey(); + + Map<Accumulator, List<Object>> accumulatorRows = groupedMap.get(group); + if ( accumulatorRows == null ) { + accumulatorRows = new LinkedHashMap<>(); + groupedMap.put(group, accumulatorRows); + } + accumulatorRows.put(accumulator, accumulatedValues.get(group)); + } + } + + final Collection<Map<Accumulator, List<Object>>> columnCollection = groupedMap.values(); + final List<List<Object>> rows = new ArrayList<>(); + for ( final Map<Accumulator, List<Object>> map : columnCollection ) { + final List<Object> columnValues = new ArrayList<>(); + + int rowIdx = 0; + for ( final List<Object> accumulatorRows : map.values() ) { + if (accumulatorRows.size() <= rowIdx) { + break; + } + + final Object columnVal = accumulatorRows.get(rowIdx); + columnValues.add(columnVal); + } + + rowIdx++; + rows.add(columnValues); + } + + final List<List<Object>> sortedRows; + if ( sorter == null ) { + sortedRows = rows; + } else { + sortedRows = new ArrayList<>(rows.size()); + + final List<Integer> sortedRowIds = sorter.sort(); + for (final Integer rowId : sortedRowIds) { + sortedRows.add( rows.get(rowId) ); + } + } + + rowItr = sortedRows.iterator(); + } + + + @Override + public boolean hasNext() { + if ( rowItr == null ) { + createRowItr(); + } + return (limit == null || recordsReturned <= limit ) && rowItr.hasNext(); + } + + @Override + public List<?> next() { + if ( hasNext() ) { + return rowItr.next(); + } + + throw new NoSuchElementException(); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b512ff12/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/results/OrderedResultSet.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/results/OrderedResultSet.java b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/results/OrderedResultSet.java new file mode 100644 index 0000000..c5417c5 --- /dev/null +++ b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/results/OrderedResultSet.java @@ -0,0 +1,50 @@ +package org.apache.nifi.pql.results; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; + +import org.apache.nifi.pql.evaluation.order.RowSorter; +import org.apache.nifi.provenance.query.ProvenanceResultSet; + +public class OrderedResultSet implements ProvenanceResultSet { + private final Iterator<List<?>> sortedRowItr; + private final ProvenanceResultSet rs; + + public OrderedResultSet(final ProvenanceResultSet rs, final RowSorter sorter) { + this.rs = rs; + final List<List<?>> rows = new ArrayList<>(); + + while (rs.hasNext()) { + final List<?> colVals = rs.next(); + rows.add(colVals); + } + + final List<List<?>> sortedRows = new ArrayList<>(rows.size()); + for ( final Integer rowId : sorter.sort() ) { + sortedRows.add(rows.get(rowId.intValue())); + } + + sortedRowItr = sortedRows.iterator(); + } + + @Override + public List<String> getLabels() { + return rs.getLabels(); + } + + @Override + public List<Class<?>> getReturnType() { + return rs.getReturnType(); + } + + @Override + public boolean hasNext() { + return sortedRowItr.hasNext(); + } + + @Override + public List<?> next() { + return sortedRowItr.next(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b512ff12/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/results/ResultRow.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/results/ResultRow.java b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/results/ResultRow.java new file mode 100644 index 0000000..f02a992 --- /dev/null +++ b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/results/ResultRow.java @@ -0,0 +1,20 @@ +package org.apache.nifi.pql.results; + +import java.util.Arrays; +import java.util.List; + +public class ResultRow { + private final List<Object> values; + + public ResultRow(final Object... values) { + this(Arrays.asList(values)); + } + + public ResultRow(final List<Object> values) { + this.values = values; + } + + public List<Object> getValues() { + return values; + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b512ff12/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/results/RowIterator.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/results/RowIterator.java b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/results/RowIterator.java new file mode 100644 index 0000000..a9202dc --- /dev/null +++ b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/results/RowIterator.java @@ -0,0 +1,69 @@ +package org.apache.nifi.pql.results; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; + +public class RowIterator implements Iterator<ResultRow> { + private final Iterator<Iterator<?>> itrs; + + public RowIterator(final List<Iterator<?>> itrs) { + this.itrs = itrs.iterator(); + } + + + @Override + public boolean hasNext() { + return itrs.hasNext(); + } + + public ResultRow next() { + final Iterator<?> columnValueItr = itrs.next(); + final List<Object> colValues = new ArrayList<>(); + while (columnValueItr.hasNext()) { + colValues.add(columnValueItr.next()); + } + return new ResultRow(colValues); + } + +// @Override +// public boolean hasNext() { +// if ( curItr == null || !curItr.hasNext() ) { +// while (itrs.hasNext()) { +// curItr = itrs.next(); +// if ( curItr.hasNext() ) { +// return true; +// } +// } +// +// return false; +// } +// +// return true; +// } +// +// @Override +// public T next() { +// if ( curItr.hasNext() ) { +// return curItr.next(); +// } +// +// if ( curItr == null || !curItr.hasNext() ) { +// while (itrs.hasNext()) { +// curItr = itrs.next(); +// if ( curItr.hasNext() ) { +// return curItr.next(); +// } +// } +// +// throw new NoSuchElementException(); +// } else { +// return curItr.next(); +// } +// } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b512ff12/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/results/StandardOrderedResultSet.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/results/StandardOrderedResultSet.java b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/results/StandardOrderedResultSet.java new file mode 100644 index 0000000..4bec4c0 --- /dev/null +++ b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/results/StandardOrderedResultSet.java @@ -0,0 +1,111 @@ +package org.apache.nifi.pql.results; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; + +import org.apache.nifi.pql.evaluation.Accumulator; +import org.apache.nifi.pql.evaluation.RecordEvaluator; +import org.apache.nifi.pql.evaluation.order.RowSorter; +import org.apache.nifi.provenance.ProvenanceEventRecord; +import org.apache.nifi.provenance.StoredProvenanceEvent; +import org.apache.nifi.provenance.query.ProvenanceResultSet; + +public class StandardOrderedResultSet implements ProvenanceResultSet { + private final List<String> labels; + private final List<Class<?>> returnTypes; + + private final Iterator<? extends StoredProvenanceEvent> recordItr; + private final List<Accumulator<?>> selectAccumulators; + private final RecordEvaluator<Boolean> sourceEvaluator; + private final RecordEvaluator<Boolean> conditionEvaluator; + private final RowSorter sorter; + private final Long limit; + + private Iterator<ResultRow> resultRowItr; + + public StandardOrderedResultSet(final Iterator<? extends StoredProvenanceEvent> recordItr, + final List<Accumulator<?>> selectAccumulators, + final RecordEvaluator<Boolean> sourceEvaluator, + final RecordEvaluator<Boolean> conditionEvaluator, + final List<String> labels, + final List<Class<?>> returnTypes, + final RowSorter sorter, + final Long limit) + { + this.labels = labels; + this.returnTypes = returnTypes; + + this.recordItr = recordItr; + this.selectAccumulators = selectAccumulators; + this.sourceEvaluator = sourceEvaluator; + this.conditionEvaluator = conditionEvaluator; + this.sorter = sorter; + this.limit = limit; + } + + + @Override + public List<String> getLabels() { + return labels; + } + + @Override + public List<Class<?>> getReturnType() { + return returnTypes; + } + + private void createResultRowItr() { + final List<ResultRow> rows = new ArrayList<>(); + int idx = 0; + while (recordItr.hasNext()) { + final ProvenanceEventRecord record = recordItr.next(); + if ( sourceEvaluator != null && !sourceEvaluator.evaluate(record) ) { + continue; + } + + final boolean meetsConditions = conditionEvaluator == null ? true : conditionEvaluator.evaluate(record); + if ( meetsConditions ) { + final List<Object> values = new ArrayList<>(selectAccumulators.size()); + for ( final Accumulator<?> accumulator : selectAccumulators ) { + final Object value = accumulator.accumulate(record, null); + accumulator.reset(); + values.add(value); + } + rows.add(new ResultRow(values)); + sorter.add(record, null, idx++); + } + } + + final List<ResultRow> sortedRows = new ArrayList<>(); + for ( final Integer unsortedIndex : sorter.sort() ) { + final ResultRow row = rows.get(unsortedIndex.intValue()); + sortedRows.add(row); + + if ( limit != null && sortedRows.size() >= limit.intValue() ) { + break; + } + } + + resultRowItr = sortedRows.iterator(); + } + + @Override + public boolean hasNext() { + if ( resultRowItr == null ) { + createResultRowItr(); + } + + return resultRowItr.hasNext(); + } + + @Override + public List<?> next() { + if ( resultRowItr == null ) { + createResultRowItr(); + } + + return resultRowItr.next().getValues(); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b512ff12/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/results/StandardUnorderedResultSet.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/results/StandardUnorderedResultSet.java b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/results/StandardUnorderedResultSet.java new file mode 100644 index 0000000..019ab5c --- /dev/null +++ b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/results/StandardUnorderedResultSet.java @@ -0,0 +1,104 @@ +package org.apache.nifi.pql.results; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; + +import org.apache.nifi.pql.evaluation.Accumulator; +import org.apache.nifi.pql.evaluation.RecordEvaluator; +import org.apache.nifi.provenance.ProvenanceEventRecord; +import org.apache.nifi.provenance.StoredProvenanceEvent; +import org.apache.nifi.provenance.query.ProvenanceResultSet; + +public class StandardUnorderedResultSet implements ProvenanceResultSet { + + private final List<String> labels; + private final List<Class<?>> returnTypes; + private final Iterator<? extends StoredProvenanceEvent> recordItr; + private final RecordEvaluator<Boolean> sourceEvaluator; + private final RecordEvaluator<Boolean> conditionEvaluator; + private final List<Accumulator<?>> selectAccumulators; + private final Long limit; + + private ResultRow nextRecord; + private long recordsReturned = 0L; + + public StandardUnorderedResultSet(final Iterator<? extends StoredProvenanceEvent> recordItr, + final List<Accumulator<?>> selectAccumulators, + final RecordEvaluator<Boolean> sourceEvaluator, + final RecordEvaluator<Boolean> conditionEvaluator, + final List<String> labels, + final List<Class<?>> returnTypes, + final Long limit) + { + this.selectAccumulators = selectAccumulators; + this.labels = labels; + this.returnTypes = returnTypes; + this.recordItr = recordItr; + this.sourceEvaluator = sourceEvaluator; + this.conditionEvaluator = conditionEvaluator; + this.limit = limit; + } + + + @Override + public List<String> getLabels() { + return labels; + } + + @Override + public List<Class<?>> getReturnType() { + return returnTypes; + } + + private boolean findNextRecord() { + if ( limit != null && recordsReturned >= limit.longValue() ) { + return false; + } + + while (recordItr.hasNext()) { + final ProvenanceEventRecord record = recordItr.next(); + + if ( sourceEvaluator != null && !sourceEvaluator.evaluate(record) ) { + continue; + } + + final boolean meetsConditions = conditionEvaluator == null ? true : conditionEvaluator.evaluate(record); + if ( meetsConditions ) { + final List<Object> values = new ArrayList<>(selectAccumulators.size()); + for ( final Accumulator<?> accumulator : selectAccumulators ) { + final Object value = accumulator.accumulate(record, null); + accumulator.reset(); + values.add(value); + } + this.nextRecord = new ResultRow(values); + recordsReturned++; + return true; + } + } + + return false; + } + + @Override + public boolean hasNext() { + if ( nextRecord != null ) { + return true; + } + + return findNextRecord(); + } + + @Override + public List<?> next() { + if ( hasNext() ) { + final List<?> value = nextRecord.getValues(); + nextRecord = null; + return value; + } + + throw new NoSuchElementException(); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b512ff12/nifi/nifi-commons/nifi-provenance-query-language/src/main/resources/docs/examples.pql ---------------------------------------------------------------------- diff --git a/nifi/nifi-commons/nifi-provenance-query-language/src/main/resources/docs/examples.pql b/nifi/nifi-commons/nifi-provenance-query-language/src/main/resources/docs/examples.pql new file mode 100644 index 0000000..1f0b12c --- /dev/null +++ b/nifi/nifi-commons/nifi-provenance-query-language/src/main/resources/docs/examples.pql @@ -0,0 +1,48 @@ +# Select all events that occurred in the last hour +SELECT Event +FROM * +WHERE + Event.Time > 1 HOUR AGO + + +# Count the number of RECEIVE events from myHost.myDomain in the last hour +SELECT COUNT(*) +FROM RECEIVE +WHERE + Event.Time WITHIN 1 HOUR + AND + Event.TransitUri MATCHES '.*myHost\.myDomain.*' + + +# Count the number of bytes received from myHost.myDomain in the last hour +SELECT SUM(FileSize) +FROM RECEIVE +WHERE + Event.Time WITHIN 1 HOUR + AND + Event.TransitUri MATCHES '.*myHost\.MyDomain.*' + + +# Count the number of objects and number of bytes sent to each host on August 1, 2014, but only for the 10 hosts that we send to the most +SELECT TransitUri, COUNT(*) as numFiles, SUM(FileSize) as fileSize +FROM SEND +WHERE + Event.Time BETWEEN '08/01/2014 00:00:00.000' AND '08/01/2014 23:59:59.999' +GROUP BY + Event.TransitUri +ORDER BY + fileSize +LIMIT 10 + + + + +# Select the filename and transit URI of any SEND event that occurred as the result of receiving a filename that ends with '.txt' in the last hour: +DEFINE LINEAGE L: + RECEIVE R{ + R['filename'] MATCHES '.*\.txt' + AND + R.Time > 1 HOUR AGO + } --> SEND S +SELECT S.Time, S['filename'], S.TransitUri +FROM LINEAGE L http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b512ff12/nifi/nifi-commons/nifi-provenance-query-language/src/main/resources/docs/implementation-notes.txt ---------------------------------------------------------------------- diff --git a/nifi/nifi-commons/nifi-provenance-query-language/src/main/resources/docs/implementation-notes.txt b/nifi/nifi-commons/nifi-provenance-query-language/src/main/resources/docs/implementation-notes.txt new file mode 100644 index 0000000..e75786f --- /dev/null +++ b/nifi/nifi-commons/nifi-provenance-query-language/src/main/resources/docs/implementation-notes.txt @@ -0,0 +1,52 @@ +ORDER BY + - Very Difficult. + - Does not have anything to do with what is selected + - Need to do an External Sort: + - Break records into groups of 10,000. + - Sort each group of 10,000 records. + - Write out those 10,000 records. + - Read in the first X number of records where X = 10,000 / number of files + - Merge the heads of the records. + - Algorithm is on Wikipedia. "External Sort" + - Similar to what we do in WALI when we order the Transactions by Transaction ID across multiple partitions. + + +GROUP BY + - Implement in the Accumulator. May make sense to break Accumulator into two Interfaces: + + GroupingAccumulator: + T accumulate(ProvenanceEventRecord record, Group group) + UngroupedAccumulator: + T accumulate(ProvenanceEventRecord record) + + Then the GroupingAccumulator will simply map a group to the appropriate UngroupedAccumulator and then call #accumulate. + UngroupedAccumulator will never be used except for the GroupingAccumulator delegating to the appropriate UngroupedAccumulator. + + +WHERE + - All functions must be able to be done in Lucene. + +EventAccumulator + - Should store provenance event location instead of event. Regardless of whether a field was selected or the entire event. + + + + + + + + +Prov Repo: + - Allow ANDs and ORs in queries? + - ProvenanceEventRecord should return Location object. Location is a marker interface and the specific implementation will + to be used will depend on the repo. For example, VolatileProvenanceRepository would return something like "int getIndex()" and "long getId()" + so that we can get the event at the specified index and return null unless that event's id is equal to the result of calling 'getId()'. + Persistent Prov Repo would return a Location that includes filename & offset. Perhaps also a record index so that we can add multiple + records to a single repo update (byte offset of 'transaction' is 1000 and record offset into transaction is 4). This would be used + so that if we do an update with 100 records and all have similar fields (component type, component id, most previous attributes?), then we + can write that out once. This should probably be a new data structure that wraps a ProvenanceEventRecord: + StoredProvenanceEvent + ProvenanceEventRecord getEventRecord() + Location getLocation() + - Index all attributes and properties always? At least allow property value to be "*" to indicate all. + \ No newline at end of file