Added two-step aggregation support for avg (new avg-local and avg-global operators)
Project: http://git-wip-us.apache.org/repos/asf/incubator-vxquery/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-vxquery/commit/99a4555a Tree: http://git-wip-us.apache.org/repos/asf/incubator-vxquery/tree/99a4555a Diff: http://git-wip-us.apache.org/repos/asf/incubator-vxquery/diff/99a4555a Branch: refs/heads/prestonc/hash_join Commit: 99a4555a505c21b767aa0476ef448f58ecc6581e Parents: 32f6b97 Author: Preston Carman <[email protected]> Authored: Mon Mar 17 18:10:42 2014 -0700 Committer: Preston Carman <[email protected]> Committed: Tue Apr 1 20:56:25 2014 -0700 ---------------------------------------------------------------------- .../rules/IntroduceTwoStepAggregateRule.java | 58 +++++---- .../vxquery/functions/builtin-operators.xml | 14 +++ .../AvgGlobalAggregateEvaluatorFactory.java | 117 +++++++++++++++++++ .../AvgLocalAggregateEvaluatorFactory.java | 104 +++++++++++++++++ 4 files changed, 263 insertions(+), 30 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-vxquery/blob/99a4555a/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/IntroduceTwoStepAggregateRule.java ---------------------------------------------------------------------- diff --git a/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/IntroduceTwoStepAggregateRule.java b/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/IntroduceTwoStepAggregateRule.java index 29cf34f..d6f302c 100644 --- a/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/IntroduceTwoStepAggregateRule.java +++ b/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/IntroduceTwoStepAggregateRule.java @@ -16,10 +16,15 @@ */ package org.apache.vxquery.compiler.rewriter.rules; +import java.util.HashMap; +import java.util.Map; + import org.apache.commons.lang3.mutable.Mutable; import org.apache.vxquery.functions.BuiltinFunctions; +import 
org.apache.vxquery.functions.BuiltinOperators; import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException; +import edu.uci.ics.hyracks.algebricks.common.utils.Pair; import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression; import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator; import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext; @@ -27,6 +32,8 @@ import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalExpressionTag; import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalOperatorTag; import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression; import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.AggregateFunctionCallExpression; +import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier; +import edu.uci.ics.hyracks.algebricks.core.algebra.functions.IFunctionInfo; import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator; import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator; import edu.uci.ics.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule; @@ -50,6 +57,9 @@ import edu.uci.ics.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule; * if (af1 == count) aggregate operating settings: * Step 1: count * Step 2: sum + * if (af1 == avg) aggregate operating settings: + * Step 1: avg-local + * Step 2: avg-global * if (af1 in (max, min, sum)) aggregate operating settings: * Step 1: af1 * Step 2: af1 @@ -58,6 +68,21 @@ import edu.uci.ics.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule; * @author prestonc */ public class IntroduceTwoStepAggregateRule implements IAlgebraicRewriteRule { + final Map<FunctionIdentifier, Pair<IFunctionInfo, IFunctionInfo>> AGGREGATE_MAP = new HashMap<FunctionIdentifier, Pair<IFunctionInfo, IFunctionInfo>>(); + + public IntroduceTwoStepAggregateRule() { + 
AGGREGATE_MAP.put(BuiltinFunctions.FN_AVG_1.getFunctionIdentifier(), new Pair<IFunctionInfo, IFunctionInfo>( + BuiltinOperators.AVG_LOCAL, BuiltinOperators.AVG_GLOBAL)); + AGGREGATE_MAP.put(BuiltinFunctions.FN_COUNT_1.getFunctionIdentifier(), new Pair<IFunctionInfo, IFunctionInfo>( + BuiltinFunctions.FN_COUNT_1, BuiltinFunctions.FN_SUM_1)); + AGGREGATE_MAP.put(BuiltinFunctions.FN_MAX_1.getFunctionIdentifier(), new Pair<IFunctionInfo, IFunctionInfo>( + BuiltinFunctions.FN_MAX_1, BuiltinFunctions.FN_MAX_1)); + AGGREGATE_MAP.put(BuiltinFunctions.FN_MIN_1.getFunctionIdentifier(), new Pair<IFunctionInfo, IFunctionInfo>( + BuiltinFunctions.FN_MIN_1, BuiltinFunctions.FN_MIN_1)); + AGGREGATE_MAP.put(BuiltinFunctions.FN_SUM_1.getFunctionIdentifier(), new Pair<IFunctionInfo, IFunctionInfo>( + BuiltinFunctions.FN_SUM_1, BuiltinFunctions.FN_SUM_1)); + } + @Override public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context) throws AlgebricksException { // Check if aggregate function. 
@@ -74,41 +99,14 @@ public class IntroduceTwoStepAggregateRule implements IAlgebraicRewriteRule { } AbstractFunctionCallExpression functionCall = (AbstractFunctionCallExpression) logicalExpression; - if (functionCall.getFunctionIdentifier().equals(BuiltinFunctions.FN_COUNT_1.getFunctionIdentifier())) { - AggregateFunctionCallExpression aggregateFunctionCall = (AggregateFunctionCallExpression) functionCall; - if (aggregateFunctionCall.isTwoStep()) { - return false; - } - aggregateFunctionCall.setTwoStep(true); - aggregateFunctionCall.setStepOneAggregate(BuiltinFunctions.FN_COUNT_1); - aggregateFunctionCall.setStepTwoAggregate(BuiltinFunctions.FN_SUM_1); - return true; - } else if (functionCall.getFunctionIdentifier().equals(BuiltinFunctions.FN_MAX_1.getFunctionIdentifier())) { - AggregateFunctionCallExpression aggregateFunctionCall = (AggregateFunctionCallExpression) functionCall; - if (aggregateFunctionCall.isTwoStep()) { - return false; - } - aggregateFunctionCall.setTwoStep(true); - aggregateFunctionCall.setStepOneAggregate(BuiltinFunctions.FN_MAX_1); - aggregateFunctionCall.setStepTwoAggregate(BuiltinFunctions.FN_MAX_1); - return true; - } else if (functionCall.getFunctionIdentifier().equals(BuiltinFunctions.FN_MIN_1.getFunctionIdentifier())) { - AggregateFunctionCallExpression aggregateFunctionCall = (AggregateFunctionCallExpression) functionCall; - if (aggregateFunctionCall.isTwoStep()) { - return false; - } - aggregateFunctionCall.setTwoStep(true); - aggregateFunctionCall.setStepOneAggregate(BuiltinFunctions.FN_MIN_1); - aggregateFunctionCall.setStepTwoAggregate(BuiltinFunctions.FN_MIN_1); - return true; - } else if (functionCall.getFunctionIdentifier().equals(BuiltinFunctions.FN_SUM_1.getFunctionIdentifier())) { + if (AGGREGATE_MAP.containsKey(functionCall.getFunctionIdentifier())) { AggregateFunctionCallExpression aggregateFunctionCall = (AggregateFunctionCallExpression) functionCall; if (aggregateFunctionCall.isTwoStep()) { return false; } 
aggregateFunctionCall.setTwoStep(true); - aggregateFunctionCall.setStepOneAggregate(BuiltinFunctions.FN_SUM_1); - aggregateFunctionCall.setStepTwoAggregate(BuiltinFunctions.FN_SUM_1); + aggregateFunctionCall.setStepOneAggregate(AGGREGATE_MAP.get(functionCall.getFunctionIdentifier()).first); + aggregateFunctionCall.setStepTwoAggregate(AGGREGATE_MAP.get(functionCall.getFunctionIdentifier()).second); return true; } return false; http://git-wip-us.apache.org/repos/asf/incubator-vxquery/blob/99a4555a/vxquery-core/src/main/java/org/apache/vxquery/functions/builtin-operators.xml ---------------------------------------------------------------------- diff --git a/vxquery-core/src/main/java/org/apache/vxquery/functions/builtin-operators.xml b/vxquery-core/src/main/java/org/apache/vxquery/functions/builtin-operators.xml index 6a6c28d..ecf7542 100644 --- a/vxquery-core/src/main/java/org/apache/vxquery/functions/builtin-operators.xml +++ b/vxquery-core/src/main/java/org/apache/vxquery/functions/builtin-operators.xml @@ -515,6 +515,20 @@ <return type="xs:boolean"/> </operator> + <!-- opext:avg-local($arg as xs:anyAtomicType*) as xs:anyAtomicType? --> + <operator name="opext:avg-local"> + <param name="arg" type="xs:anyAtomicType*"/> + <return type="xs:anyAtomicType?"/> + <runtime type="aggregate" class="org.apache.vxquery.runtime.functions.aggregate.AvgLocalAggregateEvaluatorFactory"/> + </operator> + + <!-- opext:avg-global($arg as xs:anyAtomicType*) as xs:anyAtomicType?
--> + <operator name="opext:avg-global"> + <param name="arg" type="xs:anyAtomicType*"/> + <return type="xs:anyAtomicType?"/> + <runtime type="aggregate" class="org.apache.vxquery.runtime.functions.aggregate.AvgGlobalAggregateEvaluatorFactory"/> + </operator> + <!-- opext:ordered($arg as item()*) as item()* --> <operator name="opext:ordered"> <param name="arg" type="item()*"/> http://git-wip-us.apache.org/repos/asf/incubator-vxquery/blob/99a4555a/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/aggregate/AvgGlobalAggregateEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/aggregate/AvgGlobalAggregateEvaluatorFactory.java b/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/aggregate/AvgGlobalAggregateEvaluatorFactory.java new file mode 100644 index 0000000..61ae69e --- /dev/null +++ b/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/aggregate/AvgGlobalAggregateEvaluatorFactory.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.vxquery.runtime.functions.aggregate; + +import java.io.DataOutput; + +import org.apache.vxquery.datamodel.accessors.SequencePointable; +import org.apache.vxquery.datamodel.accessors.TaggedValuePointable; +import org.apache.vxquery.datamodel.values.ValueTag; +import org.apache.vxquery.datamodel.values.XDMConstants; +import org.apache.vxquery.exceptions.ErrorCode; +import org.apache.vxquery.exceptions.SystemException; +import org.apache.vxquery.runtime.functions.arithmetic.AddOperation; +import org.apache.vxquery.runtime.functions.arithmetic.DivideOperation; +import org.apache.vxquery.runtime.functions.base.AbstractTaggedValueArgumentAggregateEvaluator; +import org.apache.vxquery.runtime.functions.base.AbstractTaggedValueArgumentAggregateEvaluatorFactory; +import org.apache.vxquery.runtime.functions.util.FunctionHelper; + +import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException; +import edu.uci.ics.hyracks.algebricks.runtime.base.IAggregateEvaluator; +import edu.uci.ics.hyracks.algebricks.runtime.base.IScalarEvaluator; +import edu.uci.ics.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory; +import edu.uci.ics.hyracks.data.std.api.IPointable; +import edu.uci.ics.hyracks.data.std.primitive.LongPointable; +import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage; + +public class AvgGlobalAggregateEvaluatorFactory extends AbstractTaggedValueArgumentAggregateEvaluatorFactory { + private static final long serialVersionUID = 1L; + + public AvgGlobalAggregateEvaluatorFactory(IScalarEvaluatorFactory[] args) { + super(args); + } + + @Override + protected IAggregateEvaluator createEvaluator(IScalarEvaluator[] args) throws AlgebricksException { + final ArrayBackedValueStorage abvsCount = new ArrayBackedValueStorage(); + final DataOutput dOutCount = abvsCount.getDataOutput(); + final ArrayBackedValueStorage abvsSum = new ArrayBackedValueStorage(); + final DataOutput dOutSum = abvsSum.getDataOutput(); + final AddOperation 
aOp = new AddOperation(); + final DivideOperation aOpDivide = new DivideOperation(); + final LongPointable longp = (LongPointable) LongPointable.FACTORY.createPointable(); + final SequencePointable seq = (SequencePointable) SequencePointable.FACTORY.createPointable(); + final TaggedValuePointable tvpArg = (TaggedValuePointable) TaggedValuePointable.FACTORY.createPointable(); + + return new AbstractTaggedValueArgumentAggregateEvaluator(args) { + TaggedValuePointable tvpSum = (TaggedValuePointable) TaggedValuePointable.FACTORY.createPointable(); + TaggedValuePointable tvpCount = (TaggedValuePointable) TaggedValuePointable.FACTORY.createPointable(); + + @Override + public void init() throws AlgebricksException { + try { + abvsCount.reset(); + dOutCount.write(ValueTag.XS_INTEGER_TAG); + dOutCount.writeLong(0); + tvpCount.set(abvsCount); + abvsSum.reset(); + dOutSum.write(ValueTag.XS_INTEGER_TAG); + dOutSum.writeLong(0); + tvpSum.set(abvsSum); + } catch (Exception e) { + throw new AlgebricksException(e); + } + } + + @Override + public void finish(IPointable result) throws AlgebricksException { + tvpCount.getValue(longp); + if (longp.getLong() == 0) { + XDMConstants.setEmptySequence(result); + } else { + // Set count as a TaggedValuePointable. + try { + FunctionHelper.arithmeticOperation(aOpDivide, dCtx, tvpSum, tvpCount, tvpSum); + result.set(tvpSum); + } catch (Exception e) { + throw new AlgebricksException(e); + } + } + } + + @Override + protected void step(TaggedValuePointable[] args) throws SystemException { + TaggedValuePointable tvp = args[0]; + if (tvp.getTag() == ValueTag.SEQUENCE_TAG) { + tvp.getValue(seq); + int seqLen = seq.getEntryCount(); + if (seqLen == 0) { + // No results from nodes. 
+ return; + } else if (seqLen == 2) { + seq.getEntry(0, tvpArg); + FunctionHelper.arithmeticOperation(aOp, dCtx, tvpArg, tvpCount, tvpCount); + seq.getEntry(1, tvpArg); + FunctionHelper.arithmeticOperation(aOp, dCtx, tvpArg, tvpSum, tvpSum); + } else { + throw new SystemException(ErrorCode.SYSE0001); + } + } + } + }; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-vxquery/blob/99a4555a/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/aggregate/AvgLocalAggregateEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/aggregate/AvgLocalAggregateEvaluatorFactory.java b/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/aggregate/AvgLocalAggregateEvaluatorFactory.java new file mode 100644 index 0000000..91657c6 --- /dev/null +++ b/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/aggregate/AvgLocalAggregateEvaluatorFactory.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.vxquery.runtime.functions.aggregate; + +import java.io.DataOutput; +import java.io.IOException; + +import org.apache.vxquery.datamodel.accessors.TaggedValuePointable; +import org.apache.vxquery.datamodel.builders.sequence.SequenceBuilder; +import org.apache.vxquery.datamodel.values.ValueTag; +import org.apache.vxquery.datamodel.values.XDMConstants; +import org.apache.vxquery.exceptions.ErrorCode; +import org.apache.vxquery.exceptions.SystemException; +import org.apache.vxquery.runtime.functions.arithmetic.AddOperation; +import org.apache.vxquery.runtime.functions.arithmetic.DivideOperation; +import org.apache.vxquery.runtime.functions.base.AbstractTaggedValueArgumentAggregateEvaluator; +import org.apache.vxquery.runtime.functions.base.AbstractTaggedValueArgumentAggregateEvaluatorFactory; +import org.apache.vxquery.runtime.functions.util.FunctionHelper; + +import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException; +import edu.uci.ics.hyracks.algebricks.runtime.base.IAggregateEvaluator; +import edu.uci.ics.hyracks.algebricks.runtime.base.IScalarEvaluator; +import edu.uci.ics.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory; +import edu.uci.ics.hyracks.data.std.api.IPointable; +import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage; + +public class AvgLocalAggregateEvaluatorFactory extends AbstractTaggedValueArgumentAggregateEvaluatorFactory { + private static final long serialVersionUID = 1L; + + public AvgLocalAggregateEvaluatorFactory(IScalarEvaluatorFactory[] args) { + super(args); + } + + @Override + protected IAggregateEvaluator createEvaluator(IScalarEvaluator[] args) throws AlgebricksException { + final TaggedValuePointable tvpCount = (TaggedValuePointable) TaggedValuePointable.FACTORY.createPointable(); + final ArrayBackedValueStorage abvs = new ArrayBackedValueStorage(); + final ArrayBackedValueStorage abvsSeq = new ArrayBackedValueStorage(); + final SequenceBuilder sb = new SequenceBuilder(); + 
final DataOutput dOut = abvs.getDataOutput(); + final AddOperation aOp = new AddOperation(); + + return new AbstractTaggedValueArgumentAggregateEvaluator(args) { + long count; + TaggedValuePointable tvpSum = (TaggedValuePointable) TaggedValuePointable.FACTORY.createPointable(); + + @Override + public void init() throws AlgebricksException { + count = 0; + } + + @Override + public void finish(IPointable result) throws AlgebricksException { + if (count == 0) { + XDMConstants.setEmptySequence(result); + } else { + // Set count as a TaggedValuePointable. + try { + abvs.reset(); + dOut.write(ValueTag.XS_INTEGER_TAG); + dOut.writeLong(count); + tvpCount.set(abvs); + + // Save intermediate result. + abvsSeq.reset(); + sb.reset(abvsSeq); + sb.addItem(tvpCount); + sb.addItem(tvpSum); + sb.finish(); + result.set(abvsSeq); + } catch (Exception e) { + throw new AlgebricksException(e); + } + } + } + + @Override + protected void step(TaggedValuePointable[] args) throws SystemException { + TaggedValuePointable tvp = args[0]; + if (count == 0) { + // Init. + tvpSum.set(tvp); + } else { + FunctionHelper.arithmeticOperation(aOp, dCtx, tvp, tvpSum, tvpSum); + } + count++; + } + }; + } +} \ No newline at end of file
