NIFI-40: initial implementation of prov query language
Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/df43c8b6 Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/df43c8b6 Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/df43c8b6 Branch: refs/heads/prov-query-language Commit: df43c8b62086a5ca970b6fd377eaddc835d9e3cb Parents: b512ff1 Author: Mark Payne <marka...@hotmail.com> Authored: Thu Mar 12 09:14:30 2015 -0400 Committer: Mark Payne <marka...@hotmail.com> Committed: Thu Mar 12 09:14:30 2015 -0400 ---------------------------------------------------------------------- .../AbstractConfigurableComponent.java | 15 ++- .../AbstractSessionFactoryProcessor.java | 2 + .../provenance/ProvenanceEventRepository.java | 29 ++++- .../nifi-provenance-query-language/pom.xml | 5 +- .../org/apache/nifi/pql/ProvenanceQueryParser.g | 2 +- .../org/apache/nifi/pql/ProvenanceQuery.java | 75 ++++++++++--- .../conversion/StringToLongEvaluator.java | 68 ++++++++++++ .../extraction/ComponentTypeEvaluator.java | 39 +++++++ .../evaluation/function/TimeFieldEvaluator.java | 4 + .../java/org/apache/nifi/pql/groups/Group.java | 2 +- .../java/org/apache/nifi/pql/TestQuery.java | 12 +- nifi/nifi-commons/pom.xml | 1 + .../JournalingProvenanceRepository.java | 3 +- .../journaling/index/LuceneIndexSearcher.java | 2 +- .../TestJournalingProvenanceRepository.java | 72 +++++++++++- .../PersistentProvenanceRepository.java | 11 ++ .../nifi-volatile-provenance-repository/pom.xml | 7 ++ .../VolatileProvenanceRepository.java | 110 ++++++++++++++++++- .../nifi-standard-reporting-tasks/pom.xml | 5 + .../provenance/GenerateProvenanceReport.java | 83 ++++++++++++++ 20 files changed, 515 insertions(+), 32 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/df43c8b6/nifi/nifi-api/src/main/java/org/apache/nifi/components/AbstractConfigurableComponent.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-api/src/main/java/org/apache/nifi/components/AbstractConfigurableComponent.java b/nifi/nifi-api/src/main/java/org/apache/nifi/components/AbstractConfigurableComponent.java index f4bea5e..e093785 100644 --- a/nifi/nifi-api/src/main/java/org/apache/nifi/components/AbstractConfigurableComponent.java +++ b/nifi/nifi-api/src/main/java/org/apache/nifi/components/AbstractConfigurableComponent.java @@ -22,8 +22,17 @@ import java.util.Collections; import java.util.List; import java.util.Map; -public abstract class AbstractConfigurableComponent implements ConfigurableComponent { +import org.apache.nifi.logging.ProcessorLog; +import org.apache.nifi.processor.ProcessorInitializationContext; +public abstract class AbstractConfigurableComponent implements ConfigurableComponent { + private ComponentLogger loger; + + @Override + public void initialize(final ProcessorInitializationContext context) { + logger = context.getLogger(); + } + /** * Allows subclasses to perform their own validation on the already set * properties. Since each property is validated as it is set this allows @@ -45,6 +54,10 @@ public abstract class AbstractConfigurableComponent implements ConfigurableCompo return Collections.emptySet(); } + protected ComponentLog getLogger() { + return logger; + } + /** * Returns a PropertyDescriptor for the name specified that is fully * populated http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/df43c8b6/nifi/nifi-api/src/main/java/org/apache/nifi/processor/AbstractSessionFactoryProcessor.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-api/src/main/java/org/apache/nifi/processor/AbstractSessionFactoryProcessor.java b/nifi/nifi-api/src/main/java/org/apache/nifi/processor/AbstractSessionFactoryProcessor.java index f13a143..57dee98 100644 --- a/nifi/nifi-api/src/main/java/org/apache/nifi/processor/AbstractSessionFactoryProcessor.java +++ b/nifi/nifi-api/src/main/java/org/apache/nifi/processor/AbstractSessionFactoryProcessor.java @@ -53,6 +53,8 @@ public abstract class AbstractSessionFactoryProcessor extends AbstractConfigurab @Override public final void initialize(final ProcessorInitializationContext context) { + super.initialize(context); + identifier = context.getIdentifier(); logger = context.getLogger(); serviceLookup = context.getControllerServiceLookup(); http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/df43c8b6/nifi/nifi-api/src/main/java/org/apache/nifi/provenance/ProvenanceEventRepository.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-api/src/main/java/org/apache/nifi/provenance/ProvenanceEventRepository.java b/nifi/nifi-api/src/main/java/org/apache/nifi/provenance/ProvenanceEventRepository.java index 18e5788..b969038 100644 --- a/nifi/nifi-api/src/main/java/org/apache/nifi/provenance/ProvenanceEventRepository.java +++ b/nifi/nifi-api/src/main/java/org/apache/nifi/provenance/ProvenanceEventRepository.java @@ -23,6 +23,7 @@ import java.util.List; import org.apache.nifi.events.EventReporter; import org.apache.nifi.provenance.lineage.ComputeLineageSubmission; +import org.apache.nifi.provenance.query.ProvenanceQuerySubmission; import org.apache.nifi.provenance.search.Query; import org.apache.nifi.provenance.search.QuerySubmission; import org.apache.nifi.provenance.search.SearchableField; @@ -103,6 +104,26 @@ public interface ProvenanceEventRepository extends Closeable { */ Long getMaxEventId() throws IOException; + + /** + * Submits an asynchronous request to process the given Provenance Query Language query, + * returning an identifier that can be used to fetch the results at a later time + * + * @param query + * @return + */ + ProvenanceQuerySubmission submitQuery(String query); + + /** + * Returns the ProvenanceQuerySubmission associated with the given identifier, or <code>null</code> + * if no query exists with the given identifier. + * + * @param queryIdentifier + * + * @return + */ + ProvenanceQuerySubmission retrieveProvenanceQuerySubmission(String queryIdentifier); + /** * Submits an asynchronous request to process the given query, returning an * identifier that can be used to fetch the results at a later time @@ -111,11 +132,11 @@ public interface ProvenanceEventRepository extends Closeable { * @return */ QuerySubmission submitQuery(Query query); - + + /** - * Returns the QueryResult associated with the given identifier, if the - * query has finished processing. If the query has not yet finished running, - * returns <code>null</code>. + * Returns the QuerySubmission associated with the given identifier, or <code>null</code> + * if no query exists with the provided identifier * * @param queryIdentifier * http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/df43c8b6/nifi/nifi-commons/nifi-provenance-query-language/pom.xml ---------------------------------------------------------------------- diff --git a/nifi/nifi-commons/nifi-provenance-query-language/pom.xml b/nifi/nifi-commons/nifi-provenance-query-language/pom.xml index a13fd16..905ed5b 100644 --- a/nifi/nifi-commons/nifi-provenance-query-language/pom.xml +++ b/nifi/nifi-commons/nifi-provenance-query-language/pom.xml @@ -16,7 +16,6 @@ <plugin> <groupId>org.antlr</groupId> <artifactId>antlr3-maven-plugin</artifactId> - <version>3.4</version> <executions> <execution> <goals> @@ -33,6 +32,10 @@ <groupId>org.apache.nifi</groupId> <artifactId>nifi-api</artifactId> </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-data-provenance-utils</artifactId> + </dependency> <dependency> <groupId>org.antlr</groupId> http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/df43c8b6/nifi/nifi-commons/nifi-provenance-query-language/src/main/antlr3/org/apache/nifi/pql/ProvenanceQueryParser.g ---------------------------------------------------------------------- diff --git a/nifi/nifi-commons/nifi-provenance-query-language/src/main/antlr3/org/apache/nifi/pql/ProvenanceQueryParser.g b/nifi/nifi-commons/nifi-provenance-query-language/src/main/antlr3/org/apache/nifi/pql/ProvenanceQueryParser.g index 3837729..25410bb 100644 --- a/nifi/nifi-commons/nifi-provenance-query-language/src/main/antlr3/org/apache/nifi/pql/ProvenanceQueryParser.g +++ b/nifi/nifi-commons/nifi-provenance-query-language/src/main/antlr3/org/apache/nifi/pql/ProvenanceQueryParser.g @@ -82,7 +82,7 @@ selectableSource : EVENT | IDENTIFIER; eventProperty : propertyName -> ^(EVENT_PROPERTY propertyName ); -propertyName : UUID | TRANSIT_URI | TIMESTAMP | FILESIZE | TYPE | COMPONENT_ID | RELATIONSHIP; +propertyName : UUID | TRANSIT_URI | TIMESTAMP | FILESIZE | TYPE | COMPONENT_ID | COMPONENT_TYPE | RELATIONSHIP; attribute : STRING_LITERAL -> ^(ATTRIBUTE STRING_LITERAL); http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/df43c8b6/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/ProvenanceQuery.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/ProvenanceQuery.java b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/ProvenanceQuery.java index 200a8ee..40ccbf5 100644 --- a/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/ProvenanceQuery.java +++ b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/ProvenanceQuery.java @@ -20,6 +20,7 @@ import static org.apache.nifi.pql.ProvenanceQueryParser.LIMIT; import static org.apache.nifi.pql.ProvenanceQueryParser.LT; import static org.apache.nifi.pql.ProvenanceQueryParser.MATCHES; import static org.apache.nifi.pql.ProvenanceQueryParser.MINUTE; +import static org.apache.nifi.pql.ProvenanceQueryParser.MONTH; import static org.apache.nifi.pql.ProvenanceQueryParser.NOT; import static org.apache.nifi.pql.ProvenanceQueryParser.NOT_EQUALS; import static org.apache.nifi.pql.ProvenanceQueryParser.NUMBER; @@ -40,6 +41,8 @@ import static org.apache.nifi.pql.ProvenanceQueryParser.YEAR; import java.io.IOException; import java.util.ArrayList; import java.util.Calendar; +import java.util.Collection; +import java.util.Collections; import java.util.HashSet; import java.util.Iterator; import java.util.LinkedHashMap; @@ -66,6 +69,7 @@ import org.apache.nifi.pql.evaluation.comparison.LessThanEvaluator; import org.apache.nifi.pql.evaluation.comparison.MatchesEvaluator; import org.apache.nifi.pql.evaluation.comparison.RecordTypeEvaluator; import org.apache.nifi.pql.evaluation.comparison.StartsWithEvaluator; +import org.apache.nifi.pql.evaluation.conversion.StringToLongEvaluator; import org.apache.nifi.pql.evaluation.extraction.AttributeEvaluator; import org.apache.nifi.pql.evaluation.extraction.ComponentIdEvaluator; import org.apache.nifi.pql.evaluation.extraction.RelationshipEvaluator; @@ -84,14 +88,17 @@ import org.apache.nifi.pql.evaluation.order.GroupedSorter; import org.apache.nifi.pql.evaluation.order.RowSorter; import org.apache.nifi.pql.evaluation.order.SortDirection; import org.apache.nifi.pql.evaluation.repository.SelectAllRecords; +import org.apache.nifi.pql.exception.ProvenanceQueryLanguageException; import org.apache.nifi.pql.exception.ProvenanceQueryLanguageParsingException; import org.apache.nifi.pql.results.GroupingResultSet; import org.apache.nifi.pql.results.StandardOrderedResultSet; import org.apache.nifi.pql.results.StandardUnorderedResultSet; import org.apache.nifi.provenance.ProvenanceEventRepository; import org.apache.nifi.provenance.ProvenanceEventType; +import org.apache.nifi.provenance.SearchableFields; import org.apache.nifi.provenance.StoredProvenanceEvent; import org.apache.nifi.provenance.query.ProvenanceResultSet; +import org.apache.nifi.provenance.search.SearchableField; public class ProvenanceQuery { private final Tree tree; @@ -103,16 +110,19 @@ public class ProvenanceQuery { private final RowSorter sorter; private final Long limit; + private final Set<SearchableField> searchableFields; + private final Set<String> searchableAttributes; private long accumulatorIdGenerator = 0L; - public static ProvenanceQuery compile(final String pql) { + + public static ProvenanceQuery compile(final String pql, final Collection<SearchableField> searchableFields, final Collection<SearchableField> searchableAttributes) { try { final CommonTokenStream lexerTokenStream = createTokenStream(pql); final ProvenanceQueryParser parser = new ProvenanceQueryParser(lexerTokenStream); final Tree ast = (Tree) parser.pql().getTree(); final Tree tree = ast.getChild(0); - return new ProvenanceQuery(tree, pql); + return new ProvenanceQuery(tree, pql, searchableFields, searchableAttributes); } catch (final ProvenanceQueryLanguageParsingException e) { throw e; } catch (final Exception e) { @@ -126,9 +136,19 @@ public class ProvenanceQuery { return new CommonTokenStream(lexer); } - private ProvenanceQuery(final Tree tree, final String pql) { + private ProvenanceQuery(final Tree tree, final String pql, final Collection<SearchableField> searchableFields, final Collection<SearchableField> searchableAttributes) { this.tree = tree; this.pql = pql; + this.searchableFields = searchableFields == null ? null : Collections.unmodifiableSet(new HashSet<>(searchableFields)); + if (searchableAttributes == null) { + this.searchableAttributes = null; + } else { + final Set<String> attributes = new HashSet<>(); + for ( final SearchableField attr : searchableAttributes ) { + attributes.add(attr.getSearchableFieldName()); + } + this.searchableAttributes = Collections.unmodifiableSet(attributes); + } Tree fromTree = null; Tree whereTree = null; @@ -162,7 +182,6 @@ public class ProvenanceQuery { sourceEvaluator = (fromTree == null) ? null : buildSourceEvaluator(fromTree); - // Distribute AND's over OR's. final BooleanEvaluator where = (whereTree == null) ? null : buildConditionEvaluator(whereTree.getChild(0)); conditionEvaluator = where; @@ -192,10 +211,6 @@ public class ProvenanceQuery { selectAccumulators = accumulators; } } - - - //repoEvaluator = new SelectAllRecords(); - //repoEvaluator = new SelectFromLucene(LuceneTranslator.toLuceneQuery(where)); } @Override @@ -298,35 +313,59 @@ public class ProvenanceQuery { case EVENT_PROPERTY: switch (tree.getChild(0).getType()) { case FILESIZE: + if ( searchableFields != null && !searchableFields.contains(SearchableFields.FileSize) ) { + throw new ProvenanceQueryLanguageException("Query cannot reference FileSize because this field is not searchable by the repository"); + } return new SizeEvaluator(); case TRANSIT_URI: + if ( searchableFields != null && !searchableFields.contains(SearchableFields.TransitURI) ) { + throw new ProvenanceQueryLanguageException("Query cannot reference TransitURI because this field is not searchable by the repository"); + } return new TransitUriEvaluator(); case TIMESTAMP: + // time is always indexed return new TimestampEvaluator(); case TYPE: + // type is always indexed so no need to check it return new TypeEvaluator(); case COMPONENT_ID: + if ( searchableFields != null && !searchableFields.contains(SearchableFields.ComponentID) ) { + throw new ProvenanceQueryLanguageException("Query cannot reference Component ID because this field is not searchable by the repository"); + } return new ComponentIdEvaluator(); + // TODO: Allow Component Type to be indexed and searched case RELATIONSHIP: + if ( searchableFields != null && !searchableFields.contains(SearchableFields.Relationship) ) { + throw new ProvenanceQueryLanguageException("Query cannot reference Relationship because this field is not searchable by the repository"); + } return new RelationshipEvaluator(); case UUID: + if ( searchableFields != null && !searchableFields.contains(SearchableFields.FlowFileUUID) ) { + throw new ProvenanceQueryLanguageException("Query cannot reference FlowFile UUID because this field is not searchable by the repository"); + } return new UuidEvaluator(); default: // TODO: IMPLEMENT throw new UnsupportedOperationException("Haven't implemented extraction of property " + tree.getChild(0).getText()); } case ATTRIBUTE: + final String attributeName = tree.getChild(0).getText(); + if ( searchableAttributes != null && !searchableAttributes.contains(attributeName) ) { + throw new ProvenanceQueryLanguageException("Query cannot attribute '" + attributeName + "' because this attribute is not searchable by the repository"); + } return new AttributeEvaluator(toStringEvaluator(buildOperandEvaluator(tree.getChild(0)), tree)); case STRING_LITERAL: return new StringLiteralEvaluator(tree.getText()); case NUMBER: return new LongLiteralEvaluator(Long.valueOf(tree.getText())); + case YEAR: + return new TimeFieldEvaluator(toLongEvaluator(buildOperandEvaluator(tree.getChild(0)), tree), Calendar.YEAR, YEAR); + case MONTH: + return new TimeFieldEvaluator(toLongEvaluator(buildOperandEvaluator(tree.getChild(0)), tree), Calendar.MONTH, MONTH); + case DAY: + return new TimeFieldEvaluator(toLongEvaluator(buildOperandEvaluator(tree.getChild(0)), tree), Calendar.DAY_OF_YEAR, DAY); case HOUR: return new TimeFieldEvaluator(toLongEvaluator(buildOperandEvaluator(tree.getChild(0)), tree), Calendar.HOUR_OF_DAY, HOUR); - case DAY: - return new TimeFieldEvaluator(toLongEvaluator(buildOperandEvaluator(tree.getChild(0)), tree), Calendar.DAY_OF_YEAR, DAY); - case YEAR: - return new TimeFieldEvaluator(toLongEvaluator(buildOperandEvaluator(tree.getChild(0)), tree), Calendar.YEAR, YEAR); case MINUTE: return new TimeFieldEvaluator(toLongEvaluator(buildOperandEvaluator(tree.getChild(0)), tree), Calendar.MINUTE, MINUTE); case SECOND: @@ -412,6 +451,16 @@ public class ProvenanceQuery { } private OperandEvaluator<Long> toLongEvaluator(final OperandEvaluator<?> eval, final Tree tree) { + if ( eval.getType() == Long.class ) { + @SuppressWarnings("unchecked") + final OperandEvaluator<Long> retEvaluator = ((OperandEvaluator<Long>) eval); + return retEvaluator; + } else if ( eval.getType() == String.class ) { + @SuppressWarnings("unchecked") + final OperandEvaluator<String> stringEval = ((OperandEvaluator<String>) eval); + return new StringToLongEvaluator(stringEval); + } + return castEvaluator(eval, tree, Long.class); } @@ -499,7 +548,7 @@ public class ProvenanceQuery { public static ProvenanceResultSet execute(final String query, final ProvenanceEventRepository repo) throws IOException { - return ProvenanceQuery.compile(query).execute(repo); + return ProvenanceQuery.compile(query, null, null).execute(repo); } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/df43c8b6/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/conversion/StringToLongEvaluator.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/conversion/StringToLongEvaluator.java b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/conversion/StringToLongEvaluator.java new file mode 100644 index 0000000..38ac9f0 --- /dev/null +++ b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/conversion/StringToLongEvaluator.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.pql.evaluation.conversion; + +import org.apache.nifi.pql.evaluation.OperandEvaluator; +import org.apache.nifi.provenance.ProvenanceEventRecord; + +public class StringToLongEvaluator implements OperandEvaluator<Long> { + private final OperandEvaluator<String> stringEvaluator; + + public StringToLongEvaluator(final OperandEvaluator<String> stringEvaluator) { + this.stringEvaluator = stringEvaluator; + } + + @Override + public Long evaluate(final ProvenanceEventRecord record) { + final String result = stringEvaluator.evaluate(record); + if (result == null) { + return null; + } + + final String trimmed = result.trim(); + if ( trimmed.isEmpty() ) { + return 0L; + } + + if ( isNumber(trimmed) ) { + return Long.parseLong(trimmed); + } + + return null; + } + + private boolean isNumber(final String value) { + for (int i=0; i < value.length(); i++) { + final char c = value.charAt(i); + if ( c < '0' || c > '9' ) { + return false; + } + } + + return true; + } + + @Override + public int getEvaluatorType() { + return -1; + } + + @Override + public Class<Long> getType() { + return Long.class; + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/df43c8b6/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/extraction/ComponentTypeEvaluator.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/extraction/ComponentTypeEvaluator.java b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/extraction/ComponentTypeEvaluator.java new file mode 100644 index 0000000..5455055 --- /dev/null +++ b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/extraction/ComponentTypeEvaluator.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.pql.evaluation.extraction; + +import org.apache.nifi.pql.evaluation.OperandEvaluator; +import org.apache.nifi.provenance.ProvenanceEventRecord; + +public class ComponentTypeEvaluator implements OperandEvaluator<String> { + + @Override + public String evaluate(final ProvenanceEventRecord record) { + return record.getComponentType(); + } + + @Override + public int getEvaluatorType() { + return org.apache.nifi.pql.ProvenanceQueryParser.COMPONENT_TYPE; + } + + @Override + public Class<String> getType() { + return String.class; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/df43c8b6/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/function/TimeFieldEvaluator.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/function/TimeFieldEvaluator.java b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/function/TimeFieldEvaluator.java index 44450c9..373fc72 100644 --- a/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/function/TimeFieldEvaluator.java +++ b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/function/TimeFieldEvaluator.java @@ -18,9 +18,13 @@ public class TimeFieldEvaluator implements OperandEvaluator<Long> { this.timeExtractor = timeExtractor; this.evaluatorType = evaluatorType; + // note the case statements below are designed to "bleed through." + // I.e., if time field is YEAR, we want to clear all of the fields starting with month. switch (timeField) { case Calendar.YEAR: fieldsToClear.add(Calendar.MONTH); + case Calendar.MONTH: + fieldsToClear.add(Calendar.DAY_OF_MONTH); case Calendar.DAY_OF_MONTH: fieldsToClear.add(Calendar.HOUR); case Calendar.HOUR: http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/df43c8b6/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/groups/Group.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/groups/Group.java b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/groups/Group.java index a3e87c1..35ee93b 100644 --- a/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/groups/Group.java +++ b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/groups/Group.java @@ -19,7 +19,7 @@ public class Group { int prime = 23497; int hc = 1; for ( final Object o : values ) { - hc = prime * hc + o.hashCode(); + hc = prime * hc + (o == null ? 0 : o.hashCode()); } this.hashCode = hc; http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/df43c8b6/nifi/nifi-commons/nifi-provenance-query-language/src/test/java/org/apache/nifi/pql/TestQuery.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-commons/nifi-provenance-query-language/src/test/java/org/apache/nifi/pql/TestQuery.java b/nifi/nifi-commons/nifi-provenance-query-language/src/test/java/org/apache/nifi/pql/TestQuery.java index b6012db..3317c21 100644 --- a/nifi/nifi-commons/nifi-provenance-query-language/src/test/java/org/apache/nifi/pql/TestQuery.java +++ b/nifi/nifi-commons/nifi-provenance-query-language/src/test/java/org/apache/nifi/pql/TestQuery.java @@ -92,13 +92,13 @@ public class TestQuery { @Test public void testCompilationManually() { - System.out.println(ProvenanceQuery.compile("SELECT R.TransitUri FROM *")); - System.out.println(ProvenanceQuery.compile("SELECT R['filename'] FROM RECEIVE, SEND;")); - System.out.println(ProvenanceQuery.compile("SELECT Event FROM RECEIVE ORDER BY Event['filename'];")); + System.out.println(ProvenanceQuery.compile("SELECT R.TransitUri FROM *", null, null)); + System.out.println(ProvenanceQuery.compile("SELECT R['filename'] FROM RECEIVE, SEND;", null, null)); + System.out.println(ProvenanceQuery.compile("SELECT Event FROM RECEIVE ORDER BY Event['filename'];", null, null)); // System.out.println(Query.compile("SELECT Event FROM RECEIVE WHERE ((Event.TransitUri <> 'http') OR (Event['filename'] = '1.txt')) and (Event.Size > 1000 or Event.Size between 1 AND 4);")); - System.out.println(ProvenanceQuery.compile("SELECT SUM(Event.size) FROM RECEIVE")); + System.out.println(ProvenanceQuery.compile("SELECT SUM(Event.size) FROM RECEIVE", null, null)); } @@ -107,7 +107,7 @@ public class TestQuery { createRecords(); dump(ProvenanceQuery.execute("SELECT Event", repo)); - final ProvenanceQuery query = ProvenanceQuery.compile("SELECT SUM(Event.Size), AVG(Event.Size) FROM RECEIVE WHERE Event.TransitUri = 'https://localhost:80/nifi'"); + final ProvenanceQuery query = ProvenanceQuery.compile("SELECT SUM(Event.Size), AVG(Event.Size) FROM RECEIVE WHERE Event.TransitUri = 'https://localhost:80/nifi'", null, null); final ProvenanceResultSet rs = query.execute(repo); dump(rs); @@ -302,7 +302,7 @@ public class TestQuery { + ")"; System.out.println(queryString); - final ProvenanceQuery query = ProvenanceQuery.compile(queryString); + final ProvenanceQuery query = ProvenanceQuery.compile(queryString, null, null); System.out.println(query.getWhereClause()); http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/df43c8b6/nifi/nifi-commons/pom.xml ---------------------------------------------------------------------- diff --git a/nifi/nifi-commons/pom.xml b/nifi/nifi-commons/pom.xml index ec0bb62..35d64fe 100644 --- a/nifi/nifi-commons/pom.xml +++ b/nifi/nifi-commons/pom.xml @@ -35,5 +35,6 @@ <module>nifi-web-utils</module> <module>nifi-processor-utilities</module> <module>nifi-write-ahead-log</module> + <module>nifi-provenance-query-language</module> </modules> </project> http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/df43c8b6/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/JournalingProvenanceRepository.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/JournalingProvenanceRepository.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/JournalingProvenanceRepository.java index 2643787..fc197e8 100644 --- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/JournalingProvenanceRepository.java +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/JournalingProvenanceRepository.java @@ -518,6 +518,7 @@ public class JournalingProvenanceRepository implements ProvenanceEventRepository return queryManager.retrieveProvenanceQuerySubmission(queryIdentifier); } + @Override public ProvenanceQuerySubmission submitQuery(final String query) { ProvenanceQuerySubmission submission; final AtomicLong lastTimeProgressMade = new AtomicLong(System.nanoTime()); @@ -525,7 +526,7 @@ public class JournalingProvenanceRepository implements ProvenanceEventRepository try { final ProgressAwareIterator<? extends StoredProvenanceEvent> eventItr = selectMatchingEvents(query, lastTimeProgressMade); - final ProvenanceResultSet rs = ProvenanceQuery.compile(query).evaluate(eventItr); + final ProvenanceResultSet rs = ProvenanceQuery.compile(query, getSearchableFields(), getSearchableAttributes()).evaluate(eventItr); submission = new JournalingRepoQuerySubmission(query, new ProvenanceQueryResult() { @Override http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/df43c8b6/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/LuceneIndexSearcher.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/LuceneIndexSearcher.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/LuceneIndexSearcher.java index cd57991..3fb641e 100644 --- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/LuceneIndexSearcher.java +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/LuceneIndexSearcher.java @@ -197,7 +197,7 @@ public class LuceneIndexSearcher implements EventIndexSearcher { private <T> Iterator<T> select(final String query, final DocumentTransformer<T> transformer) throws IOException { - final org.apache.lucene.search.Query luceneQuery = LuceneTranslator.toLuceneQuery(ProvenanceQuery.compile(query).getWhereClause()); + final org.apache.lucene.search.Query luceneQuery = LuceneTranslator.toLuceneQuery(ProvenanceQuery.compile(query, repo.getSearchableFields(), repo.getSearchableAttributes()).getWhereClause()); final int batchSize = 1000; final ObjectHolder<TopDocs> topDocsHolder = new ObjectHolder<>(null); http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/df43c8b6/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/test/java/org/apache/nifi/provenance/journaling/TestJournalingProvenanceRepository.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/test/java/org/apache/nifi/provenance/journaling/TestJournalingProvenanceRepository.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/test/java/org/apache/nifi/provenance/journaling/TestJournalingProvenanceRepository.java index e51f5b7..4ddfba5 100644 --- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/test/java/org/apache/nifi/provenance/journaling/TestJournalingProvenanceRepository.java +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/test/java/org/apache/nifi/provenance/journaling/TestJournalingProvenanceRepository.java @@ -352,7 +352,8 @@ public class TestJournalingProvenanceRepository { config.setPartitionCount(1); config.setSearchableFields(Arrays.asList(new SearchableField[] { - SearchableFields.FlowFileUUID + SearchableFields.FlowFileUUID, + SearchableFields.EventTime })); try (final JournalingProvenanceRepository repo = new JournalingProvenanceRepository(config)) { @@ -596,7 +597,8 @@ public class TestJournalingProvenanceRepository { config.setPartitionCount(3); config.setSearchableFields(Arrays.asList(new SearchableField[] { - SearchableFields.FlowFileUUID + SearchableFields.FlowFileUUID, + SearchableFields.FileSize })); try (final JournalingProvenanceRepository repo = new JournalingProvenanceRepository(config)) { @@ -640,6 +642,72 @@ public class TestJournalingProvenanceRepository { } + + @Test() + public void testAggregateQuery2AgainstMany() throws IOException, InterruptedException { + final JournalingRepositoryConfig config = new JournalingRepositoryConfig(); + config.setEventExpiration(10, TimeUnit.MINUTES); + config.setJournalRolloverPeriod(10, TimeUnit.MINUTES); + config.setJournalCapacity(1024 * 1024 * 1024); + + final Map<String, File> containers = new HashMap<>(); + containers.put("container1", new File("target/" + UUID.randomUUID().toString())); + config.setContainers(containers); + + config.setPartitionCount(1); + config.setSearchableFields(Arrays.asList(new SearchableField[] { + SearchableFields.FlowFileUUID + })); + config.setSearchableAttributes(Arrays.asList(new SearchableField[] { + SearchableFields.newSearchableAttribute("i"), + SearchableFields.newSearchableAttribute("j") + })); + + try (final JournalingProvenanceRepository repo = new JournalingProvenanceRepository(config)) { + repo.initialize(null); + + final Map<String, String> attributes = new HashMap<>(); + + final long start = System.nanoTime(); + final List<ProvenanceEventRecord> events = new ArrayList<>(1000); + for (int j=0; j < 100; j++) { + attributes.put("j", String.valueOf(j)); + for (int i=0; i < 100; i++) { + attributes.put("i", String.valueOf(i)); + final ProvenanceEventRecord event = TestUtil.generateEvent(i, attributes); + events.add(event); + if ( events.size() % 100 == 0 ) { + repo.registerEvents(events); + events.clear(); + } + } + } + final long registerFinish = System.nanoTime(); + + final ProvenanceResultSet rs = repo.query("SELECT Event['j'], COUNT(Event['i']), SUM(Event['i']) GROUP BY Event['j']"); + System.out.println(rs.getLabels()); + while ( rs.hasNext() ) { + final List<?> cols = rs.next(); + System.out.println(cols); + } + + final long searchFinish = System.nanoTime(); + System.out.println("Register records: " + TimeUnit.NANOSECONDS.toMillis(registerFinish - start) + " millis"); + System.out.println("Query records: " + TimeUnit.NANOSECONDS.toMillis(searchFinish - registerFinish) + " millis"); + + } catch (final Exception e) { + e.printStackTrace(); + Assert.fail(e.toString()); + } finally { + for ( final File file : containers.values() ) { + FileUtils.deleteFile(file, true); + } + } + } + + + + @Test(timeout=10000) public void testReceiveDropLineage() throws IOException, InterruptedException { final JournalingRepositoryConfig config = new JournalingRepositoryConfig(); http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/df43c8b6/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java index 716c8b7..fef120d 100644 --- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java @@ -71,6 +71,7 @@ import org.apache.nifi.provenance.lucene.IndexSearch; import org.apache.nifi.provenance.lucene.IndexingAction; import org.apache.nifi.provenance.lucene.LineageQuery; import org.apache.nifi.provenance.lucene.LuceneUtil; +import org.apache.nifi.provenance.query.ProvenanceQuerySubmission; import org.apache.nifi.provenance.rollover.CompressionAction; import org.apache.nifi.provenance.rollover.RolloverAction; import org.apache.nifi.provenance.search.Query; @@ -1927,4 +1928,14 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository return firstEvents.get(0).getEventTime(); } + + @Override + public ProvenanceQuerySubmission submitQuery(final String query) { + throw new UnsupportedOperationException(); + } + + @Override + public ProvenanceQuerySubmission retrieveProvenanceQuerySubmission(final String queryIdentifier) { + throw new UnsupportedOperationException(); + } } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/df43c8b6/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-volatile-provenance-repository/pom.xml ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-volatile-provenance-repository/pom.xml b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-volatile-provenance-repository/pom.xml index d95e29f..70cc5b4 100644 --- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-volatile-provenance-repository/pom.xml +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-volatile-provenance-repository/pom.xml @@ -30,6 +30,13 @@ <groupId>org.apache.nifi</groupId> <artifactId>nifi-data-provenance-utils</artifactId> </dependency> + <!-- + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-provenance-query-language</artifactId> + <version>0.0.2-incubating-SNAPSHOT</version> + </dependency> + --> <dependency> <groupId>org.apache.nifi</groupId> <artifactId>nifi-utils</artifactId> http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/df43c8b6/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-volatile-provenance-repository/src/main/java/org/apache/nifi/provenance/VolatileProvenanceRepository.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-volatile-provenance-repository/src/main/java/org/apache/nifi/provenance/VolatileProvenanceRepository.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-volatile-provenance-repository/src/main/java/org/apache/nifi/provenance/VolatileProvenanceRepository.java index a20ca75..bd084bc 100644 --- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-volatile-provenance-repository/src/main/java/org/apache/nifi/provenance/VolatileProvenanceRepository.java +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-volatile-provenance-repository/src/main/java/org/apache/nifi/provenance/VolatileProvenanceRepository.java @@ -43,6 +43,7 @@ import org.apache.nifi.provenance.lineage.ComputeLineageSubmission; import org.apache.nifi.provenance.lineage.FlowFileLineage; import org.apache.nifi.provenance.lineage.Lineage; import org.apache.nifi.provenance.lineage.LineageComputationType; +import org.apache.nifi.provenance.query.ProvenanceQuerySubmission; import org.apache.nifi.provenance.search.Query; import org.apache.nifi.provenance.search.QueryResult; import org.apache.nifi.provenance.search.QuerySubmission; @@ -56,7 +57,8 @@ import org.apache.nifi.util.RingBuffer.ForEachEvaluator; import org.apache.nifi.util.RingBuffer.IterationDirection; public class VolatileProvenanceRepository implements ProvenanceEventRepository { - + private static final int MAX_CONCURRENT_QUERIES = 10; + // properties public static final String BUFFER_SIZE = "nifi.provenance.repository.buffer.size"; @@ -71,6 +73,7 @@ public class VolatileProvenanceRepository implements ProvenanceEventRepository { private final ConcurrentMap<String, AsyncQuerySubmission> querySubmissionMap = new ConcurrentHashMap<>(); private final ConcurrentMap<String, AsyncLineageSubmission> lineageSubmissionMap = new ConcurrentHashMap<>(); + private final ConcurrentMap<String, ProvenanceQuerySubmission> provQuerySubmissionMap = new ConcurrentHashMap<>(); private final AtomicLong idGenerator = new AtomicLong(0L); private final AtomicBoolean initialized = new AtomicBoolean(false); @@ -613,4 +616,109 @@ public class VolatileProvenanceRepository implements ProvenanceEventRepository { } } + + + + @Override + public ProvenanceQuerySubmission submitQuery(final String query) { + throw new UnsupportedOperationException(); + /* + if ( provQuerySubmissionMap.size() > MAX_CONCURRENT_QUERIES ) { + final List<String> toRemove = new ArrayList<>(); + final Date now = new Date(); + + for ( final Map.Entry<String, ProvenanceQuerySubmission> entry : provQuerySubmissionMap.entrySet() ) { + if ( entry.getValue().getResult().getExpiration().after(now) ) { + toRemove.add(entry.getKey()); + } + } + + for ( final String id : toRemove ) { + provQuerySubmissionMap.remove(id); + } + + if ( provQuerySubmissionMap.size() > MAX_CONCURRENT_QUERIES ) { + throw new IllegalStateException("There are already " + MAX_CONCURRENT_QUERIES + " outstanding queries for this Provenance Repository; cannot perform any more queries until the existing queries are expired or canceled"); + } + } + + final Iterator<StoredProvenanceEvent> eventItr = ringBuffer.asList().iterator(); + final ProvenanceResultSet rs = ProvenanceQuery.compile(query, getSearchableFields(), getSearchableAttributes()).evaluate(eventItr); + + final Date submissionTime = new Date(); + final String queryId = UUID.randomUUID().toString(); + final Date expiration = new Date(System.currentTimeMillis() + TimeUnit.MINUTES.toNanos(10)); + final ProvenanceQuerySubmission submission = new ProvenanceQuerySubmission() { + private final AtomicBoolean canceled = new AtomicBoolean(false); + + @Override + public String getQuery() { + return query; + } + + @Override + public ProvenanceQueryResult getResult() { + return new ProvenanceQueryResult() { + @Override + public ProvenanceResultSet getResultSet() { + return rs; + } + + @Override + public Date getExpiration() { + return expiration; + } + + @Override + public String getError() { + return null; + } + + @Override + public int getPercentComplete() { + return 100; + } + + @Override + public boolean isFinished() { + return true; + } + }; + } + + @Override + public Date getSubmissionTime() { + return submissionTime; + } + + @Override + public String getQueryIdentifier() { + return queryId; + } + + @Override + public void cancel() { + canceled.set(true); + provQuerySubmissionMap.remove(queryId); + } + + @Override + public boolean isCanceled() { + return canceled.get(); + } + }; + + provQuerySubmissionMap.putIfAbsent(queryId, submission); + return submission; + */ + } + + + @Override + public ProvenanceQuerySubmission retrieveProvenanceQuerySubmission(final String queryIdentifier) { + throw new UnsupportedOperationException(); + /* + return provQuerySubmissionMap.get(queryIdentifier); + */ + } } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/df43c8b6/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-reporting-tasks/pom.xml ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-reporting-tasks/pom.xml b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-reporting-tasks/pom.xml index 5f6a1ba..debf202 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-reporting-tasks/pom.xml +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-reporting-tasks/pom.xml @@ -39,6 +39,11 @@ <artifactId>nifi-processor-utils</artifactId> </dependency> <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-provenance-query-language</artifactId> + <version>0.0.2-incubating-SNAPSHOT</version> + </dependency> + <dependency> <groupId>com.yammer.metrics</groupId> <artifactId>metrics-ganglia</artifactId> </dependency> http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/df43c8b6/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-reporting-tasks/src/main/java/org/apache/nifi/provenance/GenerateProvenanceReport.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-reporting-tasks/src/main/java/org/apache/nifi/provenance/GenerateProvenanceReport.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-reporting-tasks/src/main/java/org/apache/nifi/provenance/GenerateProvenanceReport.java new file mode 100644 index 0000000..6352539 --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-reporting-tasks/src/main/java/org/apache/nifi/provenance/GenerateProvenanceReport.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.provenance; + +import java.io.IOException; + +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.components.Validator; +import org.apache.nifi.pql.ProvenanceQuery; +import org.apache.nifi.pql.exception.ProvenanceQueryLanguageParsingException; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.provenance.query.ProvenanceResultSet; +import org.apache.nifi.reporting.AbstractReportingTask; +import org.apache.nifi.reporting.ReportingContext; + +public class GenerateProvenanceReport extends AbstractReportingTask { + + public static final PropertyDescriptor QUERY = new PropertyDescriptor.Builder() + .name("Provenance Query") + .description("The Provenance Query to run against the repository") + .required(true) + .expressionLanguageSupported(false) + .addValidator(new ProvenanceQueryLanguageValidator()) + .build(); + public static final PropertyDescriptor DESTINATION_FILE = new PropertyDescriptor.Builder() + .name("Destination File") + .description("The file to write the results to. If not specified, the results will be written to the log") + .required(false) + .expressionLanguageSupported(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + + private ProvenanceQuery query; + + @OnScheduled + public synchronized void compileQuery(final ReportingContext context) { + final String queryText = context.getProperty(QUERY).getValue(); + this.query = ProvenanceQuery.compile(queryText, context.getEventAccess().getProvenanceRepository().getSearchableFields(), + context.getEventAccess().getProvenanceRepository().getSearchableAttributes()); + } + + @Override + public synchronized void onTrigger(final ReportingContext context) { + try { + final ProvenanceResultSet rs = query.execute(context.getEventAccess().getProvenanceRepository()); + + } catch (final IOException ioe) { + throw new ProcessException(ioe); + } + } + + private static class ProvenanceQueryLanguageValidator implements Validator { + @Override + public ValidationResult validate(final String subject, final String input, final ValidationContext context) { + try { + ProvenanceQuery.compile(input, null, null); + } catch (final ProvenanceQueryLanguageParsingException e) { + return new ValidationResult.Builder().input(input).subject(subject).valid(false).explanation(e.getMessage()).build(); + } + + return new ValidationResult.Builder().input(input).subject(subject).valid(true).build(); + } + } +}