VXQUERY-207 VXQUERY-209 Parallel Index creation and access, and index centralization
Fixed issue in Tests where no result file meant success Project: http://git-wip-us.apache.org/repos/asf/vxquery/repo Commit: http://git-wip-us.apache.org/repos/asf/vxquery/commit/eb76640f Tree: http://git-wip-us.apache.org/repos/asf/vxquery/tree/eb76640f Diff: http://git-wip-us.apache.org/repos/asf/vxquery/diff/eb76640f Branch: refs/heads/master Commit: eb76640fc61a99b5c4bddab19afa45fe10cf0187 Parents: 6bee358 Author: Steven Glenn Jacobs <[email protected]> Authored: Wed Sep 7 15:44:47 2016 -0700 Committer: Steven Glenn Jacobs <[email protected]> Committed: Wed Sep 7 16:09:07 2016 -0700 ---------------------------------------------------------------------- src/site/apt/user_indexing.apt | 202 ++++++++++++ .../java/org/apache/vxquery/cli/VXQuery.java | 1 + .../apache/vxquery/common/VXQueryCommons.java | 43 +++ .../compiler/rewriter/RewriteRuleset.java | 2 + .../rewriter/rules/AbstractCollectionRule.java | 31 +- .../rewriter/rules/IntroduceCollectionRule.java | 25 +- .../rewriter/rules/IntroduceIndexingRule.java | 58 ++++ .../rules/PushChildIntoDataScanRule.java | 11 +- .../vxquery/functions/builtin-functions.xml | 27 +- .../metadata/AbstractVXQueryDataSource.java | 39 +++ .../metadata/VXQueryCollectionDataSource.java | 18 +- .../metadata/VXQueryIndexingDataSource.java | 137 ++++++++ .../VXQueryIndexingOperatorDescriptor.java | 218 +++++++++++++ .../metadata/VXQueryMetadataProvider.java | 38 ++- ...ctionFromIndexUnnestingEvaluatorFactory.java | 327 ------------------- .../IndexConstructorScalarEvaluatorFactory.java | 79 ----- .../functions/index/IndexConstructorUtil.java | 62 +--- .../index/IndexDeleteEvaluatorFactory.java | 83 ----- .../index/IndexUpdaterEvaluatorFactory.java | 84 ----- .../index/ShowIndexScalarEvaluatorFactory.java | 65 ++++ .../functions/index/VXQueryIndexReader.java | 284 ++++++++++++++++ .../indexCentralizer/IndexCentralizerUtil.java | 164 ++++++++++ .../index/indexCentralizer/IndexDirectory.java | 42 +++ 
.../index/indexCentralizer/IndexLocator.java | 50 +++ .../functions/index/updateIndex/Constants.java | 1 - .../index/updateIndex/IndexUpdater.java | 72 ++-- .../index/updateIndex/MetaFileUtil.java | 149 ++++----- .../updateIndex/XmlMetadataCollection.java | 6 +- .../vxquery/indexing/MetaFileUtilTest.java | 36 +- .../apache/vxquery/indexing/TestConstants.java | 8 +- .../src/main/resources/conf/cluster_example.xml | 1 + .../src/main/resources/conf/local.xml | 1 + .../main/resources/scripts/cluster_actions.py | 2 +- .../resources/scripts/cluster_information.py | 3 + .../src/main/resources/scripts/startnc.sh | 4 + .../apache/vxquery/xtest/TestClusterUtil.java | 12 +- .../vxquery/xtest/AbstractXQueryTest.java | 1 + .../Indexing/Partition-1/createIndex.txt | 1 + .../Indexing/Partition-1/deleteIndex.txt | 1 + .../Indexing/Partition-1/showIndex1.txt | 1 + .../Indexing/Partition-1/showIndex2.txt | 1 + .../Indexing/Partition-1/updateIndex.txt | 1 + .../Indexing/Partition-1/useIndex1.txt | 2 + .../Indexing/Partition-1/useIndex2.txt | 1 + .../Indexing/Partition-1/useIndex3.txt | 1 + .../Indexing/Partition-1/useIndex4.txt | 1 + .../Indexing/Partition-1/useIndex5.txt | 3 + .../Indexing/Partition-1/useIndex6.txt | 2 + .../Indexing/Partition-1/useIndex7.txt | 3 + .../Indexing/Partition-2/createIndex.txt | 2 + .../Indexing/Partition-2/showIndex1.txt | 2 + .../Indexing/Partition-2/updateIndex.txt | 2 + .../Indexing/Partition-2/useIndex1.txt | 2 + .../Indexing/Partition-2/useIndex2.txt | 1 + .../Indexing/Partition-2/useIndex3.txt | 1 + .../Indexing/Partition-2/useIndex4.txt | 1 + .../Indexing/Partition-2/useIndex5.txt | 3 + .../Indexing/Partition-2/useIndex6.txt | 2 + .../Indexing/Partition-2/useIndex7.txt | 3 + .../Indexing/Partition-4/createIndex.txt | 4 + .../Indexing/Partition-4/showIndex1.txt | 6 + .../Indexing/Partition-4/updateIndex.txt | 4 + .../Indexing/Partition-4/useIndex1.txt | 2 + .../Indexing/Partition-4/useIndex2.txt | 1 + .../Indexing/Partition-4/useIndex3.txt | 1 + 
.../Indexing/Partition-4/useIndex4.txt | 1 + .../Indexing/Partition-4/useIndex5.txt | 3 + .../Indexing/Partition-4/useIndex6.txt | 2 + .../Indexing/Partition-4/useIndex7.txt | 3 + .../Indexing/createIndex.txt | 1 - .../Indexing/deleteIndex.txt | 1 - .../Indexing/updateIndex.txt | 1 - .../ExpectedTestResults/Indexing/useIndex1.txt | 2 - .../ExpectedTestResults/Indexing/useIndex2.txt | 1 - .../ExpectedTestResults/Indexing/useIndex3.txt | 1 - .../ExpectedTestResults/Indexing/useIndex4.txt | 1 - .../ExpectedTestResults/Indexing/useIndex5.txt | 3 - .../ExpectedTestResults/Indexing/useIndex6.txt | 2 - .../ExpectedTestResults/Indexing/useIndex7.txt | 3 - .../Json/Libraries/project.txt | 13 + .../Json/Libraries/remove_keys.txt | 12 + .../XQuery/Indexing/Partition-1/createIndex.xq | 20 ++ .../XQuery/Indexing/Partition-1/deleteIndex.xq | 19 ++ .../XQuery/Indexing/Partition-1/showIndex.xq | 19 ++ .../XQuery/Indexing/Partition-1/updateIndex.xq | 19 ++ .../XQuery/Indexing/Partition-1/useIndex1.xq | 25 ++ .../XQuery/Indexing/Partition-1/useIndex2.xq | 24 ++ .../XQuery/Indexing/Partition-1/useIndex3.xq | 27 ++ .../XQuery/Indexing/Partition-1/useIndex4.xq | 24 ++ .../XQuery/Indexing/Partition-1/useIndex5.xq | 23 ++ .../XQuery/Indexing/Partition-1/useIndex6.xq | 23 ++ .../XQuery/Indexing/Partition-1/useIndex7.xq | 27 ++ .../XQuery/Indexing/Partition-2/createIndex.xq | 20 ++ .../XQuery/Indexing/Partition-2/showIndex.xq | 19 ++ .../XQuery/Indexing/Partition-2/updateIndex.xq | 19 ++ .../XQuery/Indexing/Partition-2/useIndex1.xq | 25 ++ .../XQuery/Indexing/Partition-2/useIndex2.xq | 24 ++ .../XQuery/Indexing/Partition-2/useIndex3.xq | 27 ++ .../XQuery/Indexing/Partition-2/useIndex4.xq | 24 ++ .../XQuery/Indexing/Partition-2/useIndex5.xq | 23 ++ .../XQuery/Indexing/Partition-2/useIndex6.xq | 23 ++ .../XQuery/Indexing/Partition-2/useIndex7.xq | 27 ++ .../XQuery/Indexing/Partition-4/createIndex.xq | 20 ++ .../XQuery/Indexing/Partition-4/showIndex.xq | 19 ++ 
.../XQuery/Indexing/Partition-4/updateIndex.xq | 19 ++ .../XQuery/Indexing/Partition-4/useIndex1.xq | 25 ++ .../XQuery/Indexing/Partition-4/useIndex2.xq | 24 ++ .../XQuery/Indexing/Partition-4/useIndex3.xq | 27 ++ .../XQuery/Indexing/Partition-4/useIndex4.xq | 24 ++ .../XQuery/Indexing/Partition-4/useIndex5.xq | 23 ++ .../XQuery/Indexing/Partition-4/useIndex6.xq | 23 ++ .../XQuery/Indexing/Partition-4/useIndex7.xq | 27 ++ .../Queries/XQuery/Indexing/createIndex.xq | 20 -- .../Queries/XQuery/Indexing/deleteIndex.xq | 19 -- .../Queries/XQuery/Indexing/updateIndex.xq | 19 -- .../Queries/XQuery/Indexing/useIndex1.xq | 25 -- .../Queries/XQuery/Indexing/useIndex2.xq | 24 -- .../Queries/XQuery/Indexing/useIndex3.xq | 27 -- .../Queries/XQuery/Indexing/useIndex4.xq | 24 -- .../Queries/XQuery/Indexing/useIndex5.xq | 23 -- .../Queries/XQuery/Indexing/useIndex6.xq | 23 -- .../Queries/XQuery/Indexing/useIndex7.xq | 27 -- .../src/test/resources/cat/IndexingQueries.xml | 132 +++++++- 123 files changed, 2472 insertions(+), 1094 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/vxquery/blob/eb76640f/src/site/apt/user_indexing.apt ---------------------------------------------------------------------- diff --git a/src/site/apt/user_indexing.apt b/src/site/apt/user_indexing.apt new file mode 100644 index 0000000..b572231 --- /dev/null +++ b/src/site/apt/user_indexing.apt @@ -0,0 +1,202 @@ +~~ Licensed to the Apache Software Foundation (ASF) under one or more +~~ contributor license agreements. See the NOTICE file distributed with +~~ this work for additional information regarding copyright ownership. +~~ The ASF licenses this file to You under the Apache License, Version 2.0 +~~ (the "License"); you may not use this file except in compliance with +~~ the License. 
You may obtain a copy of the License at +~~ +~~ http://www.apache.org/licenses/LICENSE-2.0 +~~ +~~ Unless required by applicable law or agreed to in writing, software +~~ distributed under the License is distributed on an "AS IS" BASIS, +~~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +~~ See the License for the specific language governing permissions and +~~ limitations under the License. + +How to use Indexing Features in VXQuery. + +In VXQuery, all the indexes are created in a user-specified directory. In order to use indexing, +you will need to set this directory in your cluster configuration file. + +*** Configuring VXQuery to use indexing functions. +Add the following line to your cluster configuration (e.g. cluster.xml) + +-------- + <index_directory><path_to_index_directory></index_directory> +-------- + +(You should create this index_directory) + +** Using indexing queries. + +VXQuery offers the following indexing functionality. + +[[a]] Create an index for a collection. +[[b]] Use the index in executing a query. +[[c]] Update the index. +[[d]] Delete the index. +[[e]] View existing indexes. + +*1. Scenario I - When the collection is a single directory. +In this scenario, all the XML files are stored in a single directory. (There can be sub directories) + +*** Creating an index for collection +If I need to create an index for an XML collection stored in <path_1>/collection1, + +Query structure: +-------- +build-index-on-collection("collection") +-------- + +You can see the index has been created in a new sub-directory in the index_directory specified in local.xml + +Example: +-------- +build-index-on-collection("<path_1>/collection1") +-------- +This function takes the collection path as an argument. + +*** Using index in query. +If we need to use the index and execute a query, use the following structure. 
+ +------ +for $r in collection-from-index("<path_1>/collection1", "/dataCollection/data")/data +where $r/dataType eq "AWND" and xs:decimal($r/value) gt 491.744 +return $r +------ +Here the index access function is, + +------ +collection-from-index +------ + +which takes two arguments, the collection folder and the element path. + +Result + +------ +<data><date>2001-01-01T00:00:00.000</date><dataType>AWND</dataType><station>GHCND:US000000001</station><value>1000</value><attributes><attribute/><attribute/><attribute>a</attribute></attributes></data> +------ + +*** Updating the index. +A collection can be modified or changed in the following ways. +[[1]] Inserting new XML files. +[[2]] Deleting files. +[[3]] Add/ remove or modify the content of XML files. + +In this type of situation, the index corresponding to the modified collection must also be modified. To achieve this +the update-index function can be used. + +Query structure: +-------- +update-index("<path_to_collection>") +-------- + +Example: +------- +update-index("<path_1>/collection1") +------- + +This function takes the collection which was modified. + +*** Deleting the index. +If we want to delete the entire index created for a collection, the delete-index function can be used. +This function also takes the path of the collection whose index is to be deleted. + +Query structure: +-------- +delete-index("<path_to_collection>") +-------- + +Example: +------- +delete-index("<path_1>/collection1") +------- + +*2. Scenario II - When the collection is distributed. +In this scenario, the collection is distributed among several directories. We can distribute the queries among +partitions. + +*** Creating indexes for collections. + +Query structure: +-------- +build-index-on-collection("<partition_1_path>|<partition_2_path>|<partition_3_path>|...|<partition_n_path>") +-------- + +Here the parameter contains the list of collection partitions separated by the '|' character. 
+ +Example: +Suppose the collection is now distributed among four directories, <path_1>/collection1, <path_2>/collection2, +<path_3>/collection3 and <path_4>/collection4. + +To create indexes for all of the above collections, +------- +build-index-on-collection("<path_1>/collection1|<path_2>/collection2|<path_3>/collection3|<path_4>/collection4") +------- + +In this case, all indexes will be created in separate sub-directories corresponding to each partition. Also note that + this query requires each node to have four partitions available. + +*** Using the indexes in query. +In this case, suppose you need to run a query on indexes of two collection partitions. + +Example: +----- +for $r in collection-from-index("<path_1>/collection1|<path_2>/collection2", "/dataCollection/data")/data +where $r/dataType eq "AWND" and xs:decimal($r/value) gt 491.744 +return $r +----- + +The result will be taken from the indexes of both collection1 and collection2. + +Result: +------ +<data><date>2001-01-01T00:00:00.000</date><dataType>AWND</dataType><station>GHCND:US000000001</station><value>1000</value><attributes><attribute/><attribute/><attribute>a</attribute></attributes></data> +------ + +*** Updating the indexes. +In cases of updating the collection files stored in several partitions, we can use this function to update the +indexes of those directories. + +In this case, give a '|' separated list of collection directories. +Query structure: +-------- +update-index("<partition_1_path>|<partition_2_path>|<partition_3_path>|...|<partition_n_path>") +-------- + +Example: +Suppose that we need to update the indexes in partition1 and partition4. +-------- +update-index("<path_1>/collection1|<path_4>/collection4") +-------- + +*** Deleting the indexes. +If we want to delete indexes of collections in several partitions, we can use this function. 
+Query structure: +-------- +delete-index("<partition_1_path>|<partition_2_path>|<partition_3_path>|...|<partition_n_path>") +-------- + +Example: +Suppose that we need to delete the indexes in collection2 and collection3 +-------- +delete-index("<path_2>/collection2|<path_3>/collection3") +-------- + +** Viewing Index information. +Suppose you need to check which collections have indexes created. To do this, the show-index function can be +used. This function takes no arguments and returns a sequence of collection paths for which an index has already been created. +If there are no indexes created for any collection, the result will be null. + +Suppose two collections, <path_1/collection1> and <path_2/collection2>, have indexes created. +Example: +------ +show-index() +------ + +Result: +------ +<path_1/collection1> +<path_2/collection2> +------ http://git-wip-us.apache.org/repos/asf/vxquery/blob/eb76640f/vxquery-cli/src/main/java/org/apache/vxquery/cli/VXQuery.java ---------------------------------------------------------------------- diff --git a/vxquery-cli/src/main/java/org/apache/vxquery/cli/VXQuery.java b/vxquery-cli/src/main/java/org/apache/vxquery/cli/VXQuery.java index e18332e..e0e3843 100644 --- a/vxquery-cli/src/main/java/org/apache/vxquery/cli/VXQuery.java +++ b/vxquery-cli/src/main/java/org/apache/vxquery/cli/VXQuery.java @@ -305,6 +305,7 @@ public class VXQuery { ncConfig.dataIPAddress = localAddress; ncConfig.resultIPAddress = localAddress; ncConfig.nodeId = "nc" + (i + 1); + //TODO: enable index folder as a cli option for on-the-fly indexing queries ncConfig.ioDevices = Files.createTempDirectory(ncConfig.nodeId).toString(); ncs[i] = new NodeControllerService(ncConfig); ncs[i].start(); http://git-wip-us.apache.org/repos/asf/vxquery/blob/eb76640f/vxquery-core/src/main/java/org/apache/vxquery/common/VXQueryCommons.java ---------------------------------------------------------------------- diff --git 
a/vxquery-core/src/main/java/org/apache/vxquery/common/VXQueryCommons.java b/vxquery-core/src/main/java/org/apache/vxquery/common/VXQueryCommons.java new file mode 100644 index 0000000..ceaf3c7 --- /dev/null +++ b/vxquery-core/src/main/java/org/apache/vxquery/common/VXQueryCommons.java @@ -0,0 +1,43 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one or more +* contributor license agreements. See the NOTICE file distributed with +* this work for additional information regarding copyright ownership. +* The ASF licenses this file to You under the Apache License, Version 2.0 +* (the "License"); you may not use this file except in compliance with +* the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. 
+*/ +package org.apache.vxquery.common; + +import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier; +import org.apache.vxquery.functions.BuiltinFunctions; + +import java.util.HashSet; +import java.util.Set; + +public class VXQueryCommons { + + public static final Set<FunctionIdentifier> collectionFunctions = new HashSet<>(); + + static { + collectionFunctions.add(BuiltinFunctions.FN_COLLECTION_1.getFunctionIdentifier()); + collectionFunctions.add(BuiltinFunctions.FN_COLLECTION_WITH_TAG_2.getFunctionIdentifier()); + } + + public static final Set<FunctionIdentifier> indexingFunctions = new HashSet<>(); + + static { + indexingFunctions.add(BuiltinFunctions.FN_BUILD_INDEX_ON_COLLECTION_1.getFunctionIdentifier()); + indexingFunctions.add(BuiltinFunctions.FN_COLLECTION_FROM_INDEX_2.getFunctionIdentifier()); + indexingFunctions.add(BuiltinFunctions.FN_DELETE_INDEX_1.getFunctionIdentifier()); + indexingFunctions.add(BuiltinFunctions.FN_UPDATE_INDEX_1.getFunctionIdentifier()); + } + +} http://git-wip-us.apache.org/repos/asf/vxquery/blob/eb76640f/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/RewriteRuleset.java ---------------------------------------------------------------------- diff --git a/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/RewriteRuleset.java b/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/RewriteRuleset.java index 205e0b2..d5909c6 100644 --- a/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/RewriteRuleset.java +++ b/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/RewriteRuleset.java @@ -28,6 +28,7 @@ import org.apache.vxquery.compiler.rewriter.rules.EliminateSubplanForSingleItems import org.apache.vxquery.compiler.rewriter.rules.EliminateUnnestAggregateSequencesRule; import org.apache.vxquery.compiler.rewriter.rules.EliminateUnnestAggregateSubplanRule; import org.apache.vxquery.compiler.rewriter.rules.IntroduceCollectionRule; +import 
org.apache.vxquery.compiler.rewriter.rules.IntroduceIndexingRule; import org.apache.vxquery.compiler.rewriter.rules.IntroduceTwoStepAggregateRule; import org.apache.vxquery.compiler.rewriter.rules.PushChildIntoDataScanRule; import org.apache.vxquery.compiler.rewriter.rules.PushFunctionsOntoEqJoinBranches; @@ -130,6 +131,7 @@ public class RewriteRuleset { normalization.add(new SetCollectionDataSourceRule()); normalization.add(new IntroduceCollectionRule()); normalization.add(new RemoveUnusedAssignAndAggregateRule()); + normalization.add(new IntroduceIndexingRule()); normalization.add(new ConsolidateDescandantChild()); http://git-wip-us.apache.org/repos/asf/vxquery/blob/eb76640f/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/AbstractCollectionRule.java ---------------------------------------------------------------------- diff --git a/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/AbstractCollectionRule.java b/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/AbstractCollectionRule.java index 74220da..eff7a6e 100644 --- a/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/AbstractCollectionRule.java +++ b/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/AbstractCollectionRule.java @@ -20,6 +20,7 @@ import java.io.DataInputStream; import java.io.IOException; import java.nio.ByteBuffer; import java.util.Arrays; +import java.util.Set; import org.apache.commons.lang3.mutable.Mutable; import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression; @@ -30,8 +31,11 @@ import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag; import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression; import org.apache.hyracks.algebricks.core.algebra.expressions.ConstantExpression; import org.apache.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression; +import 
org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier; +import org.apache.hyracks.algebricks.core.algebra.metadata.IDataSource; import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator; import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule; import org.apache.hyracks.data.std.primitive.UTF8StringPointable; @@ -40,7 +44,6 @@ import org.apache.vxquery.compiler.algebricks.VXQueryConstantValue; import org.apache.vxquery.compiler.rewriter.rules.util.OperatorToolbox; import org.apache.vxquery.datamodel.accessors.TaggedValuePointable; import org.apache.vxquery.datamodel.values.ValueTag; -import org.apache.vxquery.functions.BuiltinFunctions; import org.apache.vxquery.types.BuiltinTypeRegistry; import org.apache.vxquery.types.Quantifier; import org.apache.vxquery.types.SequenceType; @@ -50,6 +53,7 @@ public abstract class AbstractCollectionRule implements IAlgebraicRewriteRule { final DataInputStream di = new DataInputStream(bbis); final UTF8StringPointable stringp = (UTF8StringPointable) UTF8StringPointable.FACTORY.createPointable(); final TaggedValuePointable tvp = (TaggedValuePointable) TaggedValuePointable.FACTORY.createPointable(); + public static AbstractFunctionCallExpression functionCall; /** * Get the arguments for the collection and collection-with-tag. Return null for not a collection. 
@@ -58,8 +62,7 @@ public abstract class AbstractCollectionRule implements IAlgebraicRewriteRule { * Logical operator * @return collection name */ - protected String[] getCollectionName(Mutable<ILogicalOperator> opRef) { - + protected String[] getFunctionalArguments(Mutable<ILogicalOperator> opRef, Set<FunctionIdentifier> functions) { AbstractLogicalOperator op = (AbstractLogicalOperator) opRef.getValue(); if (op.getOperatorTag() != LogicalOperatorTag.UNNEST) { return null; @@ -78,11 +81,9 @@ public abstract class AbstractCollectionRule implements IAlgebraicRewriteRule { if (logicalExpression.getExpressionTag() != LogicalExpressionTag.FUNCTION_CALL) { return null; } - AbstractFunctionCallExpression functionCall = (AbstractFunctionCallExpression) logicalExpression; - if (!functionCall.getFunctionIdentifier() - .equals(BuiltinFunctions.FN_COLLECTION_WITH_TAG_2.getFunctionIdentifier()) - && !functionCall.getFunctionIdentifier() - .equals(BuiltinFunctions.FN_COLLECTION_1.getFunctionIdentifier())) { + functionCall = (AbstractFunctionCallExpression) logicalExpression; + + if (!functions.contains(functionCall.getFunctionIdentifier())) { return null; } @@ -144,6 +145,20 @@ public abstract class AbstractCollectionRule implements IAlgebraicRewriteRule { return null; } + protected boolean setDataSourceScan(IDataSource<String> ids, Mutable<ILogicalOperator> opRef) { + AbstractLogicalOperator op = (AbstractLogicalOperator) opRef.getValue(); + UnnestOperator unnest = (UnnestOperator) op; + Mutable<ILogicalOperator> opRef2 = unnest.getInputs().get(0); + AbstractLogicalOperator op2 = (AbstractLogicalOperator) opRef2.getValue(); + AssignOperator assign = (AssignOperator) op2; + + DataSourceScanOperator opNew = new DataSourceScanOperator(assign.getVariables(), ids); + opNew.getInputs().addAll(assign.getInputs()); + opRef2.setValue(opNew); + + return true; + } + @Override public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context) { return false; 
http://git-wip-us.apache.org/repos/asf/vxquery/blob/eb76640f/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/IntroduceCollectionRule.java ---------------------------------------------------------------------- diff --git a/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/IntroduceCollectionRule.java b/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/IntroduceCollectionRule.java index 8ed8bb1..42d59aa 100644 --- a/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/IntroduceCollectionRule.java +++ b/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/IntroduceCollectionRule.java @@ -17,20 +17,15 @@ package org.apache.vxquery.compiler.rewriter.rules; import org.apache.commons.lang3.mutable.Mutable; +import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator; +import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext; +import org.apache.vxquery.common.VXQueryCommons; import org.apache.vxquery.compiler.rewriter.VXQueryOptimizationContext; import org.apache.vxquery.metadata.VXQueryCollectionDataSource; import org.apache.vxquery.types.AnyItemType; import org.apache.vxquery.types.Quantifier; import org.apache.vxquery.types.SequenceType; -import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; -import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator; -import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext; -import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator; -import org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator; -import org.apache.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator; -import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator; - /** * Find the default query plan created for collection and updated it to use * parallelization. 
The rule searches for unnest followed by an assign for the @@ -61,10 +56,11 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestOperat * @author prestonc */ public class IntroduceCollectionRule extends AbstractCollectionRule { + @Override public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context) { VXQueryOptimizationContext vxqueryContext = (VXQueryOptimizationContext) context; - String args[] = getCollectionName(opRef); + String args[] = getFunctionalArguments(opRef, VXQueryCommons.collectionFunctions); if (args != null) { String collectionName = args[0]; @@ -82,16 +78,7 @@ public class IntroduceCollectionRule extends AbstractCollectionRule { } // Known to be true because of collection name. - AbstractLogicalOperator op = (AbstractLogicalOperator) opRef.getValue(); - UnnestOperator unnest = (UnnestOperator) op; - Mutable<ILogicalOperator> opRef2 = unnest.getInputs().get(0); - AbstractLogicalOperator op2 = (AbstractLogicalOperator) opRef2.getValue(); - AssignOperator assign = (AssignOperator) op2; - - DataSourceScanOperator opNew = new DataSourceScanOperator(assign.getVariables(), ds); - opNew.getInputs().addAll(assign.getInputs()); - opRef2.setValue(opNew); - return true; + return setDataSourceScan(ds, opRef); } } return false; http://git-wip-us.apache.org/repos/asf/vxquery/blob/eb76640f/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/IntroduceIndexingRule.java ---------------------------------------------------------------------- diff --git a/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/IntroduceIndexingRule.java b/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/IntroduceIndexingRule.java new file mode 100644 index 0000000..5b96131 --- /dev/null +++ b/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/IntroduceIndexingRule.java @@ -0,0 +1,58 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one or more +* 
contributor license agreements. See the NOTICE file distributed with +* this work for additional information regarding copyright ownership. +* The ASF licenses this file to You under the Apache License, Version 2.0 +* (the "License"); you may not use this file except in compliance with +* the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ +package org.apache.vxquery.compiler.rewriter.rules; + +import org.apache.commons.lang3.mutable.Mutable; +import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; +import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator; +import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext; +import org.apache.vxquery.common.VXQueryCommons; +import org.apache.vxquery.compiler.rewriter.VXQueryOptimizationContext; +import org.apache.vxquery.metadata.VXQueryIndexingDataSource; +import org.apache.vxquery.types.AnyItemType; +import org.apache.vxquery.types.Quantifier; +import org.apache.vxquery.types.SequenceType; + +/** + * + */ +public class IntroduceIndexingRule extends AbstractCollectionRule { + + @Override + public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context) throws AlgebricksException { + VXQueryOptimizationContext vxqueryContext = (VXQueryOptimizationContext) context; + String args[] = getFunctionalArguments(opRef, VXQueryCommons.indexingFunctions); + + if (args != null) { + + String collection = args[0]; + String elementPath = args.length > 1?args[1]:null; + + // Build the new operator and update the query plan. 
+ int collectionId = vxqueryContext.newCollectionId(); + VXQueryIndexingDataSource ids = VXQueryIndexingDataSource.create(collectionId, collection, elementPath, + SequenceType.create(AnyItemType.INSTANCE, Quantifier.QUANT_STAR), functionCall.getFunctionIdentifier().getName()); + if (ids != null) { + ids.setTotalDataSources(vxqueryContext.getTotalDataSources()); + + // Known to be true because of collection name. + return setDataSourceScan(ids, opRef); + } + } + return false; + } +} http://git-wip-us.apache.org/repos/asf/vxquery/blob/eb76640f/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/PushChildIntoDataScanRule.java ---------------------------------------------------------------------- diff --git a/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/PushChildIntoDataScanRule.java b/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/PushChildIntoDataScanRule.java index 7ffcd90..6060c19 100644 --- a/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/PushChildIntoDataScanRule.java +++ b/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/PushChildIntoDataScanRule.java @@ -34,6 +34,7 @@ import org.apache.vxquery.compiler.rewriter.rules.util.ExpressionToolbox; import org.apache.vxquery.context.StaticContext; import org.apache.vxquery.functions.BuiltinOperators; import org.apache.vxquery.metadata.VXQueryCollectionDataSource; +import org.apache.vxquery.metadata.VXQueryIndexingDataSource; import org.apache.vxquery.metadata.VXQueryMetadataProvider; import org.apache.vxquery.types.ElementType; @@ -86,8 +87,16 @@ public class PushChildIntoDataScanRule extends AbstractUsedVariablesProcessingRu DataSourceScanOperator datascan = (DataSourceScanOperator) op2; if (!usedVariables.contains(datascan.getVariables())) { + VXQueryCollectionDataSource ds = null; + VXQueryIndexingDataSource ids = null; + // Find all child functions. 
- VXQueryCollectionDataSource ds = (VXQueryCollectionDataSource) datascan.getDataSource(); + try { + ids = (VXQueryIndexingDataSource) datascan.getDataSource(); + } catch (ClassCastException e) { + ds = (VXQueryCollectionDataSource) datascan.getDataSource(); + } + if (!updateDataSource(ds, unnest.getExpressionRef())) { return false; } http://git-wip-us.apache.org/repos/asf/vxquery/blob/eb76640f/vxquery-core/src/main/java/org/apache/vxquery/functions/builtin-functions.xml ---------------------------------------------------------------------- diff --git a/vxquery-core/src/main/java/org/apache/vxquery/functions/builtin-functions.xml b/vxquery-core/src/main/java/org/apache/vxquery/functions/builtin-functions.xml index 10ca007..688e86e 100644 --- a/vxquery-core/src/main/java/org/apache/vxquery/functions/builtin-functions.xml +++ b/vxquery-core/src/main/java/org/apache/vxquery/functions/builtin-functions.xml @@ -128,28 +128,25 @@ <!-- Collection operator is added during the rewrite rules phase. --> </function> - <!-- fn:build-index-on-collection($arg as xs:string?, $indexFolder as xs:string?) as node()* --> + <!-- fn:build-index-on-collection($arg as xs:string?, $indexFolder as xs:string?) as boolean --> <function name="fn:build-index-on-collection"> <param name="collection-folder" type="xs:string?"/> - <param name="index-folder" type="xs:string?"/> <return type="xs:boolean"/> - <runtime type="scalar" class="org.apache.vxquery.runtime.functions.index.IndexConstructorScalarEvaluatorFactory"/> + <!-- build-index-on-collection operator is added during the rewrite rules phase. --> </function> - <!-- fn:update-index($indexFolder as xs:string?) as node()* --> + <!-- fn:update-index($indexFolder as xs:string?) 
as boolean --> <function name="fn:update-index"> - <param name="index-folder" type="xs:string?"/> + <param name="collection-folder" type="xs:string?"/> <return type="xs:boolean"/> - <runtime type="scalar" - class="org.apache.vxquery.runtime.functions.index.IndexUpdaterEvaluatorFactory"/> + <!-- update-index operator is added during the rewrite rules phase. --> </function> - <!-- fn:delete-index($indexFolder as xs:string?) as node()* --> + <!-- fn:delete-index($indexFolder as xs:string?) as boolean --> <function name="fn:delete-index"> - <param name="index-folder" type="xs:string?"/> + <param name="collection-folder" type="xs:string?"/> <return type="xs:boolean"/> - <runtime type="scalar" - class="org.apache.vxquery.runtime.functions.index.IndexDeleteEvaluatorFactory"/> + <!-- delete-index operator is added during the rewrite rules phase. --> </function> <!-- fn:collection-from-index($indexfolder as xs:string?, $elementpath as xs:string?) as node()* --> @@ -157,13 +154,19 @@ <param name="index-folder" type="xs:string?"/> <param name="element-path" type="xs:string?"/> <return type="node()*"/> - <runtime type="unnesting" class="org.apache.vxquery.runtime.functions.index.CollectionFromIndexUnnestingEvaluatorFactory"/> <property type="DocumentOrder" class="org.apache.vxquery.compiler.rewriter.rules.propagationpolicies.InputPropertyPropagationPolicy"> <argument value="0"/> </property> <property type="UniqueNodes" class="org.apache.vxquery.compiler.rewriter.rules.propagationpolicies.InputPropertyPropagationPolicy"> <argument value="0"/> </property> + <!-- collection-from-index operator is added during the rewrite rules phase. --> + </function> + + <!-- fn:show-index as node()* --> + <function name="fn:show-index"> + <return type="node()*"/> + <runtime type="scalar" class="org.apache.vxquery.runtime.functions.index.ShowIndexScalarEvaluatorFactory"/> </function> <!-- fn:collection-with-tag($arg1 as xs:string?, $arg2 as xs:string?) 
as node()* --> http://git-wip-us.apache.org/repos/asf/vxquery/blob/eb76640f/vxquery-core/src/main/java/org/apache/vxquery/metadata/AbstractVXQueryDataSource.java ---------------------------------------------------------------------- diff --git a/vxquery-core/src/main/java/org/apache/vxquery/metadata/AbstractVXQueryDataSource.java b/vxquery-core/src/main/java/org/apache/vxquery/metadata/AbstractVXQueryDataSource.java new file mode 100644 index 0000000..dd9898c --- /dev/null +++ b/vxquery-core/src/main/java/org/apache/vxquery/metadata/AbstractVXQueryDataSource.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.vxquery.metadata; + +import org.apache.hyracks.algebricks.core.algebra.metadata.IDataSourcePropertiesProvider; + +import java.util.List; + +public abstract class AbstractVXQueryDataSource { + protected static final String DELIMITER = "\\|"; + protected int dataSourceId; + protected String collectionName; + protected String[] collectionPartitions; + protected String elementPath; + protected List<Integer> childSeq; + protected int totalDataSources; + protected String tag; + protected String function; + + protected Object[] types; + + protected IDataSourcePropertiesProvider propProvider; + + public abstract String getFunctionCall(); +} http://git-wip-us.apache.org/repos/asf/vxquery/blob/eb76640f/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionDataSource.java ---------------------------------------------------------------------- diff --git a/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionDataSource.java b/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionDataSource.java index 4d0e7a4..b4bc858 100644 --- a/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionDataSource.java +++ b/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionDataSource.java @@ -30,18 +30,7 @@ import org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalProperties import org.apache.hyracks.algebricks.core.algebra.properties.RandomPartitioningProperty; import org.apache.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector; -public class VXQueryCollectionDataSource implements IDataSource<String> { - private static final String DELIMITER = "\\|"; - private final int dataSourceId; - private final String collectionName; - private String[] collectionPartitions; - private final List<Integer> childSeq; - private int totalDataSources; - private String tag; - - private final Object[] types; - - private IDataSourcePropertiesProvider propProvider; 
+public class VXQueryCollectionDataSource extends AbstractVXQueryDataSource implements IDataSource<String> { private VXQueryCollectionDataSource(int id, String file, Object[] types) { this.dataSourceId = id; @@ -128,4 +117,9 @@ public class VXQueryCollectionDataSource implements IDataSource<String> { public String toString() { return "VXQueryCollectionDataSource [collectionName=" + collectionName + ", childSeq=" + childSeq + "]"; } + + @Override + public String getFunctionCall() { + return function; + } } http://git-wip-us.apache.org/repos/asf/vxquery/blob/eb76640f/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryIndexingDataSource.java ---------------------------------------------------------------------- diff --git a/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryIndexingDataSource.java b/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryIndexingDataSource.java new file mode 100644 index 0000000..da75108 --- /dev/null +++ b/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryIndexingDataSource.java @@ -0,0 +1,137 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one or more +* contributor license agreements. See the NOTICE file distributed with +* this work for additional information regarding copyright ownership. +* The ASF licenses this file to You under the Apache License, Version 2.0 +* (the "License"); you may not use this file except in compliance with +* the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. 
+*/ +package org.apache.vxquery.metadata; + +import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable; +import org.apache.hyracks.algebricks.core.algebra.metadata.IDataSource; +import org.apache.hyracks.algebricks.core.algebra.metadata.IDataSourcePropertiesProvider; +import org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector; +import org.apache.hyracks.algebricks.core.algebra.properties.RandomPartitioningProperty; +import org.apache.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector; +import org.apache.vxquery.compiler.rewriter.rules.CollectionFileDomain; + +import java.util.ArrayList; +import java.util.List; + +/** + * Datasource object for indexing. + */ +public class VXQueryIndexingDataSource extends AbstractVXQueryDataSource implements IDataSource<String> { + + protected Object[] types; + + protected IDataSourcePropertiesProvider propProvider; + private VXQueryIndexingDataSource(int id, String collection, String elementPath, Object[] types, + String functionCall) { + this.dataSourceId = id; + this.collectionName = collection; + this.elementPath = elementPath; + this.function = functionCall; + this.collectionPartitions = collectionName.split(DELIMITER); + + this.types = types; + final IPhysicalPropertiesVector vec = new StructuralPropertiesVector( + new RandomPartitioningProperty(new CollectionFileDomain(collectionName)), + new ArrayList<>()); + propProvider = new IDataSourcePropertiesProvider() { + @Override + public IPhysicalPropertiesVector computePropertiesVector(List<LogicalVariable> scanVariables) { + return vec; + } + }; + this.tag = null; + this.childSeq = new ArrayList<>(); + } + + public static VXQueryIndexingDataSource create(int id, String collection, String index, Object type, String + function) { + return new VXQueryIndexingDataSource(id, collection, index, new Object[] { type }, function); + } + + public int getTotalDataSources() { + return totalDataSources; + } + + public void 
setTotalDataSources(int totalDataSources) { + this.totalDataSources = totalDataSources; + } + + public int getDataSourceId() { + return dataSourceId; + } + + public String getElementPath() { + return elementPath; + } + + public String[] getCollectionPartitions() { + return collectionPartitions; + } + + public void setCollectionPartitions(String[] collectionPartitions) { + this.collectionPartitions = collectionPartitions; + } + + public int getPartitionCount() { + return collectionPartitions.length; + } + + public String getTag() { + return this.tag; + } + + public void setTag(String tag) { + this.tag = tag; + } + + @Override + public String getId() { + return collectionName; + } + + @Override + public Object[] getSchemaTypes() { + return types; + } + + @Override + public IDataSourcePropertiesProvider getPropertiesProvider() { + return propProvider; + } + + @Override + public void computeFDs(List scanVariables, List fdList) { + } + + @Override + public String toString() { + return "VXQueryIndexingDataSource [collectionName=" + collectionName + ", elementPath=" + elementPath + " " + + "function=" + function + "]"; + } + + @Override + public String getFunctionCall() { + return function; + } + + public List<Integer> getChildSeq() { + return childSeq; + } + +} + + http://git-wip-us.apache.org/repos/asf/vxquery/blob/eb76640f/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryIndexingOperatorDescriptor.java ---------------------------------------------------------------------- diff --git a/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryIndexingOperatorDescriptor.java b/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryIndexingOperatorDescriptor.java new file mode 100644 index 0000000..ae637ac --- /dev/null +++ b/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryIndexingOperatorDescriptor.java @@ -0,0 +1,218 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.vxquery.metadata; + +import java.io.DataInputStream; +import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Map; +import java.util.logging.Logger; + +import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; +import org.apache.hyracks.api.client.NodeControllerInfo; +import org.apache.hyracks.api.comm.IFrame; +import org.apache.hyracks.api.comm.IFrameFieldAppender; +import org.apache.hyracks.api.comm.VSizeFrame; +import org.apache.hyracks.api.context.IHyracksTaskContext; +import org.apache.hyracks.api.dataflow.IOperatorNodePushable; +import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider; +import org.apache.hyracks.api.dataflow.value.RecordDescriptor; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.job.IOperatorDescriptorRegistry; +import org.apache.hyracks.data.std.api.IPointable; +import org.apache.hyracks.data.std.primitive.UTF8StringPointable; +import org.apache.hyracks.data.std.util.ArrayBackedValueStorage; +import org.apache.hyracks.dataflow.common.comm.io.FrameFixedFieldTupleAppender; +import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor; +import org.apache.hyracks.dataflow.common.comm.util.ByteBufferInputStream; 
+import org.apache.hyracks.dataflow.common.comm.util.FrameUtils; +import org.apache.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor; +import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable; +import org.apache.vxquery.context.DynamicContext; +import org.apache.vxquery.datamodel.accessors.TaggedValuePointable; +import org.apache.vxquery.datamodel.builders.sequence.SequenceBuilder; +import org.apache.vxquery.datamodel.values.XDMConstants; +import org.apache.vxquery.exceptions.SystemException; +import org.apache.vxquery.functions.BuiltinFunctions; +import org.apache.vxquery.runtime.functions.index.IndexConstructorUtil; +import org.apache.vxquery.runtime.functions.index.VXQueryIndexReader; +import org.apache.vxquery.runtime.functions.index.indexCentralizer.IndexCentralizerUtil; +import org.apache.vxquery.runtime.functions.index.updateIndex.IndexUpdater; +import org.apache.vxquery.xmlparser.ITreeNodeIdProvider; +import org.apache.vxquery.xmlparser.TreeNodeIdProvider; + +public class VXQueryIndexingOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor { + protected static final Logger LOGGER = Logger.getLogger(VXQueryCollectionOperatorDescriptor.class.getName()); + private static final long serialVersionUID = 1L; + private short dataSourceId; + private short totalDataSources; + private String[] collectionPartitions; + private String elementPath; + private final String functionCall; + + public VXQueryIndexingOperatorDescriptor(IOperatorDescriptorRegistry spec, VXQueryIndexingDataSource ds, + RecordDescriptor rDesc, String hdfsConf, Map<String, NodeControllerInfo> nodeControllerInfos) { + super(spec, 1, 1); + this.functionCall = ds.getFunctionCall(); + collectionPartitions = ds.getCollectionPartitions(); + dataSourceId = (short) ds.getDataSourceId(); + totalDataSources = (short) ds.getTotalDataSources(); + recordDescriptors[0] = rDesc; + this.elementPath = ds.getElementPath(); + } + + @Override + public 
IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx, + IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException { + final FrameTupleAccessor fta = new FrameTupleAccessor( + recordDescProvider.getInputRecordDescriptor(getActivityId(), 0)); + final int fieldOutputCount = recordDescProvider.getOutputRecordDescriptor(getActivityId(), 0).getFieldCount(); + final IFrame frame = new VSizeFrame(ctx); + final IFrameFieldAppender appender = new FrameFixedFieldTupleAppender(fieldOutputCount); + final short partitionId = (short) ctx.getTaskAttemptId().getTaskId().getPartition(); + final ITreeNodeIdProvider nodeIdProvider = new TreeNodeIdProvider(partitionId, dataSourceId, totalDataSources); + final String nodeId = ctx.getJobletContext().getApplicationContext().getNodeId(); + final DynamicContext dCtx = (DynamicContext) ctx.getJobletContext().getGlobalJobData(); + final String collectionName = collectionPartitions[partition % collectionPartitions.length]; + String collectionModifiedName = collectionName.replace("${nodeId}", nodeId); + IndexCentralizerUtil indexCentralizerUtil = new IndexCentralizerUtil( + ctx.getIOManager().getIODevices().get(0).getPath()); + indexCentralizerUtil.readIndexDirectory(); + + return new AbstractUnaryInputUnaryOutputOperatorNodePushable() { + @Override + public void open() throws HyracksDataException { + appender.reset(frame, true); + writer.open(); + } + + @Override + public void nextFrame(ByteBuffer buffer) throws HyracksDataException { + fta.reset(buffer); + + IPointable result = new TaggedValuePointable(); + + final UTF8StringPointable stringp = (UTF8StringPointable) UTF8StringPointable.FACTORY.createPointable(); + final TaggedValuePointable nodep = (TaggedValuePointable) TaggedValuePointable.FACTORY + .createPointable(); + + final ByteBufferInputStream bbis = new ByteBufferInputStream(); + final DataInputStream di = new DataInputStream(bbis); + final SequenceBuilder sb = new 
SequenceBuilder(); + final ArrayBackedValueStorage abvs = new ArrayBackedValueStorage(); + final ArrayBackedValueStorage abvsFileNode = new ArrayBackedValueStorage(); + + String indexModifiedName; + if (collectionModifiedName.contains("hdfs://")) { + throw new HyracksDataException("Indexing support for HDFS not yet implemented."); + } else { + + if (functionCall.equals( + BuiltinFunctions.FN_BUILD_INDEX_ON_COLLECTION_1.getFunctionIdentifier().getName())) { + indexModifiedName = indexCentralizerUtil.putIndexForCollection(collectionModifiedName); + File collectionDirectory = new File(collectionModifiedName); + + //check if directory is in the local file system + if (collectionDirectory.exists() && collectionDirectory.isDirectory()) { + IndexConstructorUtil indexConstructorUtil = new IndexConstructorUtil(); + try { + indexConstructorUtil.evaluate(collectionModifiedName, indexModifiedName, result, + stringp, bbis, di, sb, abvs, nodeIdProvider, abvsFileNode, nodep, false, + nodeId); + XDMConstants.setTrue(result); + FrameUtils.appendFieldToWriter(writer, appender, result.getByteArray(), + result.getStartOffset(), result.getLength()); + } catch (SystemException e) { + throw new HyracksDataException("Could not create index for collection: " + + collectionName + " in dir: " + indexModifiedName + " " + e.getMessage()); + } + } else { + throw new HyracksDataException("Cannot find Collection Directory (" + nodeId + ":" + + collectionDirectory.getAbsolutePath() + ")"); + } + } else if (functionCall + .equals(BuiltinFunctions.FN_UPDATE_INDEX_1.getFunctionIdentifier().getName())) { + indexModifiedName = indexCentralizerUtil.getIndexForCollection(collectionModifiedName); + IndexUpdater updater = new IndexUpdater(indexModifiedName, result, stringp, bbis, di, sb, abvs, + nodeIdProvider, abvsFileNode, nodep, nodeId); + try { + updater.setup(); + updater.updateIndex(); + updater.updateMetadataFile(); + updater.exit(); + XDMConstants.setTrue(result); + 
FrameUtils.appendFieldToWriter(writer, appender, result.getByteArray(), + result.getStartOffset(), result.getLength()); + } catch (IOException | SystemException e) { + throw new HyracksDataException( + "Could not update index in " + indexModifiedName + " " + e.getMessage()); + } + } else if (functionCall + .equals(BuiltinFunctions.FN_DELETE_INDEX_1.getFunctionIdentifier().getName())) { + indexModifiedName = indexCentralizerUtil.getIndexForCollection(collectionModifiedName); + IndexUpdater updater = new IndexUpdater(indexModifiedName, result, stringp, bbis, di, sb, abvs, + nodeIdProvider, abvsFileNode, nodep, nodeId); + indexCentralizerUtil.deleteEntryForCollection(collectionModifiedName); + try { + updater.setup(); + updater.deleteAllIndexes(); + XDMConstants.setTrue(result); + FrameUtils.appendFieldToWriter(writer, appender, result.getByteArray(), + result.getStartOffset(), result.getLength()); + } catch (SystemException | IOException e) { + throw new HyracksDataException( + "Could not delete index in " + indexModifiedName + " " + e.getMessage()); + } + + } else if (functionCall + .equals(BuiltinFunctions.FN_COLLECTION_FROM_INDEX_2.getFunctionIdentifier().getName())) { + indexModifiedName = indexCentralizerUtil.getIndexForCollection(collectionModifiedName); + VXQueryIndexReader indexReader = new VXQueryIndexReader(ctx, indexModifiedName, elementPath); + try { + indexReader.init(); + while (indexReader.step(result)) { + FrameUtils.appendFieldToWriter(writer, appender, result.getByteArray(), + result.getStartOffset(), result.getLength()); + } + } catch (AlgebricksException e) { + throw new HyracksDataException("Could not read index."); + } + + } else { + throw new HyracksDataException("Unsupported function call (" + functionCall + ")"); + } + } + } + + @Override + public void fail() throws HyracksDataException { + writer.fail(); + } + + @Override + public void close() throws HyracksDataException { + // Check if needed? 
+ if (appender.getTupleCount() > 0) { + appender.flush(writer, true); + } + writer.close(); + indexCentralizerUtil.writeIndexDirectory(); + } + }; + } +} http://git-wip-us.apache.org/repos/asf/vxquery/blob/eb76640f/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryMetadataProvider.java ---------------------------------------------------------------------- diff --git a/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryMetadataProvider.java b/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryMetadataProvider.java index 820c365..b7b37b9 100644 --- a/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryMetadataProvider.java +++ b/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryMetadataProvider.java @@ -88,22 +88,44 @@ public class VXQueryMetadataProvider implements IMetadataProvider<String, String List<LogicalVariable> minFilterVars, List<LogicalVariable> maxFilterVars, IOperatorSchema opSchema, IVariableTypeEnvironment typeEnv, JobGenContext context, JobSpecification jobSpec, Object implConfig) throws AlgebricksException { - VXQueryCollectionDataSource ds = (VXQueryCollectionDataSource) dataSource; + VXQueryCollectionDataSource ds = null; + VXQueryIndexingDataSource ids = null; + + try { + ids = (VXQueryIndexingDataSource) dataSource; + } catch (ClassCastException e) { + ds = (VXQueryCollectionDataSource) dataSource; + } if (sourceFileMap != null) { - final int len = ds.getPartitions().length; + final int len = ds != null ? ds.getPartitions().length : ids.getCollectionPartitions().length; String[] collectionPartitions = new String[len]; for (int i = 0; i < len; ++i) { - String partition = ds.getPartitions()[i]; + String partition = ds != null ? ds.getPartitions()[i] : ids.getCollectionPartitions()[i]; File mapped = sourceFileMap.get(partition); collectionPartitions[i] = mapped != null ? 
mapped.toString() : partition; } - ds.setPartitions(collectionPartitions); + if (ds != null) { + ds.setPartitions(collectionPartitions); + } else { + ids.setCollectionPartitions(collectionPartitions); + } + } + RecordDescriptor rDesc; + IOperatorDescriptor scanner; + AlgebricksPartitionConstraint constraint; + + if (ds != null) { + rDesc = new RecordDescriptor(new ISerializerDeserializer[opSchema.getSize()]); + scanner = new VXQueryCollectionOperatorDescriptor(jobSpec, ds, rDesc, this.hdfsConf, + this.nodeControllerInfos); + constraint = getClusterLocations(nodeList, ds.getPartitionCount()); + } else { + rDesc = new RecordDescriptor(new ISerializerDeserializer[opSchema.getSize()]); + scanner = new VXQueryIndexingOperatorDescriptor(jobSpec, ids, rDesc, this.hdfsConf, + this.nodeControllerInfos); + constraint = getClusterLocations(nodeList, ids.getPartitionCount()); } - RecordDescriptor rDesc = new RecordDescriptor(new ISerializerDeserializer[opSchema.getSize()]); - IOperatorDescriptor scanner = new VXQueryCollectionOperatorDescriptor(jobSpec, ds, rDesc, this.hdfsConf, - this.nodeControllerInfos); - AlgebricksPartitionConstraint constraint = getClusterLocations(nodeList, ds.getPartitionCount()); return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(scanner, constraint); } http://git-wip-us.apache.org/repos/asf/vxquery/blob/eb76640f/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/index/CollectionFromIndexUnnestingEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/index/CollectionFromIndexUnnestingEvaluatorFactory.java b/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/index/CollectionFromIndexUnnestingEvaluatorFactory.java deleted file mode 100644 index ea78de5..0000000 --- a/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/index/CollectionFromIndexUnnestingEvaluatorFactory.java +++ /dev/null 
@@ -1,327 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.vxquery.runtime.functions.index; - -import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; -import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator; -import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory; -import org.apache.hyracks.algebricks.runtime.base.IUnnestingEvaluator; -import org.apache.hyracks.api.context.IHyracksTaskContext; -import org.apache.hyracks.data.std.api.IPointable; -import org.apache.hyracks.data.std.primitive.UTF8StringPointable; -import org.apache.hyracks.data.std.util.ArrayBackedValueStorage; -import org.apache.hyracks.dataflow.common.comm.util.ByteBufferInputStream; -import org.apache.lucene.analysis.Analyzer; -import org.apache.lucene.document.Document; -import org.apache.lucene.index.DirectoryReader; -import org.apache.lucene.index.IndexReader; -import org.apache.lucene.index.IndexableField; -import org.apache.lucene.queryparser.classic.QueryParser; -import org.apache.lucene.search.IndexSearcher; -import org.apache.lucene.search.Query; -import org.apache.lucene.search.ScoreDoc; -import org.apache.lucene.search.TopDocs; -import 
org.apache.lucene.store.FSDirectory; -import org.apache.vxquery.datamodel.accessors.TaggedValuePointable; -import org.apache.vxquery.datamodel.values.ValueTag; -import org.apache.vxquery.exceptions.ErrorCode; -import org.apache.vxquery.exceptions.SystemException; -import org.apache.vxquery.index.IndexAttributes; -import org.apache.vxquery.runtime.functions.base.AbstractTaggedValueArgumentUnnestingEvaluator; -import org.apache.vxquery.runtime.functions.base.AbstractTaggedValueArgumentUnnestingEvaluatorFactory; -import org.apache.vxquery.xmlparser.ITreeNodeIdProvider; -import org.apache.vxquery.xmlparser.SAXContentHandler; -import org.apache.vxquery.xmlparser.TreeNodeIdProvider; -import org.xml.sax.Attributes; -import org.xml.sax.SAXException; - -import java.io.DataInputStream; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.nio.file.Paths; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; - -public class CollectionFromIndexUnnestingEvaluatorFactory extends AbstractTaggedValueArgumentUnnestingEvaluatorFactory { - private static final long serialVersionUID = 1L; - - public CollectionFromIndexUnnestingEvaluatorFactory(IScalarEvaluatorFactory[] args) { - super(args); - } - - @Override - protected IUnnestingEvaluator createEvaluator(IHyracksTaskContext ctx, IScalarEvaluator[] args) - throws AlgebricksException { - - return new AbstractTaggedValueArgumentUnnestingEvaluator(args) { - - private ArrayBackedValueStorage nodeAbvs = new ArrayBackedValueStorage(); - - private int indexPlace; - private int indexLength; - private String elementPath; - private String indexName; - - private UTF8StringPointable stringIndexFolder = (UTF8StringPointable) UTF8StringPointable.FACTORY - .createPointable(); - private UTF8StringPointable stringElementPath = (UTF8StringPointable) UTF8StringPointable.FACTORY - .createPointable(); - private ByteBufferInputStream bbis = new ByteBufferInputStream(); - private DataInputStream di = new 
DataInputStream(bbis); - - private IndexReader reader; - private IndexSearcher searcher; - private Analyzer analyzer; - private QueryParser parser; - private ScoreDoc[] hits; - private SAXContentHandler handler; - private Query query; - private Document doc; - private List<IndexableField> fields; - - @Override - public boolean step(IPointable result) throws AlgebricksException { - /* each step will create a tuple for a single xml file - * This is done using the parse function - * checkoverflow is used throughout. This is because memory might not be - * able to hold all of the results at once, so we return 1 million at - * a time and check when we need to get more - */ - if (indexPlace < indexLength) { - nodeAbvs.reset(); - try { - //TODO: now we get back the entire document - doc = searcher.doc(hits[indexPlace].doc); - fields = doc.getFields(); - parse(nodeAbvs); - } catch (IOException e) { - throw new AlgebricksException(e); - } - indexPlace += 1; - result.set(nodeAbvs.getByteArray(), nodeAbvs.getStartOffset(), nodeAbvs.getLength()); - return true; - } - return false; - } - - @Override - protected void init(TaggedValuePointable[] args) throws SystemException { - - int partition = ctxview.getTaskAttemptId().getTaskId().getPartition(); - ITreeNodeIdProvider nodeIdProvider = new TreeNodeIdProvider((short) partition); - handler = new SAXContentHandler(false, nodeIdProvider, true); - - nodeAbvs.reset(); - indexPlace = 0; - TaggedValuePointable tvp1 = args[0]; - TaggedValuePointable tvp2 = args[1]; - - if (tvp1.getTag() != ValueTag.XS_STRING_TAG || tvp2.getTag() != ValueTag.XS_STRING_TAG) { - throw new SystemException(ErrorCode.FORG0006); - } - tvp1.getValue(stringIndexFolder); - tvp2.getValue(stringElementPath); - //This whole loop is to get the string arguments, indexFolder, elementPath, and match option - try { - // Get the list of files. 
- bbis.setByteBuffer(ByteBuffer.wrap( - Arrays.copyOfRange(stringIndexFolder.getByteArray(), stringIndexFolder.getStartOffset(), - stringIndexFolder.getLength() + stringIndexFolder.getStartOffset())), - 0); - indexName = di.readUTF(); - bbis.setByteBuffer(ByteBuffer.wrap( - Arrays.copyOfRange(stringElementPath.getByteArray(), stringElementPath.getStartOffset(), - stringElementPath.getLength() + stringElementPath.getStartOffset())), - 0); - elementPath = di.readUTF(); - - indexPlace = 0; - - //Create the index reader. - reader = DirectoryReader.open(FSDirectory.open(Paths.get(indexName))); - } catch (IOException e) { - throw new SystemException(ErrorCode.SYSE0001, e); - } - - searcher = new IndexSearcher(reader); - analyzer = new CaseSensitiveAnalyzer(); - - parser = new CaseSensitiveQueryParser("item", analyzer); - - String queryString = elementPath.replaceAll("/", "."); - queryString = "item:" + queryString + "*"; - - int lastslash = elementPath.lastIndexOf("/"); - elementPath = elementPath.substring(0, lastslash) + ":" + elementPath.substring(lastslash + 1); - elementPath = elementPath.replaceAll("/", ".") + ".element"; - - TopDocs results = null; - try { - query = parser.parse(queryString); - - //TODO: Right now it only returns 1000000 results - results = searcher.search(query, 1000000); - - } catch (Exception e) { - throw new SystemException(null); - } - - hits = results.scoreDocs; - System.out.println("found: " + results.totalHits); - indexPlace = 0; - indexLength = hits.length; - - } - - public void parse(ArrayBackedValueStorage abvsFileNode) throws IOException { - try { - handler.startDocument(); - - for (int i = 0; i < fields.size(); i++) { - String fieldValue = fields.get(i).stringValue(); - if (fieldValue.equals(elementPath)) { - buildElement(abvsFileNode, i); - } - } - - handler.endDocument(); - handler.writeDocument(abvsFileNode); - } catch (Exception e) { - throw new IOException(e); - } - } - - private int buildElement(ArrayBackedValueStorage 
abvsFileNode, int fieldNum) throws SAXException { - int whereIFinish = fieldNum; - IndexableField field = fields.get(fieldNum); - String contents = field.stringValue(); - String uri = ""; - - int firstColon = contents.indexOf(':'); - int lastDot = contents.lastIndexOf('.'); - String type = contents.substring(lastDot + 1); - String lastBit = contents.substring(firstColon + 1, lastDot); - - if (type.equals("textnode")) { - char[] charContents = lastBit.toCharArray(); - handler.characters(charContents, 0, charContents.length); - - } - if (type.equals("element")) { - List<String> names = new ArrayList<String>(); - List<String> values = new ArrayList<String>(); - List<String> uris = new ArrayList<String>(); - List<String> localNames = new ArrayList<String>(); - List<String> types = new ArrayList<String>(); - List<String> qNames = new ArrayList<String>(); - whereIFinish = findAttributeChildren(whereIFinish, names, values, uris, localNames, types, qNames); - Attributes atts = new IndexAttributes(names, values, uris, localNames, types, qNames); - - handler.startElement(uri, lastBit, lastBit, atts); - - boolean noMoreChildren = false; - - while (whereIFinish + 1 < fields.size() && !noMoreChildren) { - if (isChild(fields.get(whereIFinish + 1), field)) { - whereIFinish = buildElement(abvsFileNode, whereIFinish + 1); - } else { - noMoreChildren = true; - } - } - - handler.endElement(uri, lastBit, lastBit); - - } - return whereIFinish; - } - - /*This function creates the attribute children for an element node - * - */ - int findAttributeChildren(int fieldnum, List<String> n, List<String> v, List<String> u, List<String> l, - List<String> t, List<String> q) { - int nextindex = fieldnum + 1; - boolean foundattributes = false; - if (nextindex < fields.size()) { - IndexableField nextguy; - - while (nextindex < fields.size()) { - nextguy = fields.get(nextindex); - String contents = nextguy.stringValue(); - int firstcolon = contents.indexOf(':'); - int lastdot = 
contents.lastIndexOf('.'); - String lastbit = contents.substring(firstcolon + 1, lastdot); - - if (isDirectChildAttribute(nextguy, fields.get(fieldnum))) { - foundattributes = true; - n.add(lastbit); - IndexableField nextnextguy = fields.get(nextindex + 1); - contents = nextnextguy.stringValue(); - firstcolon = contents.indexOf(':'); - lastdot = contents.lastIndexOf('.'); - String nextlastbit = contents.substring(firstcolon + 1, lastdot); - v.add(nextlastbit); - u.add(lastbit); - l.add(lastbit); - t.add(lastbit); - q.add(lastbit); - } else { - break; - } - nextindex += 2; - } - } - if (foundattributes) { - return nextindex - 1; - - } else { - return fieldnum; - } - } - - boolean isChild(IndexableField child, IndexableField adult) { - String childId = child.stringValue(); - String adultId = adult.stringValue(); - - int lastDotChild = childId.lastIndexOf('.'); - int lastDotAdult = adultId.lastIndexOf('.'); - - String childPath = childId.substring(0, lastDotChild); - String adultPath = adultId.substring(0, lastDotAdult); - adultPath = adultPath.replaceFirst(":", "."); - - return (childPath.startsWith(adultPath + ":") || childPath.startsWith(adultPath + ".")); - } - - boolean isDirectChildAttribute(IndexableField child, IndexableField adult) { - String childId = child.stringValue(); - String adultId = adult.stringValue(); - - String childPath = childId.substring(0, childId.lastIndexOf('.')); - String adultPath = adultId.substring(0, adultId.lastIndexOf('.')); - adultPath = adultPath.replaceFirst(":", "."); - String[] childSegments = child.stringValue().split("\\."); - - String childType = childSegments[childSegments.length - 1]; - - return (childPath.startsWith(adultPath + ":") && childType.equals("attribute")); - } - - }; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/vxquery/blob/eb76640f/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/index/IndexConstructorScalarEvaluatorFactory.java 
---------------------------------------------------------------------- diff --git a/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/index/IndexConstructorScalarEvaluatorFactory.java b/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/index/IndexConstructorScalarEvaluatorFactory.java deleted file mode 100644 index cdae2cf..0000000 --- a/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/index/IndexConstructorScalarEvaluatorFactory.java +++ /dev/null @@ -1,79 +0,0 @@ -/* -* Licensed to the Apache Software Foundation (ASF) under one or more -* contributor license agreements. See the NOTICE file distributed with -* this work for additional information regarding copyright ownership. -* The ASF licenses this file to You under the Apache License, Version 2.0 -* (the "License"); you may not use this file except in compliance with -* the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. 
-*/ -package org.apache.vxquery.runtime.functions.index; - -import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; -import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator; -import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory; -import org.apache.hyracks.api.context.IHyracksTaskContext; -import org.apache.hyracks.data.std.api.IPointable; -import org.apache.hyracks.data.std.primitive.UTF8StringPointable; -import org.apache.hyracks.data.std.util.ArrayBackedValueStorage; -import org.apache.hyracks.dataflow.common.comm.util.ByteBufferInputStream; -import org.apache.vxquery.datamodel.accessors.TaggedValuePointable; -import org.apache.vxquery.datamodel.builders.sequence.SequenceBuilder; -import org.apache.vxquery.datamodel.values.XDMConstants; -import org.apache.vxquery.exceptions.ErrorCode; -import org.apache.vxquery.exceptions.SystemException; -import org.apache.vxquery.runtime.functions.base.AbstractTaggedValueArgumentScalarEvaluator; -import org.apache.vxquery.runtime.functions.base.AbstractTaggedValueArgumentScalarEvaluatorFactory; -import org.apache.vxquery.xmlparser.ITreeNodeIdProvider; -import org.apache.vxquery.xmlparser.TreeNodeIdProvider; - -import javax.xml.bind.JAXBException; -import java.io.DataInputStream; - -public class IndexConstructorScalarEvaluatorFactory extends AbstractTaggedValueArgumentScalarEvaluatorFactory { - //Creates one Lucene doc per file - - private static final long serialVersionUID = 1L; - - public IndexConstructorScalarEvaluatorFactory(IScalarEvaluatorFactory[] args) { - super(args); - } - - @Override - protected IScalarEvaluator createEvaluator(IHyracksTaskContext ctx, IScalarEvaluator[] args) - throws AlgebricksException { - final ArrayBackedValueStorage abvs = new ArrayBackedValueStorage(); - final UTF8StringPointable stringp = (UTF8StringPointable) UTF8StringPointable.FACTORY.createPointable(); - final TaggedValuePointable nodep = (TaggedValuePointable) 
TaggedValuePointable.FACTORY.createPointable(); - final ByteBufferInputStream bbis = new ByteBufferInputStream(); - final DataInputStream di = new DataInputStream(bbis); - final SequenceBuilder sb = new SequenceBuilder(); - final ArrayBackedValueStorage abvsFileNode = new ArrayBackedValueStorage(); - final int partition = ctx.getTaskAttemptId().getTaskId().getPartition(); - final String nodeId = ctx.getJobletContext().getApplicationContext().getNodeId(); - final ITreeNodeIdProvider nodeIdProvider = new TreeNodeIdProvider((short) partition); - - return new AbstractTaggedValueArgumentScalarEvaluator(args) { - - @Override - protected void evaluate(TaggedValuePointable[] args, IPointable result) throws SystemException { - try { - IndexConstructorUtil - .evaluate(args, result, stringp, bbis, di, sb, abvs, nodeIdProvider, abvsFileNode, - nodep, false, nodeId); - XDMConstants.setTrue(result); - } catch (JAXBException e) { - throw new SystemException(ErrorCode.SYSE0001, e); - } - } - - }; - } -}
