began implementation of provenance query language
Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/b512ff12 Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/b512ff12 Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/b512ff12 Branch: refs/heads/prov-query-language Commit: b512ff1277262c12163011c1c64f8a7fa6ccf76d Parents: a2219eb Author: Mark Payne <[email protected]> Authored: Wed Mar 4 15:00:13 2015 -0500 Committer: Mark Payne <[email protected]> Committed: Wed Mar 4 15:00:13 2015 -0500 ---------------------------------------------------------------------- .../provenance/ProvenanceEventRepository.java | 4 +- .../provenance/query/ProvenanceQueryResult.java | 60 ++ .../query/ProvenanceQuerySubmission.java | 64 +++ .../provenance/query/ProvenanceResultSet.java | 36 ++ .../nifi-provenance-query-language/.gitignore | 1 + .../nifi-provenance-query-language/pom.xml | 65 +++ .../org/apache/nifi/pql/ProvenanceQueryLexer.g | 193 +++++++ .../org/apache/nifi/pql/ProvenanceQueryParser.g | 141 +++++ .../java/org/apache/nifi/pql/Formatter.java | 5 + .../org/apache/nifi/pql/LuceneTranslator.java | 189 ++++++ .../org/apache/nifi/pql/ProvenanceQuery.java | 574 +++++++++++++++++++ .../apache/nifi/pql/evaluation/Accumulator.java | 28 + .../nifi/pql/evaluation/BooleanEvaluator.java | 5 + .../nifi/pql/evaluation/OperandEvaluator.java | 7 + .../nifi/pql/evaluation/RecordEvaluator.java | 11 + .../pql/evaluation/RepositoryEvaluator.java | 13 + .../accumulation/AverageAccumulator.java | 112 ++++ .../accumulation/CountAccumulator.java | 91 +++ .../accumulation/EventAccumulator.java | 98 ++++ .../evaluation/accumulation/SumAccumulator.java | 91 +++ .../evaluation/comparison/ConversionUtils.java | 97 ++++ .../evaluation/comparison/EqualsEvaluator.java | 74 +++ .../comparison/GreaterThanEvaluator.java | 51 ++ .../comparison/LessThanEvaluator.java | 51 ++ .../evaluation/comparison/MatchesEvaluator.java | 72 +++ 
.../comparison/RecordTypeEvaluator.java | 34 ++ .../comparison/StartsWithEvaluator.java | 57 ++ .../conversion/DateToLongEvaluator.java | 27 + .../extraction/AttributeEvaluator.java | 42 ++ .../extraction/ComponentIdEvaluator.java | 23 + .../extraction/RelationshipEvaluator.java | 23 + .../evaluation/extraction/SizeEvaluator.java | 23 + .../extraction/TimestampEvaluator.java | 26 + .../extraction/TransitUriEvaluator.java | 23 + .../evaluation/extraction/TypeEvaluator.java | 24 + .../evaluation/extraction/UuidEvaluator.java | 39 ++ .../evaluation/function/TimeFieldEvaluator.java | 60 ++ .../literals/LongLiteralEvaluator.java | 27 + .../literals/StringLiteralEvaluator.java | 33 ++ .../nifi/pql/evaluation/logic/AndEvaluator.java | 114 ++++ .../nifi/pql/evaluation/logic/OrEvaluator.java | 58 ++ .../nifi/pql/evaluation/order/CellValue.java | 69 +++ .../nifi/pql/evaluation/order/FieldSorter.java | 87 +++ .../pql/evaluation/order/GroupedSorter.java | 123 ++++ .../nifi/pql/evaluation/order/RowSorter.java | 13 + .../pql/evaluation/order/SortDirection.java | 6 + .../nifi/pql/evaluation/order/Sorters.java | 74 +++ .../evaluation/repository/SelectAllRecords.java | 61 ++ .../ProvenanceQueryLanguageException.java | 23 + ...ProvenanceQueryLanguageParsingException.java | 23 + .../java/org/apache/nifi/pql/groups/Group.java | 57 ++ .../org/apache/nifi/pql/groups/Grouper.java | 20 + .../nifi/pql/results/GroupingResultSet.java | 161 ++++++ .../nifi/pql/results/OrderedResultSet.java | 50 ++ .../org/apache/nifi/pql/results/ResultRow.java | 20 + .../apache/nifi/pql/results/RowIterator.java | 69 +++ .../pql/results/StandardOrderedResultSet.java | 111 ++++ .../pql/results/StandardUnorderedResultSet.java | 104 ++++ .../src/main/resources/docs/examples.pql | 48 ++ .../resources/docs/implementation-notes.txt | 52 ++ .../java/org/apache/nifi/pql/TestQuery.java | 348 +++++++++++ .../src/test/resources/nifi.properties | 136 +++++ .../pom.xml | 9 + .../JournalingProvenanceRepository.java | 161 
+++++- .../JournalingRepoQuerySubmission.java | 80 +++ .../LazyInitializedProvenanceEvent.java | 367 ++++++++++++ .../journaling/ProgressAwareIterator.java | 25 + .../exception/EventNotFoundException.java | 29 + .../journaling/index/EventIndexSearcher.java | 20 + .../journaling/index/IndexManager.java | 18 + .../journaling/index/LuceneIndexManager.java | 47 +- .../journaling/index/LuceneIndexSearcher.java | 128 ++++- .../journaling/index/LuceneIndexWriter.java | 51 +- .../journaling/index/MultiIndexSearcher.java | 28 + .../provenance/journaling/index/QueryUtils.java | 13 + .../journaling/query/QueryManager.java | 14 + .../journaling/query/StandardQueryManager.java | 39 +- .../TestJournalingProvenanceRepository.java | 307 +++++++++- .../nifi/provenance/journaling/TestUtil.java | 2 +- .../journaling/index/TestEventIndexWriter.java | 2 +- 80 files changed, 5710 insertions(+), 51 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b512ff12/nifi/nifi-api/src/main/java/org/apache/nifi/provenance/ProvenanceEventRepository.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-api/src/main/java/org/apache/nifi/provenance/ProvenanceEventRepository.java b/nifi/nifi-api/src/main/java/org/apache/nifi/provenance/ProvenanceEventRepository.java index 8c7a044..18e5788 100644 --- a/nifi/nifi-api/src/main/java/org/apache/nifi/provenance/ProvenanceEventRepository.java +++ b/nifi/nifi-api/src/main/java/org/apache/nifi/provenance/ProvenanceEventRepository.java @@ -190,7 +190,7 @@ public interface ProvenanceEventRepository extends Closeable { /** * Returns a list of all fields that can be searched via the - * {@link #submitQuery(nifi.provenance.search.Query)} method + * {@link #submitQuery(org.apache.nifi.provenance.search.Query)} method * * @return */ @@ -198,7 +198,7 @@ public interface ProvenanceEventRepository extends Closeable { /** * Returns a list of all
FlowFile attributes that can be searched via the - * {@link #submitQuery(nifi.provenance.search.Query)} method + * {@link #submitQuery(org.apache.nifi.provenance.search.Query)} method * * @return */ http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b512ff12/nifi/nifi-api/src/main/java/org/apache/nifi/provenance/query/ProvenanceQueryResult.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-api/src/main/java/org/apache/nifi/provenance/query/ProvenanceQueryResult.java b/nifi/nifi-api/src/main/java/org/apache/nifi/provenance/query/ProvenanceQueryResult.java new file mode 100644 index 0000000..cbe2705 --- /dev/null +++ b/nifi/nifi-api/src/main/java/org/apache/nifi/provenance/query/ProvenanceQueryResult.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.nifi.provenance.query; + +import java.util.Date; + +public interface ProvenanceQueryResult { + /** + * Returns the {@link ProvenanceResultSet} holding the rows that match the + * query (up to the limit specified in the query) + * + * @return the result set for the query + */ + ProvenanceResultSet getResultSet(); + + + /** + * Returns the date at which this query result will expire + * + * @return the expiration date of this result + */ + Date getExpiration(); + + /** + * If an error occurred while running the query, this will return the + * serialized error; otherwise, returns <code>null</code>. + * + * @return the serialized error, or <code>null</code> if no error occurred + */ + String getError(); + + /** + * Returns an integer between 0 and 100 (inclusive) that indicates what + * percentage of completion the query has reached + * + * @return the percentage of completion, from 0 to 100 (inclusive) + */ + int getPercentComplete(); + + /** + * Indicates whether or not the query has finished running + * + * @return <code>true</code> if the query has finished running, <code>false</code> otherwise + */ + boolean isFinished(); +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b512ff12/nifi/nifi-api/src/main/java/org/apache/nifi/provenance/query/ProvenanceQuerySubmission.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-api/src/main/java/org/apache/nifi/provenance/query/ProvenanceQuerySubmission.java b/nifi/nifi-api/src/main/java/org/apache/nifi/provenance/query/ProvenanceQuerySubmission.java new file mode 100644 index 0000000..e4a9edc --- /dev/null +++ b/nifi/nifi-api/src/main/java/org/apache/nifi/provenance/query/ProvenanceQuerySubmission.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.provenance.query; + +import java.util.Date; + +import org.apache.nifi.provenance.search.QueryResult; // NOTE(review): no longer referenced by this interface; candidate for removal + +public interface ProvenanceQuerySubmission { + + /** + * Returns the query that was submitted + * @return the submitted query text + */ + String getQuery(); + + /** + * Returns the {@link ProvenanceQueryResult} for this query. Note that the result is + * only a partial result if the result of calling + * {@link ProvenanceQueryResult#isFinished()} is <code>false</code>. + * + * @return the (possibly partial) result of this query + */ + ProvenanceQueryResult getResult(); + + /** + * Returns the date at which this query was submitted + * + * @return the submission date + */ + Date getSubmissionTime(); + + /** + * Returns the generated identifier for this query result + * + * @return the generated query identifier + */ + String getQueryIdentifier(); + + /** + * Cancels the query + */ + void cancel(); + + /** + * @return <code>true</code> if {@link #cancel()} has been called, + * <code>false</code> otherwise + */ + boolean isCanceled(); +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b512ff12/nifi/nifi-api/src/main/java/org/apache/nifi/provenance/query/ProvenanceResultSet.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-api/src/main/java/org/apache/nifi/provenance/query/ProvenanceResultSet.java b/nifi/nifi-api/src/main/java/org/apache/nifi/provenance/query/ProvenanceResultSet.java new file mode 100644 index 0000000..37c3d90 --- /dev/null +++ b/nifi/nifi-api/src/main/java/org/apache/nifi/provenance/query/ProvenanceResultSet.java @@ -0,0 +1,36 @@ +package org.apache.nifi.provenance.query; + +import
java.util.List; + +import org.apache.nifi.provenance.ProvenanceEventRepository; + + +/** + * Represents a set of results from issuing a query against a {@link ProvenanceEventRepository}. + */ +public interface ProvenanceResultSet { + /** + * Returns the labels for the columns (aka column headers) + * @return + */ + List<String> getLabels(); + + /** + * Returns the types of the columns returned for each row + * @return + */ + List<Class<?>> getReturnType(); + + /** + * Indicates whether or not another result exists in the result set. + * @return + */ + boolean hasNext(); + + /** + * Returns the next result for this query + * @return + */ + List<?> next(); + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b512ff12/nifi/nifi-commons/nifi-provenance-query-language/.gitignore ---------------------------------------------------------------------- diff --git a/nifi/nifi-commons/nifi-provenance-query-language/.gitignore b/nifi/nifi-commons/nifi-provenance-query-language/.gitignore new file mode 100644 index 0000000..b83d222 --- /dev/null +++ b/nifi/nifi-commons/nifi-provenance-query-language/.gitignore @@ -0,0 +1 @@ +/target/ http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b512ff12/nifi/nifi-commons/nifi-provenance-query-language/pom.xml ---------------------------------------------------------------------- diff --git a/nifi/nifi-commons/nifi-provenance-query-language/pom.xml b/nifi/nifi-commons/nifi-provenance-query-language/pom.xml new file mode 100644 index 0000000..a13fd16 --- /dev/null +++ b/nifi/nifi-commons/nifi-provenance-query-language/pom.xml @@ -0,0 +1,65 @@ +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-commons</artifactId> + <version>0.0.2-incubating-SNAPSHOT</version> + 
</parent> + + <artifactId>nifi-provenance-query-language</artifactId> + <packaging>jar</packaging> + + <build> + <plugins> + <plugin> + <groupId>org.antlr</groupId> + <artifactId>antlr3-maven-plugin</artifactId> + <version>3.4</version> + <executions> + <execution> + <goals> + <goal>antlr</goal> + </goals> + </execution> + </executions> + </plugin> + </plugins> + </build> + + <dependencies> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-api</artifactId> + </dependency> + + <dependency> + <groupId>org.antlr</groupId> + <artifactId>antlr-runtime</artifactId> + </dependency> + + <dependency> + <groupId>org.apache.lucene</groupId> + <artifactId>lucene-core</artifactId> + </dependency> + + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-volatile-provenance-repository</artifactId> + <version>0.0.2-incubating-SNAPSHOT</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-properties</artifactId> + <scope>test</scope> + </dependency> + </dependencies> +</project> http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b512ff12/nifi/nifi-commons/nifi-provenance-query-language/src/main/antlr3/org/apache/nifi/pql/ProvenanceQueryLexer.g ---------------------------------------------------------------------- diff --git a/nifi/nifi-commons/nifi-provenance-query-language/src/main/antlr3/org/apache/nifi/pql/ProvenanceQueryLexer.g b/nifi/nifi-commons/nifi-provenance-query-language/src/main/antlr3/org/apache/nifi/pql/ProvenanceQueryLexer.g new file mode 100644 index 0000000..0815475 --- /dev/null +++ b/nifi/nifi-commons/nifi-provenance-query-language/src/main/antlr3/org/apache/nifi/pql/ProvenanceQueryLexer.g @@ -0,0 +1,193 @@ +lexer grammar ProvenanceQueryLexer; + +@header { + package org.apache.nifi.pql; + import 
org.apache.nifi.pql.exception.ProvenanceQueryLanguageParsingException; +} + +@rulecatch { + catch(final Exception e) { + throw new ProvenanceQueryLanguageParsingException(e); + } +} + +@members { + public void displayRecognitionError(String[] tokenNames, RecognitionException e) { + final StringBuilder sb = new StringBuilder(); + if ( e.token == null ) { + sb.append("Unrecognized token "); + } else { + sb.append("Unexpected token '").append(e.token.getText()).append("' "); + } + sb.append("at line ").append(e.line); + if ( e.approximateLineInfo ) { + sb.append(" (approximately)"); + } + sb.append(", column ").append(e.charPositionInLine); + sb.append(". Query: ").append(e.input.toString()); + + throw new ProvenanceQueryLanguageParsingException(sb.toString()); + } + + public void recover(RecognitionException e) { + final StringBuilder sb = new StringBuilder(); + if ( e.token == null ) { + sb.append("Unrecognized token "); + } else { + sb.append("Unexpected token '").append(e.token.getText()).append("' "); + } + sb.append("at line ").append(e.line); + if ( e.approximateLineInfo ) { + sb.append(" (approximately)"); + } + sb.append(", column ").append(e.charPositionInLine); + sb.append(". 
Query: ").append(e.input.toString()); + + throw new ProvenanceQueryLanguageParsingException(sb.toString()); + } +} + + +// PUNCTUATION & SPECIAL CHARACTERS +WHITESPACE : (' '|'\t'|'\n'|'\r')+ { $channel = HIDDEN; }; +COMMENT : '#' ( ~('\n') )* { $channel = HIDDEN; }; // the line break is left to WHITESPACE so a comment on the last line (no trailing newline) still lexes + +LPAREN : '('; +RPAREN : ')'; +LBRACKET : '['; +RBRACKET : ']'; +COMMA : ','; +DOT : '.'; +SEMICOLON : ';'; +ASTERISK : '*'; +EQUALS : '='; +NOT_EQUALS : '!=' | '<>'; +GT : '>'; +LT : '<'; +GE : '>='; +LE : '<='; +PIPE : '|'; +NUMBER : ('0'..'9')+; + + +// Keywords +SELECT : 'SELECT' | 'select' | 'Select'; +AS : 'AS' | 'as' | 'As'; +FROM : 'FROM' | 'from' | 'From'; +WHERE : 'WHERE' | 'where' | 'Where'; +HAVING : 'HAVING' | 'having' | 'Having'; +ORDER_BY : 'ORDER BY' | 'order by' | 'Order By'; +ASC : 'ASC' | 'asc' | 'Asc'; +DESC : 'DESC' | 'desc' | 'Desc'; +GROUP_BY : 'GROUP BY' | 'group by' | 'Group By'; +EVENT : 'EVENT' | 'event' | 'Event'; +RELATIONSHIP : 'RELATIONSHIP' | 'relationship' | 'Relationship'; + + +// Operators +WITHIN : 'WITHIN' | 'within' | 'Within'; +MATCHES : 'MATCHES' | 'matches' | 'Matches'; +CONTAINS : 'CONTAINS' | 'contains' | 'Contains'; +IS_NULL : 'IS NULL' | 'is null' | 'Is Null'; +NOT_NULL : 'NOT NULL' | 'not null' | 'Not Null'; +IN : 'IN' | 'in' | 'In'; +BETWEEN : 'BETWEEN' | 'between' | 'Between'; +AND : 'AND' | 'and' | 'And'; +OR : 'OR' | 'or' | 'Or'; +NOT : 'NOT' | 'not' | 'Not'; +LIMIT : 'LIMIT' | 'limit' | 'Limit'; +STARTS_WITH : 'STARTS WITH' | 'starts with' | 'Starts with' | 'Starts With'; + +// Functions +COUNT : 'COUNT' | 'count' | 'Count'; +SUM : 'SUM' | 'sum' | 'Sum'; +MIN : 'MIN' | 'min' | 'Min'; +MAX : 'MAX' | 'max' | 'Max'; +AVG : 'AVG' | 'avg' | 'Avg'; +HOUR : 'HOUR' | 'hour' | 'Hour'; +MINUTE : 'MINUTE' | 'minute' | 'Minute'; +SECOND : 'SECOND' | 'second' | 'Second'; +DAY : 'DAY' | 'day' | 'Day'; +MONTH : 'MONTH' | 'month' | 'Month'; +YEAR : 'YEAR' | 'year' | 'Year'; + + +// Event Properties +TRANSIT_URI : 'TRANSITURI' | 'transituri' | 'TransitUri';
+TIMESTAMP : 'TIME' | 'time' | 'Time'; +FILESIZE : 'SIZE' | 'size' | 'Size'; +TYPE : 'TYPE' | 'type' | 'Type'; +COMPONENT_ID : 'COMPONENTID' | 'componentid' | 'ComponentId' | 'componentId' | 'componentID' | 'ComponentID'; +UUID : 'UUID' | 'uuid' | 'Uuid'; + +// Event Types +RECEIVE : 'RECEIVE' | 'receive' | 'Receive'; +SEND : 'SEND' | 'send' | 'Send'; +DROP : 'DROP' | 'drop' | 'Drop'; +CREATE : 'CREATE' | 'create' | 'Create'; +EXPIRE : 'EXPIRE' | 'expire' | 'Expire'; +FORK : 'FORK' | 'fork' | 'Fork'; +JOIN : 'JOIN' | 'join' | 'Join'; +CLONE : 'CLONE' | 'clone' | 'Clone'; +CONTENT_MODIFIED : 'CONTENT_MODIFIED' | 'content_modified' | 'Content_Modified'; +ATTRIBUTES_MODIFIED : 'ATTRIBUTES_MODIFIED' | 'attributes_modified' | 'Attributes_Modified'; +ROUTE : 'ROUTE' | 'route' | 'Route'; +REPLAY : 'REPLAY' | 'replay' | 'Replay'; + + + +IDENTIFIER : ( + ('a'..'z' | 'A'..'Z' | '$') + ('a'..'z' | 'A'..'Z' | '$' | '0'..'9' | '_' )* + ); + + + +// STRINGS +STRING_LITERAL +@init{StringBuilder lBuf = new StringBuilder();} + : + ( + '"' + ( + escaped=ESC {lBuf.append(getText());} | + normal = ~( '"' | '\\' | '\n' | '\r' | '\t' ) { lBuf.appendCodePoint(normal);} + )* + '"' + ) + { + setText(lBuf.toString()); + } + | + ( + '\'' + ( + escaped=ESC {lBuf.append(getText());} | + normal = ~( '\'' | '\\' | '\n' | '\r' | '\t' ) { lBuf.appendCodePoint(normal);} + )* + '\'' + ) + { + setText(lBuf.toString()); + } + ; + + +fragment +ESC + : '\\' + ( + '"' { setText("\""); } + | '\'' { setText("\'"); } + | 'r' { setText("\r"); } + | 'n' { setText("\n"); } + | 't' { setText("\t"); } + | '\\' { setText("\\\\"); } + | nextChar = ~('"' | '\'' | 'r' | 'n' | 't' | '\\') + { + StringBuilder lBuf = new StringBuilder(); lBuf.append("\\\\").appendCodePoint(nextChar); setText(lBuf.toString()); + } + ) + ; + + http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b512ff12/nifi/nifi-commons/nifi-provenance-query-language/src/main/antlr3/org/apache/nifi/pql/ProvenanceQueryParser.g 
---------------------------------------------------------------------- diff --git a/nifi/nifi-commons/nifi-provenance-query-language/src/main/antlr3/org/apache/nifi/pql/ProvenanceQueryParser.g b/nifi/nifi-commons/nifi-provenance-query-language/src/main/antlr3/org/apache/nifi/pql/ProvenanceQueryParser.g new file mode 100644 index 0000000..3837729 --- /dev/null +++ b/nifi/nifi-commons/nifi-provenance-query-language/src/main/antlr3/org/apache/nifi/pql/ProvenanceQueryParser.g @@ -0,0 +1,141 @@ +parser grammar ProvenanceQueryParser; + +options { + output=AST; + tokenVocab=ProvenanceQueryLexer; +} + +tokens { + PQL; + QUERY; + EVENT_PROPERTY; + ATTRIBUTE; + ORDER; +} + +@header { + package org.apache.nifi.pql; + import org.apache.nifi.pql.exception.ProvenanceQueryLanguageParsingException; +} + + +@members { + public void displayRecognitionError(String[] tokenNames, RecognitionException e) { + final StringBuilder sb = new StringBuilder(); + if ( e.token == null ) { + sb.append("Unrecognized token "); + } else { + sb.append("Unexpected token '").append(e.token.getText()).append("' "); + } + sb.append("at line ").append(e.line); + if ( e.approximateLineInfo ) { + sb.append(" (approximately)"); + } + sb.append(", column ").append(e.charPositionInLine); + sb.append(". Query: ").append(e.input.toString()); + + throw new ProvenanceQueryLanguageParsingException(sb.toString()); + } + + public void recover(final RecognitionException e) { + final StringBuilder sb = new StringBuilder(); + if ( e.token == null ) { + sb.append("Unrecognized token "); + } else { + sb.append("Unexpected token '").append(e.token.getText()).append("' "); + } + sb.append("at line ").append(e.line); + if ( e.approximateLineInfo ) { + sb.append(" (approximately)"); + } + sb.append(", column ").append(e.charPositionInLine); + sb.append(". 
Query: ").append(e.input.toString()); + + throw new ProvenanceQueryLanguageParsingException(sb.toString()); + } +} + + + +pql : query -> + ^(PQL query); + +query : selectClause + fromClause? + whereClause? + groupByClause? +// havingClause? + orderByClause? + limitClause? + SEMICOLON? + EOF -> + ^(QUERY selectClause fromClause? whereClause? groupByClause? orderByClause? limitClause?); + + +selectClause : SELECT^ selectable (COMMA! selectable)*; + + +selectable : function | (selectableSource ( (DOT! eventProperty^) | (LBRACKET! attribute^ RBRACKET!) )?); + +selectableSource : EVENT | IDENTIFIER; + +eventProperty : propertyName -> + ^(EVENT_PROPERTY propertyName ); + +propertyName : UUID | TRANSIT_URI | TIMESTAMP | FILESIZE | TYPE | COMPONENT_ID | RELATIONSHIP; + +attribute : STRING_LITERAL -> + ^(ATTRIBUTE STRING_LITERAL); + + +fromClause : FROM^ source (COMMA! source)*; + +source : RECEIVE | SEND | DROP | CREATE | EXPIRE | FORK | JOIN | CLONE | CONTENT_MODIFIED | ATTRIBUTES_MODIFIED | ROUTE | REPLAY | ASTERISK; + + + +whereClause : WHERE^ conditions; + +conditions : condition ((AND^ | OR^) condition)*; + +condition : NOT^ condition | LPAREN! conditions RPAREN! | evaluation; + +evaluation : expression + ( + unaryOperator^ + | (binaryOperator^ expression) + | (BETWEEN^ NUMBER AND! NUMBER) + ); + +expression : (LPAREN! expr RPAREN!) | expr; + +expr : constant | ref; + +ref : selectableSource ( (DOT! eventProperty^) | (LBRACKET! attribute^ RBRACKET!) )?; + +unaryOperator : IS_NULL | NOT_NULL; + +binaryOperator : EQUALS | NOT_EQUALS | GT | LT | GE | LE | MATCHES | STARTS_WITH; + +constant : NUMBER | STRING_LITERAL; + + +function : functionName^ LPAREN! ref RPAREN!; + +functionName : COUNT | SUM | MIN | MAX | AVG | HOUR | MINUTE | SECOND | DAY | MONTH | YEAR; + + +groupByClause : GROUP_BY^ group (COMMA! group)*; + +group : ref^ | function^; + +orderByClause : ORDER_BY^ order (COMMA! order)*; + +order: selectable direction? 
-> + ^(ORDER selectable direction?); + +direction : ASC | DESC; + +limitClause : LIMIT^ NUMBER; + + http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b512ff12/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/Formatter.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/Formatter.java b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/Formatter.java new file mode 100644 index 0000000..7090d26 --- /dev/null +++ b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/Formatter.java @@ -0,0 +1,5 @@ +package org.apache.nifi.pql; + +public interface Formatter { + String format(Object value); +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b512ff12/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/LuceneTranslator.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/LuceneTranslator.java b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/LuceneTranslator.java new file mode 100644 index 0000000..96b4dbf --- /dev/null +++ b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/LuceneTranslator.java @@ -0,0 +1,189 @@ +package org.apache.nifi.pql; + +import static org.apache.nifi.pql.ProvenanceQueryParser.*; + +import java.text.SimpleDateFormat; +import java.util.regex.Pattern; + +import org.apache.lucene.index.Term; +import org.apache.lucene.search.BooleanClause.Occur; +import org.apache.lucene.search.BooleanQuery; +import org.apache.lucene.search.MatchAllDocsQuery; +import org.apache.lucene.search.NumericRangeQuery; +import org.apache.lucene.search.Query; +import org.apache.lucene.search.RegexpQuery; +import 
org.apache.lucene.search.TermQuery; +import org.apache.nifi.pql.evaluation.OperandEvaluator; +import org.apache.nifi.pql.evaluation.RecordEvaluator; +import org.apache.nifi.pql.evaluation.comparison.EqualsEvaluator; +import org.apache.nifi.pql.evaluation.comparison.GreaterThanEvaluator; +import org.apache.nifi.pql.evaluation.comparison.LessThanEvaluator; +import org.apache.nifi.pql.evaluation.comparison.MatchesEvaluator; +import org.apache.nifi.pql.evaluation.comparison.StartsWithEvaluator; +import org.apache.nifi.pql.evaluation.extraction.AttributeEvaluator; +import org.apache.nifi.pql.evaluation.literals.LongLiteralEvaluator; +import org.apache.nifi.pql.evaluation.literals.StringLiteralEvaluator; +import org.apache.nifi.pql.evaluation.logic.AndEvaluator; +import org.apache.nifi.pql.evaluation.logic.OrEvaluator; +import org.apache.nifi.provenance.SearchableFields; + + +public class LuceneTranslator { + + public static Query toLuceneQuery(final RecordEvaluator<Boolean> whereClause) { + if ( whereClause == null ) { + return new MatchAllDocsQuery(); + } + + final BooleanQuery query = new BooleanQuery(); + switch (whereClause.getEvaluatorType()) { + case AND: + final AndEvaluator and = (AndEvaluator) whereClause; + query.add(toLuceneQuery(and.getLHS()), Occur.MUST); + query.add(toLuceneQuery(and.getRHS()), Occur.MUST); + break; + case OR: + final OrEvaluator or = (OrEvaluator) whereClause; + query.add(toLuceneQuery(or.getLHS()), Occur.SHOULD); + query.add(toLuceneQuery(or.getRHS()), Occur.SHOULD); + query.setMinimumNumberShouldMatch(1); + break; + case GT: { + final GreaterThanEvaluator gt = (GreaterThanEvaluator) whereClause; + final OperandEvaluator<?> lhs = gt.getLHS(); + final OperandEvaluator<?> rhs = gt.getRHS(); + + final String fieldName = getFieldName(lhs); + if ( fieldName != null ) { + Long rhsValue = null; + if ( rhs.getEvaluatorType() == NUMBER ) { + rhsValue = ((LongLiteralEvaluator) rhs).evaluate(null); + } else if ( rhs.getEvaluatorType() == 
STRING_LITERAL ) { + final SimpleDateFormat sdf = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss"); + try { + rhsValue = sdf.parse(((StringLiteralEvaluator) rhs).evaluate(null)).getTime(); + } catch (final Exception e) { // not a "yyyy/MM/dd HH:mm:ss" date, so rhsValue stays null and no range clause is added + } + } + + if ( rhsValue != null ) { + query.add(NumericRangeQuery.newLongRange(fieldName, rhsValue, Long.MAX_VALUE, false, true), Occur.MUST); // '>' is strict, so the lower bound is exclusive + } + } + } + break; + case LT: { + final LessThanEvaluator lt = (LessThanEvaluator) whereClause; + final OperandEvaluator<?> lhs = lt.getLHS(); + final OperandEvaluator<?> rhs = lt.getRHS(); + + final String fieldName = getFieldName(lhs); + if ( fieldName != null ) { + Long rhsValue = null; + if ( rhs.getEvaluatorType() == NUMBER ) { + rhsValue = ((LongLiteralEvaluator) rhs).evaluate(null); + } else if ( rhs.getEvaluatorType() == STRING_LITERAL ) { + final SimpleDateFormat sdf = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss"); + try { + rhsValue = sdf.parse(((StringLiteralEvaluator) rhs).evaluate(null)).getTime(); + } catch (final Exception e) { // not a "yyyy/MM/dd HH:mm:ss" date, so rhsValue stays null and no range clause is added + } + } + + if ( rhsValue != null ) { + query.add(NumericRangeQuery.newLongRange(fieldName, Long.MIN_VALUE, rhsValue, true, false), Occur.MUST); // '<' is strict, so the upper bound is exclusive + } + } + } + break; + case MATCHES: { + final MatchesEvaluator me = (MatchesEvaluator) whereClause; + final OperandEvaluator<?> lhs = me.getLHS(); + final OperandEvaluator<?> rhs = me.getRHS(); + addMatches(lhs, rhs, query); + } + break; + case STARTS_WITH: { + final StartsWithEvaluator startsWith = (StartsWithEvaluator) whereClause; + final OperandEvaluator<?> lhs = startsWith.getLHS(); + final OperandEvaluator<?> rhs = startsWith.getRHS(); + + if ( rhs.getEvaluatorType() == STRING_LITERAL ) { + final String base = rhs.evaluate(null).toString(); + + final StringLiteralEvaluator regexEval = new StringLiteralEvaluator(Pattern.compile("[^A-Za-z0-9]").matcher(base).replaceAll("\\\\$0") + ".*"); // Lucene RegExp syntax has no quote/unquote markers (Pattern.quote output would be matched literally), so each non-alphanumeric character is backslash-escaped instead + addMatches(lhs, regexEval, query); + } + } + break; + case EQUALS: { + final EqualsEvaluator equals = (EqualsEvaluator) whereClause; + final OperandEvaluator<?> lhs = equals.getLHS(); +
final OperandEvaluator<?> rhs = equals.getRHS(); + + final String fieldName = getFieldName(lhs); + if ( fieldName != null && rhs.getEvaluatorType() == STRING_LITERAL ) { + query.add(new TermQuery(new Term(fieldName, toLower(rhs.evaluate(null).toString()))), Occur.MUST); + } + } + break; + } + + return query.clauses().isEmpty() ? new MatchAllDocsQuery() : query; // an empty BooleanQuery matches no documents, so a condition that could not be translated must not restrict the search + } + + private static String toLower(final String value) { + return value == null ? null : value.toLowerCase(); + } + + private static String getFieldName(final OperandEvaluator<?> eval) { + switch (eval.getEvaluatorType()) { + case TIMESTAMP: + return SearchableFields.EventTime.getSearchableFieldName(); + case FILESIZE: + return SearchableFields.FileSize.getSearchableFieldName(); + case ATTRIBUTE: + return ((AttributeEvaluator) eval).getAttributeNameEvaluator().evaluate(null).toLowerCase(); + case TRANSIT_URI: + return SearchableFields.TransitURI.getSearchableFieldName(); + case RELATIONSHIP: + return SearchableFields.Relationship.getSearchableFieldName(); + case TYPE: + return SearchableFields.EventType.getSearchableFieldName(); + case COMPONENT_ID: + return SearchableFields.ComponentID.getSearchableFieldName(); + case UUID: + return SearchableFields.FlowFileUUID.getSearchableFieldName(); + } + + return null; + } + + private static void addMatches(final OperandEvaluator<?> lhs, final OperandEvaluator<?> rhs, final BooleanQuery query) { + String field = null; + switch (lhs.getEvaluatorType()) { + case ATTRIBUTE: + final AttributeEvaluator attr = (AttributeEvaluator) lhs; + final OperandEvaluator<?> attrEval = attr.getAttributeNameEvaluator(); + if ( attrEval.getEvaluatorType() == STRING_LITERAL ) { + field = (String) attrEval.evaluate(null); + } + break; + case COMPONENT_ID: + case TRANSIT_URI: + case TYPE: + field = getFieldName(lhs); // the searchable Lucene field for this property, not a value extracted from a (null) event + break; + } + + String regex = null; + if ( rhs.getEvaluatorType() == STRING_LITERAL ) { + regex = rhs.evaluate(null).toString(); + } + + if ( field != null && regex != null ) { + query.add(new RegexpQuery(new
Term(field, regex)), Occur.MUST); + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b512ff12/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/ProvenanceQuery.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/ProvenanceQuery.java b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/ProvenanceQuery.java new file mode 100644 index 0000000..200a8ee --- /dev/null +++ b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/ProvenanceQuery.java @@ -0,0 +1,574 @@ +package org.apache.nifi.pql; + +import static org.apache.nifi.pql.ProvenanceQueryParser.AND; +import static org.apache.nifi.pql.ProvenanceQueryParser.ASC; +import static org.apache.nifi.pql.ProvenanceQueryParser.ATTRIBUTE; +import static org.apache.nifi.pql.ProvenanceQueryParser.AVG; +import static org.apache.nifi.pql.ProvenanceQueryParser.COMPONENT_ID; +import static org.apache.nifi.pql.ProvenanceQueryParser.COUNT; +import static org.apache.nifi.pql.ProvenanceQueryParser.DAY; +import static org.apache.nifi.pql.ProvenanceQueryParser.EQUALS; +import static org.apache.nifi.pql.ProvenanceQueryParser.EVENT; +import static org.apache.nifi.pql.ProvenanceQueryParser.EVENT_PROPERTY; +import static org.apache.nifi.pql.ProvenanceQueryParser.FILESIZE; +import static org.apache.nifi.pql.ProvenanceQueryParser.FROM; +import static org.apache.nifi.pql.ProvenanceQueryParser.GROUP_BY; +import static org.apache.nifi.pql.ProvenanceQueryParser.GT; +import static org.apache.nifi.pql.ProvenanceQueryParser.HOUR; +import static org.apache.nifi.pql.ProvenanceQueryParser.IDENTIFIER; +import static org.apache.nifi.pql.ProvenanceQueryParser.LIMIT; +import static org.apache.nifi.pql.ProvenanceQueryParser.LT; +import static org.apache.nifi.pql.ProvenanceQueryParser.MATCHES; +import static 
org.apache.nifi.pql.ProvenanceQueryParser.MINUTE; +import static org.apache.nifi.pql.ProvenanceQueryParser.NOT; +import static org.apache.nifi.pql.ProvenanceQueryParser.NOT_EQUALS; +import static org.apache.nifi.pql.ProvenanceQueryParser.NUMBER; +import static org.apache.nifi.pql.ProvenanceQueryParser.OR; +import static org.apache.nifi.pql.ProvenanceQueryParser.ORDER_BY; +import static org.apache.nifi.pql.ProvenanceQueryParser.RELATIONSHIP; +import static org.apache.nifi.pql.ProvenanceQueryParser.SECOND; +import static org.apache.nifi.pql.ProvenanceQueryParser.STARTS_WITH; +import static org.apache.nifi.pql.ProvenanceQueryParser.STRING_LITERAL; +import static org.apache.nifi.pql.ProvenanceQueryParser.SUM; +import static org.apache.nifi.pql.ProvenanceQueryParser.TIMESTAMP; +import static org.apache.nifi.pql.ProvenanceQueryParser.TRANSIT_URI; +import static org.apache.nifi.pql.ProvenanceQueryParser.TYPE; +import static org.apache.nifi.pql.ProvenanceQueryParser.UUID; +import static org.apache.nifi.pql.ProvenanceQueryParser.WHERE; +import static org.apache.nifi.pql.ProvenanceQueryParser.YEAR; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Calendar; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.antlr.runtime.ANTLRStringStream; +import org.antlr.runtime.CharStream; +import org.antlr.runtime.CommonTokenStream; +import org.antlr.runtime.tree.Tree; +import org.apache.nifi.pql.evaluation.Accumulator; +import org.apache.nifi.pql.evaluation.BooleanEvaluator; +import org.apache.nifi.pql.evaluation.OperandEvaluator; +import org.apache.nifi.pql.evaluation.RecordEvaluator; +import org.apache.nifi.pql.evaluation.RepositoryEvaluator; +import org.apache.nifi.pql.evaluation.accumulation.AverageAccumulator; +import org.apache.nifi.pql.evaluation.accumulation.CountAccumulator; +import 
org.apache.nifi.pql.evaluation.accumulation.EventAccumulator; +import org.apache.nifi.pql.evaluation.accumulation.SumAccumulator; +import org.apache.nifi.pql.evaluation.comparison.EqualsEvaluator; +import org.apache.nifi.pql.evaluation.comparison.GreaterThanEvaluator; +import org.apache.nifi.pql.evaluation.comparison.LessThanEvaluator; +import org.apache.nifi.pql.evaluation.comparison.MatchesEvaluator; +import org.apache.nifi.pql.evaluation.comparison.RecordTypeEvaluator; +import org.apache.nifi.pql.evaluation.comparison.StartsWithEvaluator; +import org.apache.nifi.pql.evaluation.extraction.AttributeEvaluator; +import org.apache.nifi.pql.evaluation.extraction.ComponentIdEvaluator; +import org.apache.nifi.pql.evaluation.extraction.RelationshipEvaluator; +import org.apache.nifi.pql.evaluation.extraction.SizeEvaluator; +import org.apache.nifi.pql.evaluation.extraction.TimestampEvaluator; +import org.apache.nifi.pql.evaluation.extraction.TransitUriEvaluator; +import org.apache.nifi.pql.evaluation.extraction.TypeEvaluator; +import org.apache.nifi.pql.evaluation.extraction.UuidEvaluator; +import org.apache.nifi.pql.evaluation.function.TimeFieldEvaluator; +import org.apache.nifi.pql.evaluation.literals.LongLiteralEvaluator; +import org.apache.nifi.pql.evaluation.literals.StringLiteralEvaluator; +import org.apache.nifi.pql.evaluation.logic.AndEvaluator; +import org.apache.nifi.pql.evaluation.logic.OrEvaluator; +import org.apache.nifi.pql.evaluation.order.FieldSorter; +import org.apache.nifi.pql.evaluation.order.GroupedSorter; +import org.apache.nifi.pql.evaluation.order.RowSorter; +import org.apache.nifi.pql.evaluation.order.SortDirection; +import org.apache.nifi.pql.evaluation.repository.SelectAllRecords; +import org.apache.nifi.pql.exception.ProvenanceQueryLanguageParsingException; +import org.apache.nifi.pql.results.GroupingResultSet; +import org.apache.nifi.pql.results.StandardOrderedResultSet; +import org.apache.nifi.pql.results.StandardUnorderedResultSet; +import 
org.apache.nifi.provenance.ProvenanceEventRepository; +import org.apache.nifi.provenance.ProvenanceEventType; +import org.apache.nifi.provenance.StoredProvenanceEvent; +import org.apache.nifi.provenance.query.ProvenanceResultSet; + +public class ProvenanceQuery { + private final Tree tree; + private final String pql; + private final List<Accumulator<?>> selectAccumulators; + private final List<RecordEvaluator<?>> groupEvaluators; + private final RecordEvaluator<Boolean> sourceEvaluator; + private final RecordEvaluator<Boolean> conditionEvaluator; + private final RowSorter sorter; + private final Long limit; + + private long accumulatorIdGenerator = 0L; + + public static ProvenanceQuery compile(final String pql) { + try { + final CommonTokenStream lexerTokenStream = createTokenStream(pql); + final ProvenanceQueryParser parser = new ProvenanceQueryParser(lexerTokenStream); + final Tree ast = (Tree) parser.pql().getTree(); + final Tree tree = ast.getChild(0); + + return new ProvenanceQuery(tree, pql); + } catch (final ProvenanceQueryLanguageParsingException e) { + throw e; + } catch (final Exception e) { + throw new ProvenanceQueryLanguageParsingException(e); + } + } + + private static CommonTokenStream createTokenStream(final String expression) throws ProvenanceQueryLanguageParsingException { + final CharStream input = new ANTLRStringStream(expression); + final ProvenanceQueryLexer lexer = new ProvenanceQueryLexer(input); + return new CommonTokenStream(lexer); + } + + private ProvenanceQuery(final Tree tree, final String pql) { + this.tree = tree; + this.pql = pql; + + Tree fromTree = null; + Tree whereTree = null; + Tree groupByTree = null; + Tree limitTree = null; + Tree orderByTree = null; + + for (int i=1; i < tree.getChildCount(); i++) { + final Tree subTree = tree.getChild(i); + switch (subTree.getType()) { + case FROM: + fromTree = subTree; + break; + case WHERE: + whereTree = subTree; + break; + case GROUP_BY: + groupByTree = subTree; + break; + case LIMIT: + 
limitTree = subTree; + break; + case ORDER_BY: + orderByTree = subTree; + break; + default: + // TODO: Handle other types! + continue; + } + } + + sourceEvaluator = (fromTree == null) ? null : buildSourceEvaluator(fromTree); + + // Distribute AND's over OR's. + final BooleanEvaluator where = (whereTree == null) ? null : buildConditionEvaluator(whereTree.getChild(0)); + conditionEvaluator = where; + + groupEvaluators = (groupByTree == null) ? null : buildGroupEvaluators(groupByTree); + limit = (limitTree == null) ? null : Long.parseLong(limitTree.getChild(0).getText()); + sorter = (orderByTree == null) ? null : buildSorter(orderByTree, groupByTree != null); + + boolean requiresAggregate = false; + if ( groupEvaluators != null && !groupEvaluators.isEmpty() ) { + requiresAggregate = true; + } + if ( requiresAggregate ) { + selectAccumulators = buildAccumulators(tree.getChild(0), true); + } else { + final List<Accumulator<?>> accumulators = buildAccumulators(tree.getChild(0), false); + + for ( final Accumulator<?> accumulator : accumulators ) { + if ( accumulator.isAggregateFunction() ) { + requiresAggregate = true; + break; + } + } + + if ( requiresAggregate ) { + selectAccumulators = buildAccumulators(tree.getChild(0), true); + } else { + selectAccumulators = accumulators; + } + } + + + //repoEvaluator = new SelectAllRecords(); + //repoEvaluator = new SelectFromLucene(LuceneTranslator.toLuceneQuery(where)); + } + + @Override + public String toString() { + return printTree(tree); + } + + public String getQuery() { + return pql; + + } + + private String printTree(final Tree tree) { + final StringBuilder sb = new StringBuilder(); + printTree(tree, 0, sb); + + return sb.toString(); + } + + private void printTree(final Tree tree, final int spaces, final StringBuilder sb) { + for (int i=0; i < spaces; i++) { + sb.append(" "); + } + + if ( tree.getText().trim().isEmpty() ) { + sb.append(tree.toString()).append("\n"); + } else { + sb.append(tree.getText()).append("\n"); + } 
+ + for (int i=0; i < tree.getChildCount(); i++) { + printTree(tree.getChild(i), spaces + 2, sb); + } + } + + private List<Accumulator<?>> buildAccumulators(final Tree selectTree, final boolean distinct) { + final List<Accumulator<?>> accumulators = new ArrayList<>(); + + if ( selectTree.getType() != ProvenanceQueryParser.SELECT ) { + throw new IllegalArgumentException("Cannot build accumulators for a non-SELECT tree"); + } + + for (int i=0; i < selectTree.getChildCount(); i++) { + final Tree childTree = selectTree.getChild(i); + accumulators.add(buildAccumulator(childTree, distinct)); + } + + return accumulators; + } + + private Accumulator<?> buildAccumulator(final Tree tree, final boolean distinct) { + switch (tree.getType()) { + case SUM: + return new SumAccumulator(accumulatorIdGenerator++, toLongEvaluator(buildOperandEvaluator(tree.getChild(0)), tree), "SUM(" + getLabel(tree.getChild(0)) + ")"); + case AVG: + return new AverageAccumulator(accumulatorIdGenerator++, toLongEvaluator(buildOperandEvaluator(tree.getChild(0)), tree), "AVG(" + getLabel(tree.getChild(0)) + ")"); + case EVENT: + return new EventAccumulator(accumulatorIdGenerator++, getLabel(tree), distinct); + case IDENTIFIER: + return new EventAccumulator(accumulatorIdGenerator++, getLabel(tree.getChild(0)), distinct); + case EVENT_PROPERTY: + case ATTRIBUTE: + return new EventAccumulator(accumulatorIdGenerator++, getLabel(tree.getChild(0)), buildOperandEvaluator(tree), distinct); + case YEAR: + case DAY: + case HOUR: + case MINUTE: + case SECOND: + return new EventAccumulator(accumulatorIdGenerator++, getLabel(tree), buildOperandEvaluator(tree), distinct); + case COUNT: + if ( "Event".equalsIgnoreCase(tree.getChild(0).getText() ) ) { + return new CountAccumulator(accumulatorIdGenerator++, null, "COUNT(" + getLabel(tree.getChild(0)) + ")"); + } + return new CountAccumulator(accumulatorIdGenerator++, buildOperandEvaluator(tree.getChild(0)), "COUNT(" + getLabel(tree.getChild(0)) + ")"); + default: + 
throw new UnsupportedOperationException("Haven't implemented accumulators yet for " + tree); + } + } + + private String getLabel(final Tree tree) { + final int type = tree.getType(); + + switch (type) { + case EVENT_PROPERTY: + case ATTRIBUTE: + return tree.getChild(0).getText(); + case YEAR: + case DAY: + case HOUR: + case MINUTE: + case SECOND: + return tree.getText() + "(" + getLabel(tree.getChild(0)) + ")"; + } + + return tree.getText(); + } + + private OperandEvaluator<?> buildOperandEvaluator(final Tree tree) { + switch (tree.getType()) { + case EVENT_PROPERTY: + switch (tree.getChild(0).getType()) { + case FILESIZE: + return new SizeEvaluator(); + case TRANSIT_URI: + return new TransitUriEvaluator(); + case TIMESTAMP: + return new TimestampEvaluator(); + case TYPE: + return new TypeEvaluator(); + case COMPONENT_ID: + return new ComponentIdEvaluator(); + case RELATIONSHIP: + return new RelationshipEvaluator(); + case UUID: + return new UuidEvaluator(); + default: + // TODO: IMPLEMENT + throw new UnsupportedOperationException("Haven't implemented extraction of property " + tree.getChild(0).getText()); + } + case ATTRIBUTE: + return new AttributeEvaluator(toStringEvaluator(buildOperandEvaluator(tree.getChild(0)), tree)); + case STRING_LITERAL: + return new StringLiteralEvaluator(tree.getText()); + case NUMBER: + return new LongLiteralEvaluator(Long.valueOf(tree.getText())); + case HOUR: + return new TimeFieldEvaluator(toLongEvaluator(buildOperandEvaluator(tree.getChild(0)), tree), Calendar.HOUR_OF_DAY, HOUR); + case DAY: + return new TimeFieldEvaluator(toLongEvaluator(buildOperandEvaluator(tree.getChild(0)), tree), Calendar.DAY_OF_YEAR, DAY); + case YEAR: + return new TimeFieldEvaluator(toLongEvaluator(buildOperandEvaluator(tree.getChild(0)), tree), Calendar.YEAR, YEAR); + case MINUTE: + return new TimeFieldEvaluator(toLongEvaluator(buildOperandEvaluator(tree.getChild(0)), tree), Calendar.MINUTE, MINUTE); + case SECOND: + return new 
TimeFieldEvaluator(toLongEvaluator(buildOperandEvaluator(tree.getChild(0)), tree), Calendar.SECOND, SECOND); + default: + throw new ProvenanceQueryLanguageParsingException("Unable to extract value '" + tree.toString() + "' from event because it is not a valid "); + } + } + + + private RecordEvaluator<Boolean> buildSourceEvaluator(final Tree fromTree) { + if ( fromTree == null ) { + throw new NullPointerException(); + } + if ( fromTree.getType() != FROM ) { + throw new IllegalArgumentException("Cannot build Soruce Evaluator from a Tree that is not a FROM-tree"); + } + + final Set<ProvenanceEventType> types = new HashSet<>(); + for ( int i=0; i < fromTree.getChildCount(); i++ ) { + final Tree typeTree = fromTree.getChild(i); + if ( "*".equals(typeTree.getText()) ) { + return null; + } else { + types.add(ProvenanceEventType.valueOf(typeTree.getText().toUpperCase())); + } + } + + return new RecordTypeEvaluator(types); + } + + + private BooleanEvaluator buildConditionEvaluator(final Tree tree) { + switch (tree.getType()) { + case AND: + return new AndEvaluator(buildConditionEvaluator(tree.getChild(0)), buildConditionEvaluator(tree.getChild(1))); + case OR: + return new OrEvaluator(buildConditionEvaluator(tree.getChild(0)), buildConditionEvaluator(tree.getChild(1))); + case EQUALS: + return new EqualsEvaluator(buildOperandEvaluator(tree.getChild(0)), buildOperandEvaluator(tree.getChild(1))); + case NOT_EQUALS: + return new EqualsEvaluator(buildOperandEvaluator(tree.getChild(0)), buildOperandEvaluator(tree.getChild(1)), true); + case GT: + return new GreaterThanEvaluator(buildOperandEvaluator(tree.getChild(0)), buildOperandEvaluator(tree.getChild(1))); + case LT: + return new LessThanEvaluator(buildOperandEvaluator(tree.getChild(0)), buildOperandEvaluator(tree.getChild(1))); + case NOT: + return buildConditionEvaluator(tree.getChild(0)).negate(); + case MATCHES: { + final OperandEvaluator<?> rhs = buildOperandEvaluator(tree.getChild(1)); + if ( !String.class.equals( 
rhs.getType() ) ) { + throw new ProvenanceQueryLanguageParsingException("Right-hand side of MATCHES operator must be a Regular Expression but found " + rhs); + } + return new MatchesEvaluator(buildOperandEvaluator(tree.getChild(0)), rhs); + } + case STARTS_WITH: { + final OperandEvaluator<?> rhs = buildOperandEvaluator(tree.getChild(1)); + if ( !String.class.equals( rhs.getType() ) ) { + throw new ProvenanceQueryLanguageParsingException("Right-hand side of STARTS WITH operator must be a String but found " + rhs); + } + return new StartsWithEvaluator(buildOperandEvaluator(tree.getChild(0)), rhs); + } + default: + // TODO: Implement + throw new UnsupportedOperationException("Have not yet implemented condition evaluator for " + tree); + } + } + + + private <T> OperandEvaluator<T> castEvaluator(final OperandEvaluator<?> eval, final Tree tree, final Class<T> expectedType) { + if ( eval.getType() != expectedType ) { + throw new ProvenanceQueryLanguageParsingException("Expected type " + expectedType.getSimpleName() + " but found type " + eval.getType() + " for term: " + tree); + } + + @SuppressWarnings("unchecked") + final OperandEvaluator<T> retEvaluator = ((OperandEvaluator<T>) eval); + return retEvaluator; + + } + + private OperandEvaluator<String> toStringEvaluator(final OperandEvaluator<?> eval, final Tree tree) { + return castEvaluator(eval, tree, String.class); + } + + private OperandEvaluator<Long> toLongEvaluator(final OperandEvaluator<?> eval, final Tree tree) { + return castEvaluator(eval, tree, Long.class); + } + + + private List<RecordEvaluator<?>> buildGroupEvaluators(final Tree groupByTree) { + if ( groupByTree == null ) { + return null; + } + + if ( groupByTree.getType() != GROUP_BY ) { + throw new IllegalArgumentException("Expected GroupBy tree but got " + groupByTree); + } + + final List<RecordEvaluator<?>> evaluators = new ArrayList<>(groupByTree.getChildCount()); + for (int i=0; i < groupByTree.getChildCount(); i++) { + final Tree tree = 
groupByTree.getChild(i); + final RecordEvaluator<?> evaluator; + + switch (tree.getType()) { + case EVENT_PROPERTY: + case STRING_LITERAL: + case ATTRIBUTE: + case YEAR: + case DAY: + case HOUR: + case MINUTE: + case SECOND: + evaluator = buildOperandEvaluator(tree); + break; + default: + evaluator = buildConditionEvaluator(tree); + break; + } + + evaluators.add(evaluator); + } + + return evaluators; + } + + private RowSorter buildSorter(final Tree orderByTree, final boolean grouped) { + if ( orderByTree.getType() != ORDER_BY ) { + throw new IllegalArgumentException(); + } + + if ( grouped ) { + final Map<Accumulator<?>, SortDirection> accumulators = new LinkedHashMap<>(orderByTree.getChildCount()); + for (int i=0; i < orderByTree.getChildCount(); i++) { + final Tree orderTree = orderByTree.getChild(i); + final Accumulator<?> accumulator = buildAccumulator(orderTree.getChild(0), true); + + final SortDirection sortDir; + if ( orderTree.getChildCount() > 1 ) { + final int sortDirType = orderTree.getChild(1).getType(); + sortDir = (sortDirType == ASC) ? SortDirection.ASC : SortDirection.DESC; + } else { + sortDir = SortDirection.ASC; + } + + accumulators.put(accumulator, sortDir); + } + + return new GroupedSorter(accumulators); + } else { + // TODO: Allow ORDER BY of aggregate values + final Map<OperandEvaluator<?>, SortDirection> evaluators = new LinkedHashMap<>(orderByTree.getChildCount()); + for (int i=0; i < orderByTree.getChildCount(); i++) { + final Tree orderTree = orderByTree.getChild(i); + final OperandEvaluator<?> evaluator = buildOperandEvaluator(orderTree.getChild(0)); + + final SortDirection sortDir; + if ( orderTree.getChildCount() > 1 ) { + final int sortDirType = orderTree.getChild(1).getType(); + sortDir = (sortDirType == ASC) ? 
SortDirection.ASC : SortDirection.DESC; + } else { + sortDir = SortDirection.ASC; + } + + evaluators.put(evaluator, sortDir); + } + + return new FieldSorter(evaluators); + } + } + + + public static ProvenanceResultSet execute(final String query, final ProvenanceEventRepository repo) throws IOException { + return ProvenanceQuery.compile(query).execute(repo); + } + + + public ProvenanceResultSet evaluate(final Iterator<? extends StoredProvenanceEvent> matchedEvents) { + final List<String> labels = new ArrayList<>(); + final List<Class<?>> returnTypes = new ArrayList<>(selectAccumulators.size()); + + for ( final Accumulator<?> accumulator : selectAccumulators ) { + labels.add(accumulator.getLabel()); + returnTypes.add(accumulator.getReturnType()); + } + + ProvenanceResultSet rs; + if ( isAggregateRequired() ) { + rs = new GroupingResultSet(matchedEvents, + selectAccumulators, sourceEvaluator, conditionEvaluator, + labels, returnTypes, groupEvaluators, sorter, limit); + } else if (sorter == null) { + rs = new StandardUnorderedResultSet(matchedEvents, selectAccumulators, sourceEvaluator, conditionEvaluator, labels, returnTypes, limit); + } else { + rs = new StandardOrderedResultSet(matchedEvents, selectAccumulators, sourceEvaluator, conditionEvaluator, labels, returnTypes, sorter, limit); + } + + return rs; + } + + public ProvenanceResultSet execute(final ProvenanceEventRepository repo) throws IOException { + final RepositoryEvaluator repoEvaluator = new SelectAllRecords(); + + final Iterator<StoredProvenanceEvent> potentialMatches = repoEvaluator.evaluate(repo); + final List<String> labels = new ArrayList<>(); + final List<Class<?>> returnTypes = new ArrayList<>(selectAccumulators.size()); + + for ( final Accumulator<?> accumulator : selectAccumulators ) { + labels.add(accumulator.getLabel()); + returnTypes.add(accumulator.getReturnType()); + } + + ProvenanceResultSet rs; + if ( isAggregateRequired() ) { + rs = new GroupingResultSet(potentialMatches, + 
selectAccumulators, sourceEvaluator, conditionEvaluator, + labels, returnTypes, groupEvaluators, sorter, limit); + } else if (sorter == null) { + rs = new StandardUnorderedResultSet(potentialMatches, selectAccumulators, sourceEvaluator, conditionEvaluator, labels, returnTypes, limit); + } else { + rs = new StandardOrderedResultSet(potentialMatches, selectAccumulators, sourceEvaluator, conditionEvaluator, labels, returnTypes, sorter, limit); + } + + return rs; + } + + private boolean isAggregateRequired() { + if ( groupEvaluators != null && !groupEvaluators.isEmpty() ) { + return true; + } + + for ( final Accumulator<?> accumulator : selectAccumulators ) { + if ( accumulator.isAggregateFunction() ) { + return true; + } + } + + + return false; + } + + + public RecordEvaluator<Boolean> getWhereClause() { + return conditionEvaluator; + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b512ff12/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/Accumulator.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/Accumulator.java b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/Accumulator.java new file mode 100644 index 0000000..db49091 --- /dev/null +++ b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/Accumulator.java @@ -0,0 +1,28 @@ +package org.apache.nifi.pql.evaluation; + +import java.util.List; +import java.util.Map; + +import org.apache.nifi.pql.groups.Group; +import org.apache.nifi.provenance.ProvenanceEventRecord; + +public interface Accumulator<T> extends Cloneable { + + T accumulate(ProvenanceEventRecord record, Group group); + + String getLabel(); + + boolean isAggregateFunction(); + + Class<? 
extends T> getReturnType(); + + Accumulator<T> clone(); + + long getId(); + + void reset(); + + Map<Group, List<T>> getValues(); + + List<T> getValues(Group group); +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b512ff12/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/BooleanEvaluator.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/BooleanEvaluator.java b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/BooleanEvaluator.java new file mode 100644 index 0000000..2c1549a --- /dev/null +++ b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/BooleanEvaluator.java @@ -0,0 +1,5 @@ +package org.apache.nifi.pql.evaluation; + +public interface BooleanEvaluator extends RecordEvaluator<Boolean> { + BooleanEvaluator negate(); +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b512ff12/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/OperandEvaluator.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/OperandEvaluator.java b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/OperandEvaluator.java new file mode 100644 index 0000000..0daace6 --- /dev/null +++ b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/OperandEvaluator.java @@ -0,0 +1,7 @@ +package org.apache.nifi.pql.evaluation; + +public interface OperandEvaluator<T> extends RecordEvaluator<T> { + + Class<T> getType(); + +} 
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b512ff12/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/RecordEvaluator.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/RecordEvaluator.java b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/RecordEvaluator.java new file mode 100644 index 0000000..98f3b04 --- /dev/null +++ b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/RecordEvaluator.java @@ -0,0 +1,11 @@ +package org.apache.nifi.pql.evaluation; + +import org.apache.nifi.provenance.ProvenanceEventRecord; + +public interface RecordEvaluator<T> { + + T evaluate(ProvenanceEventRecord record); + + int getEvaluatorType(); + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b512ff12/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/RepositoryEvaluator.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/RepositoryEvaluator.java b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/RepositoryEvaluator.java new file mode 100644 index 0000000..eb55f75 --- /dev/null +++ b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/RepositoryEvaluator.java @@ -0,0 +1,13 @@ +package org.apache.nifi.pql.evaluation; + +import java.io.IOException; +import java.util.Iterator; + +import org.apache.nifi.provenance.ProvenanceEventRepository; +import org.apache.nifi.provenance.StoredProvenanceEvent; + +public interface RepositoryEvaluator { + + Iterator<StoredProvenanceEvent> evaluate(ProvenanceEventRepository repository) throws IOException; + +} 
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b512ff12/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/accumulation/AverageAccumulator.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/accumulation/AverageAccumulator.java b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/accumulation/AverageAccumulator.java new file mode 100644 index 0000000..9b04308 --- /dev/null +++ b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/accumulation/AverageAccumulator.java @@ -0,0 +1,112 @@ +package org.apache.nifi.pql.evaluation.accumulation; + +import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; + +import org.apache.nifi.pql.evaluation.Accumulator; +import org.apache.nifi.pql.evaluation.RecordEvaluator; +import org.apache.nifi.pql.groups.Group; +import org.apache.nifi.provenance.ProvenanceEventRecord; + +public class AverageAccumulator implements Accumulator<Double> { + + private final long id; + private final RecordEvaluator<Long> evaluator; + private final String label; + private final Map<Group, Values> values = new LinkedHashMap<>(); + + public AverageAccumulator(final long id, final RecordEvaluator<Long> extractor, final String label) { + this.id = id; + this.evaluator = extractor; + this.label = label; + } + + @Override + public Map<Group, List<Double>> getValues() { + final Map<Group, List<Double>> avgs = new HashMap<>(values.size()); + for ( final Map.Entry<Group, Values> entry : values.entrySet() ) { + final Values values = entry.getValue(); + final double avg = (double) values.getSum() / (double) values.getCount(); + avgs.put(entry.getKey(), Collections.singletonList(avg)); + } + return avgs; + } + + @Override 
+ public List<Double> getValues(final Group group) { + final Values v = values.get(group); + if ( v == null ) { + return Collections.emptyList(); + } + + final double d = v.getSum() / v.getCount(); + return Collections.singletonList(d); + } + + public Double accumulate(final ProvenanceEventRecord record, final Group group) { + final Long val = evaluator.evaluate(record); + if ( val != null ) { + Values v = values.get(group); + if ( v == null ) { + v = new Values(); + values.put(group, v); + } + + v.increment(val.longValue()); + return (double) v.getSum() / (double) v.getCount(); + } + + return null; + } + + @Override + public String getLabel() { + return label; + } + + @Override + public boolean isAggregateFunction() { + return true; + } + + @Override + public Class<Double> getReturnType() { + return Double.class; + } + + @Override + public AverageAccumulator clone() { + return new AverageAccumulator(id, evaluator, label); + } + + @Override + public long getId() { + return id; + } + + @Override + public void reset() { + values.clear(); + } + + private static class Values { + private long count; + private long sum; + + public void increment(final long sum) { + count++; + this.sum += sum; + } + + public long getCount() { + return count; + } + + public long getSum() { + return sum; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b512ff12/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/accumulation/CountAccumulator.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/accumulation/CountAccumulator.java b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/accumulation/CountAccumulator.java new file mode 100644 index 0000000..d4af4bf --- /dev/null +++ 
b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/accumulation/CountAccumulator.java @@ -0,0 +1,97 @@ +package org.apache.nifi.pql.evaluation.accumulation; + +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; + +import org.apache.nifi.pql.evaluation.Accumulator; +import org.apache.nifi.pql.evaluation.RecordEvaluator; +import org.apache.nifi.pql.groups.Group; +import org.apache.nifi.provenance.ProvenanceEventRecord; + +/** + * Counts, per group, the records for which the evaluator yields a non-null value; + * when no evaluator is configured, every record is counted. + */ +public class CountAccumulator implements Accumulator<Long>, Cloneable { + + private final long id; + private final RecordEvaluator<?> evaluator; + private final String label; + private final Map<Group, Long> counts = new LinkedHashMap<>(); + + + public CountAccumulator(final long id, final RecordEvaluator<?> extractor, final String label) { + this.id = id; + this.evaluator = extractor; + this.label = label; + } + + @Override + public Map<Group, List<Long>> getValues() { + // LinkedHashMap keeps the groups in first-seen order, matching the backing map. + final Map<Group, List<Long>> map = new LinkedHashMap<>(); + for ( final Map.Entry<Group, Long> entry : counts.entrySet() ) { + map.put(entry.getKey(), Collections.singletonList(entry.getValue())); + } + return map; + } + + @Override + public List<Long> getValues(final Group group) { + final Long count = counts.get(group); + if ( count == null ) { + return Collections.emptyList(); + } + + return Collections.singletonList(count); + } + + /** Increments and returns the group's count when the record yields a non-null value; otherwise returns the current count (null if the group has none yet). */ + @Override + public Long accumulate(final ProvenanceEventRecord record, final Group group) { + final Object value = (evaluator == null) ? record : evaluator.evaluate(record); + if ( value != null ) { + Long val = counts.get(group); + if ( val == null ) { + val = 0L; + } + counts.put(group, val + 1); + return val + 1; + } + + return counts.get(group); + } + + @Override + public String getLabel() { + return label; + } + + @Override + public boolean isAggregateFunction() { + return true; + } + + @Override + public Class<? extends Long> getReturnType() { + return Long.class; + } + + @Override + public long getId() { + return id; + } + + @Override + public void reset() { + counts.clear(); + } + + @Override + public CountAccumulator clone() { + return new CountAccumulator(id, evaluator, label); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b512ff12/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/accumulation/EventAccumulator.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/accumulation/EventAccumulator.java b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/accumulation/EventAccumulator.java new file mode 100644 index 0000000..9d4487c --- /dev/null +++ b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/accumulation/EventAccumulator.java @@ -0,0 +1,108 @@ +package org.apache.nifi.pql.evaluation.accumulation; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; + +import org.apache.nifi.pql.evaluation.Accumulator; +import org.apache.nifi.pql.evaluation.OperandEvaluator; +import org.apache.nifi.pql.groups.Group; +import org.apache.nifi.provenance.ProvenanceEventRecord; + +/** + * Collects, per group, either the records themselves or the values the extractor derives from them; + * when distinct is set, a value already present in the group is not added again. + */ +public class EventAccumulator implements Accumulator<Object>, Cloneable { + + private final long id; + private final Map<Group, List<Object>> records = new LinkedHashMap<>(); + private final String label; + private final OperandEvaluator<?> valueExtractor; + private final boolean distinct; + + public EventAccumulator(final long id, final String label, final boolean distinct) { + this(id, label, null, distinct); + } + + public EventAccumulator(final long id, final String label, final OperandEvaluator<?> valueExtractor, final boolean distinct) { + this.id = id;
+ this.label = label; + this.valueExtractor = valueExtractor; + this.distinct = distinct; + } + + @Override + public Map<Group, List<Object>> getValues() { + // Wrap each group's list as well, so callers cannot modify the accumulated values. + final Map<Group, List<Object>> view = new LinkedHashMap<>(); + for ( final Map.Entry<Group, List<Object>> entry : records.entrySet() ) { + view.put(entry.getKey(), Collections.unmodifiableList(entry.getValue())); + } + return Collections.unmodifiableMap(view); + } + + @Override + public List<Object> getValues(final Group group) { + final List<Object> objects = records.get(group); + if ( objects == null ) { + return Collections.emptyList(); + } + + return Collections.unmodifiableList(objects); + } + + @Override + public Object accumulate(final ProvenanceEventRecord record, final Group group) { + final Object value = valueExtractor == null ? record : valueExtractor.evaluate(record); + + // Without a group the value is returned as-is and not retained. + if ( group == null ) { + return value; + } + + List<Object> groupRecords = records.get(group); + if ( groupRecords == null ) { + groupRecords = new ArrayList<>(); + records.put(group, groupRecords); + } + + if ( !distinct || !groupRecords.contains(value) ) { + groupRecords.add( value ); + } + + return Collections.unmodifiableList(groupRecords); + } + + @Override + public String getLabel() { + return label; + } + + @Override + public boolean isAggregateFunction() { + return false; + } + + @Override + public Class<? extends Object> getReturnType() { + return valueExtractor == null ? ProvenanceEventRecord.class : valueExtractor.getType(); + } + + @Override + public EventAccumulator clone() { + return new EventAccumulator(id, label, valueExtractor, distinct); + } + + @Override + public long getId() { + return id; + } + + @Override + public void reset() { + records.clear(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b512ff12/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/accumulation/SumAccumulator.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/accumulation/SumAccumulator.java b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/accumulation/SumAccumulator.java new file mode 100644 index 0000000..5eacc6e --- /dev/null +++ b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/accumulation/SumAccumulator.java @@ -0,0 +1,96 @@ +package org.apache.nifi.pql.evaluation.accumulation; + +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; + +import org.apache.nifi.pql.evaluation.Accumulator; +import org.apache.nifi.pql.evaluation.RecordEvaluator; +import org.apache.nifi.pql.groups.Group; +import org.apache.nifi.provenance.ProvenanceEventRecord; + +/** + * Sums, per group, the non-null long values that the evaluator produces for each record. + */ +public class SumAccumulator implements Accumulator<Long>, Cloneable { + + private final long id; + private final RecordEvaluator<Long> evaluator; + private final String label; + private final Map<Group, Long> sums = new LinkedHashMap<>(); + + public SumAccumulator(final long id, final RecordEvaluator<Long> extractor, final String label) { + this.id = id; + this.evaluator = extractor; + this.label = label; + } + + @Override + public Map<Group, List<Long>> getValues() { + // LinkedHashMap keeps the groups in first-seen order, matching the backing map. + final Map<Group, List<Long>> map = new LinkedHashMap<>(); + for ( final Map.Entry<Group, Long> entry : sums.entrySet() ) { + map.put(entry.getKey(), Collections.singletonList(entry.getValue())); + } + return map; + } + + @Override + public List<Long> getValues(final Group group) { + final Long sum = sums.get(group); + if ( sum == null ) { + return Collections.emptyList(); + } + + return Collections.singletonList(sum); + } + + /** Adds the record's value to the group's sum and returns the new sum; returns null, leaving the sum unchanged, when the evaluator yields null. */ + @Override + public Long accumulate(final ProvenanceEventRecord record, final Group group) { + final Long val = evaluator.evaluate(record); + if ( val != null ) { + Long curVal = sums.get(group); + if ( curVal == null ) { + curVal = 0L; + } + + final long newVal = curVal + val; + sums.put(group, newVal); + return newVal; + } + + return null; + } + + @Override + public String getLabel() { + return label; + } + + @Override + public boolean isAggregateFunction() { + return true; + } + + @Override + public Class<Long> getReturnType() { + return Long.class; + } + + @Override + public SumAccumulator clone() { + return new SumAccumulator(id, evaluator, label); + } + + @Override + public long getId() { + return id; + } + + @Override + public void reset() { + sums.clear(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b512ff12/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/comparison/ConversionUtils.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/comparison/ConversionUtils.java b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/comparison/ConversionUtils.java new file mode 100644 index 0000000..75b7a3b --- /dev/null +++ b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/comparison/ConversionUtils.java @@ -0,0 +1,97 @@ +package org.apache.nifi.pql.evaluation.comparison; + +import java.text.ParseException; +import java.text.SimpleDateFormat; +import
java.util.Calendar; +import java.util.Date; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +/** Conversions of comparison operands to long values, used by the comparison evaluators. */ +public class ConversionUtils { + + public static final String DATE_TO_STRING_FORMAT = "EEE MMM dd HH:mm:ss zzz yyyy"; + public static final Pattern DATE_TO_STRING_PATTERN = Pattern.compile("(?:[a-zA-Z]{3} ){2}\\d{2} \\d{2}\\:\\d{2}\\:\\d{2} (?:.*?) \\d{4}"); + + public static final String ALTERNATE_FORMAT_WITHOUT_MILLIS = "yyyy/MM/dd HH:mm:ss"; + public static final String ALTERNATE_FORMAT_WITH_MILLIS = "yyyy/MM/dd HH:mm:ss.SSS"; + public static final Pattern ALTERNATE_PATTERN = Pattern.compile("\\d{4}/\\d{2}/\\d{2} \\d{2}\\:\\d{2}\\:\\d{2}(\\.\\d{3})?"); + + public static final Pattern NUMBER_PATTERN = Pattern.compile("-?\\d+"); + + /** + * Converts a Long, other Number, Date, Calendar or String to a long; Dates and Calendars + * become epoch milliseconds. Returns null for null, unsupported or unparseable input. + */ + public static Long convertToLong(final Object o) { + if ( o == null ) { + return null; + } + + if (o instanceof Long) { + return (Long) o; + } + + if (o instanceof Number) { + return ((Number) o).longValue(); + } + + if ( o instanceof Date ) { + return ((Date) o).getTime(); + } + + if ( o instanceof Calendar ) { + return ((Calendar) o).getTimeInMillis(); + } + + if ( o instanceof String ) { + return convertToLong((String) o); + } + + return null; + } + + /** Parses Date.toString() output, a plain integer, or yyyy/MM/dd HH:mm:ss[.SSS] text; returns null for null, blank or unparseable input. */ + public static Long convertToLong(final String value) { + if ( value == null ) { + return null; + } + + final String trimmed = value.trim(); + if ( trimmed.isEmpty() ) { + return null; + } + + if ( DATE_TO_STRING_PATTERN.matcher(trimmed).matches() ) { + // Date.toString() emits English day and month names, so parse with Locale.US. + final SimpleDateFormat sdf = new SimpleDateFormat(DATE_TO_STRING_FORMAT, java.util.Locale.US); + try { + return sdf.parse(trimmed).getTime(); + } catch (final ParseException pe) { + return null; + } + } else if ( NUMBER_PATTERN.matcher(trimmed).matches() ) { + try { + return Long.valueOf(trimmed); + } catch (final NumberFormatException nfe) { + // All digits, but outside the range of a long. + return null; + } + } else { + final Matcher altMatcher = ALTERNATE_PATTERN.matcher(trimmed); + if ( altMatcher.matches() ) { + // Group 1 is the optional ".SSS" millisecond suffix. + final String format = (altMatcher.group(1) == null) ? ALTERNATE_FORMAT_WITHOUT_MILLIS : ALTERNATE_FORMAT_WITH_MILLIS; + final SimpleDateFormat sdf = new SimpleDateFormat(format); + try { + return sdf.parse(trimmed).getTime(); + } catch (final ParseException pe) { + return null; + } + } else { + return null; + } + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b512ff12/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/comparison/EqualsEvaluator.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/comparison/EqualsEvaluator.java b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/comparison/EqualsEvaluator.java new file mode 100644 index 0000000..70158fb --- /dev/null +++ b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/comparison/EqualsEvaluator.java @@ -0,0 +1,78 @@ +package org.apache.nifi.pql.evaluation.comparison; + +import org.apache.nifi.pql.evaluation.BooleanEvaluator; +import org.apache.nifi.pql.evaluation.OperandEvaluator; +import org.apache.nifi.provenance.ProvenanceEventRecord; +import org.apache.nifi.provenance.ProvenanceEventType; + +/** + * Tests whether two operands are equal (via equals), comparing ProvenanceEventType values by name. + * If either operand evaluates to null the result is false, even when negated; the alias, if given, is the toString() form. + */ +public class EqualsEvaluator implements BooleanEvaluator { + + private final OperandEvaluator<?> lhs; + private final OperandEvaluator<?> rhs; + private final boolean negated; + private final String alias; + + public EqualsEvaluator(final OperandEvaluator<?> lhs, final OperandEvaluator<?> rhs) { + this(lhs, rhs, false, null); + } + + public EqualsEvaluator(final OperandEvaluator<?> lhs, final OperandEvaluator<?> rhs, final String alias) { + this(lhs, rhs, false, alias); + } + + public EqualsEvaluator(final OperandEvaluator<?> lhs, final OperandEvaluator<?> rhs, final boolean negated) { + this(lhs, rhs, negated, null); + } + + public EqualsEvaluator(final OperandEvaluator<?> lhs, final OperandEvaluator<?> rhs, final boolean negated, final String alias) { + this.lhs = lhs; + this.rhs = rhs; + this.negated = negated; + this.alias = alias; + } + + public OperandEvaluator<?> getLHS() { + return lhs; + } + + public OperandEvaluator<?> getRHS() { + return rhs; + } + + public Boolean evaluate(final ProvenanceEventRecord record) { + Object lhsValue = lhs.evaluate(record); + Object rhsValue = rhs.evaluate(record); + + if ( lhsValue == null || rhsValue == null ) { + return false; + } + + if ( lhsValue instanceof ProvenanceEventType ) { + lhsValue = ((ProvenanceEventType) lhsValue).name(); + } + if ( rhsValue instanceof ProvenanceEventType ) { + rhsValue = ((ProvenanceEventType) rhsValue).name(); + } + + final boolean equal = lhsValue.equals(rhsValue); + return negated ? !equal : equal; + } + + public BooleanEvaluator negate() { + return new EqualsEvaluator(lhs, rhs, !negated, alias); + } + + @Override + public String toString() { + return alias == null ? lhs.toString() + "=" + rhs.toString() : alias; + } + + @Override + public int getEvaluatorType() { + return org.apache.nifi.pql.ProvenanceQueryParser.EQUALS; + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b512ff12/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/comparison/GreaterThanEvaluator.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/comparison/GreaterThanEvaluator.java b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/comparison/GreaterThanEvaluator.java new file mode 100644 index 0000000..327fc9f --- /dev/null +++ b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/comparison/GreaterThanEvaluator.java @@ -0,0 +1,55 @@ +package org.apache.nifi.pql.evaluation.comparison; + +import org.apache.nifi.pql.evaluation.BooleanEvaluator; +import org.apache.nifi.pql.evaluation.OperandEvaluator; +import org.apache.nifi.provenance.ProvenanceEventRecord; + +/** + * Tests lhs > rhs after converting both operands with ConversionUtils.convertToLong. + * If either operand cannot be converted the result is false, even when negated. + */ +public class GreaterThanEvaluator implements BooleanEvaluator { + private final OperandEvaluator<?> lhs; + private final OperandEvaluator<?> rhs; + private final boolean negated; + + public GreaterThanEvaluator(final OperandEvaluator<?> lhs, final OperandEvaluator<?> rhs) { + this(lhs, rhs, false); + } + + public GreaterThanEvaluator(final OperandEvaluator<?> lhs, final OperandEvaluator<?> rhs, final boolean negate) { + this.lhs = lhs; + this.rhs = rhs; + this.negated = negate; + } + + public OperandEvaluator<?> getLHS() { + return lhs; + } + + public OperandEvaluator<?> getRHS() { + return rhs; + } + + public Boolean evaluate(final ProvenanceEventRecord record) { + final Long lhsValue = ConversionUtils.convertToLong(lhs.evaluate(record)); + final Long rhsValue = ConversionUtils.convertToLong(rhs.evaluate(record)); + + if ( lhsValue == null || rhsValue == null ) { + return false; + } + + final boolean greaterThan = lhsValue.longValue() > rhsValue.longValue(); + return negated ? !greaterThan : greaterThan; + } + + public BooleanEvaluator negate() { + return new GreaterThanEvaluator(lhs, rhs, !negated); + } + + @Override + public int getEvaluatorType() { + return org.apache.nifi.pql.ProvenanceQueryParser.GT; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b512ff12/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/comparison/LessThanEvaluator.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/comparison/LessThanEvaluator.java b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/comparison/LessThanEvaluator.java new file mode 100644 index 0000000..5940509 --- /dev/null +++ b/nifi/nifi-commons/nifi-provenance-query-language/src/main/java/org/apache/nifi/pql/evaluation/comparison/LessThanEvaluator.java @@ -0,0 +1,56 @@ +package org.apache.nifi.pql.evaluation.comparison; + +import org.apache.nifi.pql.evaluation.BooleanEvaluator; +import org.apache.nifi.pql.evaluation.OperandEvaluator; +import org.apache.nifi.provenance.ProvenanceEventRecord; + +/** + * Tests lhs < rhs after converting both operands with ConversionUtils.convertToLong. + * If either operand cannot be converted the result is false, even when negated. + */ +public class LessThanEvaluator implements BooleanEvaluator { + private final OperandEvaluator<?> lhs; + private final OperandEvaluator<?> rhs; + private final boolean negated; + + public LessThanEvaluator(final OperandEvaluator<?> lhs, final OperandEvaluator<?> rhs) { + this(lhs, rhs, false); + } + + public LessThanEvaluator(final OperandEvaluator<?> lhs, final OperandEvaluator<?> rhs, final boolean negate) { + this.lhs = lhs; + this.rhs = rhs; + this.negated = negate; + } + + public OperandEvaluator<?> getLHS() { + return lhs; + } + + public OperandEvaluator<?> getRHS() { + return rhs; + } + + public Boolean evaluate(final ProvenanceEventRecord record) { + final Long lhsValue = ConversionUtils.convertToLong(lhs.evaluate(record)); + final Long rhsValue = ConversionUtils.convertToLong(rhs.evaluate(record)); + + if ( lhsValue == null || rhsValue == null ) { + return false; + } + + final boolean lessThan = lhsValue.longValue() < rhsValue.longValue(); + return negated ? !lessThan : lessThan; + } + + public BooleanEvaluator negate() { + // The negation must remain a less-than comparison: NOT (lhs < rhs) is lhs >= rhs. + return new LessThanEvaluator(lhs, rhs, !negated); + } + + @Override + public int getEvaluatorType() { + return org.apache.nifi.pql.ProvenanceQueryParser.LT; + } + +}
