http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ab4bed0c/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableSqlStddevAggregateFunction.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableSqlStddevAggregateFunction.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableSqlStddevAggregateFunction.java new file mode 100644 index 0000000..005eaad --- /dev/null +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableSqlStddevAggregateFunction.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
*/
package org.apache.asterix.runtime.aggregates.serializable.std;

import org.apache.asterix.om.functions.BuiltinFunctions;
import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.exceptions.SourceLocation;
import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;

import java.io.DataOutput;

/**
 * Serializable-state standard deviation aggregate (SQL flavour).
 *
 * <p>All of the real work is delegated to the helpers inherited from
 * {@link AbstractSerializableSingleVariableStatisticsAggregateFunction}; this class only wires
 * the aggregate life-cycle callbacks to them and leaves {@link #processNull(byte[], int)} empty.
 */
public class SerializableSqlStddevAggregateFunction
        extends AbstractSerializableSingleVariableStatisticsAggregateFunction {

    /**
     * @param args      evaluator factories for the aggregate's arguments, forwarded to the base class
     * @param context   task context, forwarded to the base class
     * @param sourceLoc source location, forwarded to the base class
     * @throws HyracksDataException if the base-class constructor fails
     */
    public SerializableSqlStddevAggregateFunction(IScalarEvaluatorFactory[] args, IHyracksTaskContext context,
            SourceLocation sourceLoc) throws HyracksDataException {
        super(args, context, sourceLoc);
    }

    /** Feeds one input tuple into the serialized aggregate state via {@code processDataValues}. */
    @Override
    public void step(IFrameTupleReference tuple, byte[] state, int start, int len) throws HyracksDataException {
        processDataValues(tuple, state, start, len);
    }

    /** Writes the final standard deviation computed from {@code state} to {@code result}. */
    @Override
    public void finish(byte[] state, int start, int len, DataOutput result) throws HyracksDataException {
        finishStddevFinalResults(state, start, len, result);
    }

    /** The partial result of this aggregate is produced exactly like its final result. */
    @Override
    public void finishPartial(byte[] state, int start, int len, DataOutput result) throws HyracksDataException {
        finish(state, start, len, result);
    }

    @Override
    protected void processNull(byte[] state, int start) {
        // Deliberately a no-op: this variant leaves the aggregate state untouched here.
        // NOTE(review): presumably this is how SQL semantics "ignore" null inputs -- confirm
        // against the base class, which decides when this hook is invoked.
    }

    /** @return {@link BuiltinFunctions#STDDEV} */
    @Override
    protected FunctionIdentifier getFunctionIdentifier() {
        return BuiltinFunctions.STDDEV;
    }

}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ab4bed0c/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableStddevAggregateDescriptor.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableStddevAggregateDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableStddevAggregateDescriptor.java new file mode 100644 index 0000000..f20abc7 --- /dev/null +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableStddevAggregateDescriptor.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
*/
package org.apache.asterix.runtime.aggregates.serializable.std;

import org.apache.asterix.om.functions.BuiltinFunctions;
import org.apache.asterix.om.functions.IFunctionDescriptor;
import org.apache.asterix.om.functions.IFunctionDescriptorFactory;
import org.apache.asterix.runtime.aggregates.base.AbstractSerializableAggregateFunctionDynamicDescriptor;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
import org.apache.hyracks.algebricks.runtime.base.ISerializedAggregateEvaluator;
import org.apache.hyracks.algebricks.runtime.base.ISerializedAggregateEvaluatorFactory;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;

/**
 * Function descriptor that binds {@link BuiltinFunctions#SERIAL_STDDEV} to
 * {@link SerializableStddevAggregateFunction}.
 */
public class SerializableStddevAggregateDescriptor extends AbstractSerializableAggregateFunctionDynamicDescriptor {

    private static final long serialVersionUID = 1L;

    /** Factory handing out a fresh descriptor instance on every call. */
    public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
        @Override
        public IFunctionDescriptor createFunctionDescriptor() {
            return new SerializableStddevAggregateDescriptor();
        }
    };

    /** @return {@link BuiltinFunctions#SERIAL_STDDEV} */
    @Override
    public FunctionIdentifier getIdentifier() {
        return BuiltinFunctions.SERIAL_STDDEV;
    }

    /**
     * Builds the factory that creates the runtime evaluator for this aggregate.
     *
     * @param args evaluator factories for the aggregate's arguments; captured by the returned factory
     * @return a factory whose evaluators are {@link SerializableStddevAggregateFunction} instances
     * @throws AlgebricksException declared by the overridden method; not thrown by this implementation
     */
    @Override
    public ISerializedAggregateEvaluatorFactory createSerializableAggregateEvaluatorFactory(
            final IScalarEvaluatorFactory[] args) throws AlgebricksException {
        return new ISerializedAggregateEvaluatorFactory() {
            private static final long serialVersionUID = 1L;

            @Override
            public ISerializedAggregateEvaluator createAggregateEvaluator(IHyracksTaskContext ctx)
                    throws HyracksDataException {
                // sourceLoc is not declared in this class; it comes from the enclosing
                // descriptor's superclass hierarchy.
                return new SerializableStddevAggregateFunction(args, ctx, sourceLoc);
            }
        };
    }

}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ab4bed0c/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableStddevAggregateFunction.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableStddevAggregateFunction.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableStddevAggregateFunction.java new file mode 100644 index 0000000..f30848d --- /dev/null +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableStddevAggregateFunction.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
*/
package org.apache.asterix.runtime.aggregates.serializable.std;

import org.apache.asterix.om.functions.BuiltinFunctions;
import org.apache.asterix.om.types.ATypeTag;
import org.apache.asterix.om.types.EnumDeserializer;
import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.exceptions.SourceLocation;
import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;

import java.io.DataOutput;

/**
 * Serializable-state standard deviation aggregate in which a null is "sticky": once
 * {@link #processNull(byte[], int)} has stamped the NULL type tag into the state,
 * {@link #skipStep(byte[], int)} reports {@code true} for that state.
 */
public class SerializableStddevAggregateFunction extends AbstractSerializableSingleVariableStatisticsAggregateFunction {

    /**
     * @param args      evaluator factories for the aggregate's arguments, forwarded to the base class
     * @param context   task context, forwarded to the base class
     * @param sourceLoc source location, forwarded to the base class
     * @throws HyracksDataException if the base-class constructor fails
     */
    public SerializableStddevAggregateFunction(IScalarEvaluatorFactory[] args, IHyracksTaskContext context,
            SourceLocation sourceLoc) throws HyracksDataException {
        super(args, context, sourceLoc);
    }

    /** Feeds one input tuple into the serialized aggregate state via {@code processDataValues}. */
    @Override
    public void step(IFrameTupleReference tuple, byte[] state, int start, int len) throws HyracksDataException {
        processDataValues(tuple, state, start, len);
    }

    /** Writes the final standard deviation computed from {@code state} to {@code result}. */
    @Override
    public void finish(byte[] state, int start, int len, DataOutput result) throws HyracksDataException {
        finishStddevFinalResults(state, start, len, result);
    }

    /** The partial result of this aggregate is produced exactly like its final result. */
    @Override
    public void finishPartial(byte[] state, int start, int len, DataOutput result) throws HyracksDataException {
        finish(state, start, len, result);
    }

    /** Marks the state as NULL by overwriting its aggregate-type byte with the serialized NULL tag. */
    @Override
    protected void processNull(byte[] state, int start) {
        state[start + AGG_TYPE_OFFSET] = ATypeTag.SERIALIZED_NULL_TYPE_TAG;
    }

    /** @return {@code true} when the aggregate-type byte of the state decodes to {@link ATypeTag#NULL} */
    @Override
    protected boolean skipStep(byte[] state, int start) {
        return EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(state[start + AGG_TYPE_OFFSET]) == ATypeTag.NULL;
    }

    /** @return {@link BuiltinFunctions#STDDEV} */
    @Override
    protected FunctionIdentifier getFunctionIdentifier() {
        return BuiltinFunctions.STDDEV;
    }

}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ab4bed0c/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/AbstractAvgAggregateFunction.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/AbstractAvgAggregateFunction.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/AbstractAvgAggregateFunction.java index 1dde27b..164fa6f 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/AbstractAvgAggregateFunction.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/AbstractAvgAggregateFunction.java @@ -135,7 +135,11 @@ public abstract class AbstractAvgAggregateFunction extends AbstractAggregateFunc } else if (aggType == ATypeTag.SYSTEM_NULL) { aggType = typeTag; } else if (typeTag != ATypeTag.SYSTEM_NULL && !ATypeHierarchy.isCompatible(typeTag, aggType)) { - throw new IncompatibleTypeException(sourceLoc, BuiltinFunctions.AVG, data[offset], aggType.serialize()); + if (typeTag.ordinal() > aggType.ordinal()) { + throw new IncompatibleTypeException(sourceLoc, BuiltinFunctions.AVG, data[offset], aggType.serialize()); + } else { + throw new IncompatibleTypeException(sourceLoc, BuiltinFunctions.AVG, aggType.serialize(), data[offset]); + } } else if (ATypeHierarchy.canPromote(aggType, typeTag)) { aggType = typeTag; } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ab4bed0c/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/AbstractMinMaxAggregateFunction.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/AbstractMinMaxAggregateFunction.java 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/AbstractMinMaxAggregateFunction.java index 1aa609f..59e0ccc 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/AbstractMinMaxAggregateFunction.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/AbstractMinMaxAggregateFunction.java @@ -88,7 +88,11 @@ public abstract class AbstractMinMaxAggregateFunction extends AbstractAggregateF // Initialize min value. outputVal.assign(inputVal); } else if (typeTag != ATypeTag.SYSTEM_NULL && !ATypeHierarchy.isCompatible(typeTag, aggType)) { - throw new IncompatibleTypeException(sourceLoc, "min/max", typeTag.serialize(), aggType.serialize()); + if (typeTag.ordinal() > aggType.ordinal()) { + throw new IncompatibleTypeException(sourceLoc, "min/max", typeTag.serialize(), aggType.serialize()); + } else { + throw new IncompatibleTypeException(sourceLoc, "min/max", aggType.serialize(), typeTag.serialize()); + } } else { // If a system_null is encountered locally, it would be an error; otherwise if it is seen http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ab4bed0c/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/AbstractSingleVarStatisticsAggregateFunction.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/AbstractSingleVarStatisticsAggregateFunction.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/AbstractSingleVarStatisticsAggregateFunction.java new file mode 100644 index 0000000..8141839 --- /dev/null +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/AbstractSingleVarStatisticsAggregateFunction.java @@ -0,0 +1,260 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.asterix.runtime.aggregates.std;

import org.apache.asterix.common.config.GlobalConfig;
import org.apache.asterix.dataflow.data.nontagged.serde.*;
import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider;
import org.apache.asterix.om.base.*;
import org.apache.asterix.om.types.*;
import org.apache.asterix.om.types.hierachy.ATypeHierarchy;
import org.apache.asterix.runtime.aggregates.utils.SingleVarFunctionsUtil;
import org.apache.asterix.runtime.evaluators.common.AccessibleByteArrayEval;
import org.apache.asterix.runtime.evaluators.common.ClosedRecordConstructorEvalFactory.ClosedRecordConstructorEval;
import org.apache.asterix.runtime.exceptions.IncompatibleTypeException;
import org.apache.asterix.runtime.exceptions.UnsupportedItemTypeException;
import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.exceptions.SourceLocation;
import org.apache.hyracks.data.std.api.IPointable;
import org.apache.hyracks.data.std.primitive.VoidPointable;
import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
import org.apache.hyracks.data.std.util.ByteArrayAccessibleOutputStream;
import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;

import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;

/**
 * Shared machinery for single-variable statistics aggregates.
 *
 * <p>Numeric inputs are pushed into a {@link SingleVarFunctionsUtil} accumulator. Partial
 * results travel between aggregation stages as a closed record {@code {m1, m2, count}}
 * (double, double, int64). The final standard deviation is
 * {@code sqrt(m2 / (count - 1))}, or NULL when fewer than two values were seen or the
 * aggregate type was set to NULL.
 *
 * <p>Subclasses decide which helper each life-cycle callback uses and how nulls are treated
 * through {@link #processNull()} and {@link #skipStep()}.
 */
public abstract class AbstractSingleVarStatisticsAggregateFunction extends AbstractAggregateFunction {

    /*
     * Positions of the fields inside the partial-result record. M2 is what gets divided by
     * (count - 1) when the standard deviation is finalized.
     * NOTE(review): M1 is presumably the running mean maintained by SingleVarFunctionsUtil -- confirm there.
     */
    private static final int M1_FIELD_ID = 0;
    private static final int M2_FIELD_ID = 1;
    private static final int COUNT_FIELD_ID = 2;

    private final ARecordType recType;

    private final ArrayBackedValueStorage resultStorage = new ArrayBackedValueStorage();
    private final IPointable inputVal = new VoidPointable();
    private final IScalarEvaluator eval;
    /** Type seen so far; {@code SYSTEM_NULL} until the first value is stepped after {@link #init()}. */
    protected ATypeTag aggType;
    private final SingleVarFunctionsUtil moments = new SingleVarFunctionsUtil();
    private final AMutableDouble aDouble = new AMutableDouble(0);
    private final AMutableInt64 aInt64 = new AMutableInt64(0);

    // Scratch buffers backing the three fields of the partial-result record; the evaluators
    // below expose those buffers to the record constructor.
    private final IPointable resultBytes = new VoidPointable();
    private final ByteArrayAccessibleOutputStream m1Bytes = new ByteArrayAccessibleOutputStream();
    private final DataOutput m1BytesOutput = new DataOutputStream(m1Bytes);
    private final ByteArrayAccessibleOutputStream m2Bytes = new ByteArrayAccessibleOutputStream();
    private final DataOutput m2BytesOutput = new DataOutputStream(m2Bytes);
    private final ByteArrayAccessibleOutputStream countBytes = new ByteArrayAccessibleOutputStream();
    private final DataOutput countBytesOutput = new DataOutputStream(countBytes);
    private final IScalarEvaluator evalM1 = new AccessibleByteArrayEval(m1Bytes);
    private final IScalarEvaluator evalM2 = new AccessibleByteArrayEval(m2Bytes);
    private final IScalarEvaluator evalCount = new AccessibleByteArrayEval(countBytes);
    private final ClosedRecordConstructorEval recordEval;

    @SuppressWarnings("unchecked")
    private final ISerializerDeserializer<ADouble> doubleSerde =
            SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ADOUBLE);
    @SuppressWarnings("unchecked")
    private final ISerializerDeserializer<AInt64> longSerde =
            SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT64);
    @SuppressWarnings("unchecked")
    private final ISerializerDeserializer<ANull> nullSerde =
            SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ANULL);

    /**
     * @param args      evaluator factories; only {@code args[0]} is used, as the input expression
     * @param context   task context used to instantiate the input evaluator
     * @param sourceLoc source location passed to the superclass and used in thrown exceptions
     * @throws HyracksDataException if the input evaluator cannot be created
     */
    public AbstractSingleVarStatisticsAggregateFunction(IScalarEvaluatorFactory[] args, IHyracksTaskContext context,
            SourceLocation sourceLoc) throws HyracksDataException {
        super(sourceLoc);
        eval = args[0].createScalarEvaluator(context);
        recType = new ARecordType(null, new String[] { "m1", "m2", "count" },
                new IAType[] { BuiltinType.ADOUBLE, BuiltinType.ADOUBLE, BuiltinType.AINT64 }, false);
        recordEval = new ClosedRecordConstructorEval(recType, new IScalarEvaluator[] { evalM1, evalM2, evalCount });
    }

    /** Resets the aggregate type to {@code SYSTEM_NULL} and zeroes the accumulator. */
    @Override
    public void init() throws HyracksDataException {
        aggType = ATypeTag.SYSTEM_NULL;
        moments.set(0, 0, 0);
    }

    @Override
    public abstract void step(IFrameTupleReference tuple) throws HyracksDataException;

    @Override
    public abstract void finish(IPointable result) throws HyracksDataException;

    @Override
    public abstract void finishPartial(IPointable result) throws HyracksDataException;

    /** @return the function identifier reported in type-error exceptions */
    protected abstract FunctionIdentifier getFunctionIdentifier();

    /** Hook invoked when a NULL (or, for raw values, MISSING) input is encountered. */
    protected abstract void processNull();

    /**
     * Consumes one raw input value: evaluates the argument on {@code tuple}, reconciles its type
     * with the type seen so far and pushes the numeric value into the accumulator.
     *
     * @param tuple the tuple to evaluate the input expression on
     * @throws HyracksDataException on evaluation failure, on a type incompatible with the values
     *                              seen so far, or on a non-numeric item type
     */
    protected void processDataValues(IFrameTupleReference tuple) throws HyracksDataException {
        if (skipStep()) {
            return;
        }
        eval.evaluate(tuple, inputVal);
        byte[] bytes = inputVal.getByteArray();
        int tagPos = inputVal.getStartOffset();
        byte serializedTag = bytes[tagPos];

        ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(serializedTag);
        if (typeTag == ATypeTag.MISSING || typeTag == ATypeTag.NULL) {
            processNull();
            return;
        }

        if (aggType == ATypeTag.SYSTEM_NULL) {
            // First value after init(): adopt its type.
            aggType = typeTag;
        } else if (typeTag != ATypeTag.SYSTEM_NULL && !ATypeHierarchy.isCompatible(typeTag, aggType)) {
            // The tag with the larger ordinal is always passed first, so the exception arguments
            // do not depend on the order in which the two types were encountered.
            boolean inputFirst = typeTag.ordinal() > aggType.ordinal();
            byte aggTag = aggType.serialize();
            throw new IncompatibleTypeException(sourceLoc, getFunctionIdentifier(),
                    inputFirst ? serializedTag : aggTag, inputFirst ? aggTag : serializedTag);
        } else if (ATypeHierarchy.canPromote(aggType, typeTag)) {
            aggType = typeTag;
        }

        // The payload follows the one-byte type tag.
        int valuePos = tagPos + 1;
        double val;
        switch (typeTag) {
            case TINYINT:
                val = AInt8SerializerDeserializer.getByte(bytes, valuePos);
                break;
            case SMALLINT:
                val = AInt16SerializerDeserializer.getShort(bytes, valuePos);
                break;
            case INTEGER:
                val = AInt32SerializerDeserializer.getInt(bytes, valuePos);
                break;
            case BIGINT:
                val = AInt64SerializerDeserializer.getLong(bytes, valuePos);
                break;
            case FLOAT:
                val = AFloatSerializerDeserializer.getFloat(bytes, valuePos);
                break;
            case DOUBLE:
                val = ADoubleSerializerDeserializer.getDouble(bytes, valuePos);
                break;
            default:
                throw new UnsupportedItemTypeException(sourceLoc, getFunctionIdentifier(), serializedTag);
        }
        moments.push(val);
    }

    /**
     * Emits the partial result: a bare {@code SYSTEM_NULL} tag when no value was seen, a bare
     * {@code NULL} tag when the aggregate type is NULL, otherwise the {@code {m1, m2, count}} record.
     *
     * @param result pointable that is set to the serialized partial result
     * @throws HyracksDataException wrapping any {@link IOException} raised while serializing
     */
    protected void finishPartialResults(IPointable result) throws HyracksDataException {
        resultStorage.reset();
        try {
            // Double check that count 0 is accounted
            if (aggType == ATypeTag.SYSTEM_NULL || aggType == ATypeTag.NULL) {
                boolean emptyInput = aggType == ATypeTag.SYSTEM_NULL;
                if (emptyInput && GlobalConfig.DEBUG) {
                    GlobalConfig.ASTERIX_LOGGER.trace("Single var statistics aggregate ran over empty input.");
                }
                resultStorage.getDataOutput().writeByte(emptyInput ? ATypeTag.SERIALIZED_SYSTEM_NULL_TYPE_TAG
                        : ATypeTag.SERIALIZED_NULL_TYPE_TAG);
                result.set(resultStorage);
                return;
            }
            writeDoubleField(moments.getM1(), m1Bytes, m1BytesOutput);
            writeDoubleField(moments.getM2(), m2Bytes, m2BytesOutput);
            countBytes.reset();
            aInt64.setValue(moments.getCount());
            longSerde.serialize(aInt64, countBytesOutput);
            // The record constructor reads the three scratch buffers through evalM1/evalM2/evalCount.
            recordEval.evaluate(null, resultBytes);
            result.set(resultBytes);
        } catch (IOException e) {
            throw HyracksDataException.create(e);
        }
    }

    /** Clears {@code buffer} and serializes {@code value} as an ADouble through {@code out}. */
    private void writeDoubleField(double value, ByteArrayAccessibleOutputStream buffer, DataOutput out)
            throws HyracksDataException {
        buffer.reset();
        aDouble.setValue(value);
        doubleSerde.serialize(aDouble, out);
    }

    /**
     * Consumes one partial result produced by {@link #finishPartialResults(IPointable)}.
     *
     * @param tuple the tuple to evaluate the input expression on
     * @throws HyracksDataException on evaluation failure or when the input is neither NULL,
     *                              SYSTEM_NULL nor a record
     */
    protected void processPartialResults(IFrameTupleReference tuple) throws HyracksDataException {
        if (skipStep()) {
            return;
        }
        eval.evaluate(tuple, inputVal);
        byte[] serBytes = inputVal.getByteArray();
        int offset = inputVal.getStartOffset();
        ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(serBytes[offset]);
        switch (typeTag) {
            case NULL:
                processNull();
                break;
            case SYSTEM_NULL:
                // Ignore and return.
                break;
            case OBJECT:
                // Expected.
                aggType = ATypeTag.DOUBLE;
                combinePartialRecord(serBytes, offset);
                break;
            default:
                throw new UnsupportedItemTypeException(sourceLoc, "intermediate/global-single-var-statistics",
                        serBytes[offset]);
        }
    }

    /** Reads m1, m2 and count out of the partial-result record and merges them into the accumulator. */
    private void combinePartialRecord(byte[] serBytes, int recordStart) throws HyracksDataException {
        int nullBitmapSize = 0;
        int m1Pos = ARecordSerializerDeserializer.getFieldOffsetById(serBytes, recordStart, M1_FIELD_ID,
                nullBitmapSize, false);
        int m2Pos = ARecordSerializerDeserializer.getFieldOffsetById(serBytes, recordStart, M2_FIELD_ID,
                nullBitmapSize, false);
        int countPos = ARecordSerializerDeserializer.getFieldOffsetById(serBytes, recordStart, COUNT_FIELD_ID,
                nullBitmapSize, false);
        double partialM1 = ADoubleSerializerDeserializer.getDouble(serBytes, m1Pos);
        double partialM2 = ADoubleSerializerDeserializer.getDouble(serBytes, m2Pos);
        long partialCount = AInt64SerializerDeserializer.getLong(serBytes, countPos);
        moments.combine(partialM1, partialM2, partialCount);
    }

    /**
     * Emits the final standard deviation, {@code sqrt(m2 / (count - 1))}, or NULL when
     * {@code count <= 1} or the aggregate type is NULL.
     *
     * @param result pointable that is set to the serialized final result
     * @throws HyracksDataException wrapping any {@link IOException} raised while serializing
     */
    protected void finishStddevFinalResults(IPointable result) throws HyracksDataException {
        resultStorage.reset();
        try {
            boolean undefined = moments.getCount() <= 1 || aggType == ATypeTag.NULL;
            if (undefined) {
                nullSerde.serialize(ANull.NULL, resultStorage.getDataOutput());
            } else {
                aDouble.setValue(Math.sqrt(moments.getM2() / (moments.getCount() - 1)));
                doubleSerde.serialize(aDouble, resultStorage.getDataOutput());
            }
        } catch (IOException e) {
            throw HyracksDataException.create(e);
        }
        result.set(resultStorage);
    }

    /** @return {@code true} if the current step should be ignored; this default never skips */
    protected boolean skipStep() {
        return false;
    }

}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ab4bed0c/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/AbstractSumAggregateFunction.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/AbstractSumAggregateFunction.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/AbstractSumAggregateFunction.java index a8fa552..037e307 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/AbstractSumAggregateFunction.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/AbstractSumAggregateFunction.java @@ -95,8 +95,13 @@ public abstract class AbstractSumAggregateFunction extends AbstractAggregateFunc } else if (aggType == ATypeTag.SYSTEM_NULL) { aggType = typeTag; } else if (typeTag != ATypeTag.SYSTEM_NULL && !ATypeHierarchy.isCompatible(typeTag, aggType)) { - throw new IncompatibleTypeException(sourceLoc, BuiltinFunctions.SUM, typeTag.serialize(), - aggType.serialize()); + if (typeTag.ordinal() > aggType.ordinal()) { + throw new IncompatibleTypeException(sourceLoc, BuiltinFunctions.SUM, typeTag.serialize(), + aggType.serialize()); + } else { + throw new IncompatibleTypeException(sourceLoc, BuiltinFunctions.SUM, aggType.serialize(), + typeTag.serialize()); + } } if (ATypeHierarchy.canPromote(aggType, typeTag)) { 
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ab4bed0c/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/GlobalSqlStddevAggregateDescriptor.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/GlobalSqlStddevAggregateDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/GlobalSqlStddevAggregateDescriptor.java new file mode 100644 index 0000000..19d90b2 --- /dev/null +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/GlobalSqlStddevAggregateDescriptor.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
*/

package org.apache.asterix.runtime.aggregates.std;

import org.apache.asterix.om.functions.BuiltinFunctions;
import org.apache.asterix.om.functions.IFunctionDescriptor;
import org.apache.asterix.om.functions.IFunctionDescriptorFactory;
import org.apache.asterix.runtime.aggregates.base.AbstractAggregateFunctionDynamicDescriptor;
import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
import org.apache.hyracks.algebricks.runtime.base.IAggregateEvaluator;
import org.apache.hyracks.algebricks.runtime.base.IAggregateEvaluatorFactory;
import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;

/**
 * Function descriptor that binds {@link BuiltinFunctions#GLOBAL_SQL_STDDEV} to
 * {@link GlobalSqlStddevAggregateFunction}.
 */
public class GlobalSqlStddevAggregateDescriptor extends AbstractAggregateFunctionDynamicDescriptor {

    private static final long serialVersionUID = 1L;

    /** Factory handing out a fresh descriptor instance on every call. */
    public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
        @Override
        public IFunctionDescriptor createFunctionDescriptor() {
            return new GlobalSqlStddevAggregateDescriptor();
        }
    };

    /** @return {@link BuiltinFunctions#GLOBAL_SQL_STDDEV} */
    @Override
    public FunctionIdentifier getIdentifier() {
        return BuiltinFunctions.GLOBAL_SQL_STDDEV;
    }

    /**
     * Builds the factory that creates the runtime evaluator for this aggregate.
     *
     * @param args evaluator factories for the aggregate's arguments; captured by the returned factory
     * @return a factory whose evaluators are {@link GlobalSqlStddevAggregateFunction} instances
     */
    @Override
    public IAggregateEvaluatorFactory createAggregateEvaluatorFactory(final IScalarEvaluatorFactory[] args) {
        return new IAggregateEvaluatorFactory() {
            private static final long serialVersionUID = 1L;

            @Override
            public IAggregateEvaluator createAggregateEvaluator(final IHyracksTaskContext ctx)
                    throws HyracksDataException {
                // sourceLoc is not declared in this class; it comes from the enclosing
                // descriptor's superclass hierarchy.
                return new GlobalSqlStddevAggregateFunction(args, ctx, sourceLoc);
            }
        };
    }

}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ab4bed0c/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/GlobalSqlStddevAggregateFunction.java ---------------------------------------------------------------------- diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/GlobalSqlStddevAggregateFunction.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/GlobalSqlStddevAggregateFunction.java new file mode 100644 index 0000000..3728201 --- /dev/null +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/GlobalSqlStddevAggregateFunction.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
*/

package org.apache.asterix.runtime.aggregates.std;

import org.apache.asterix.om.functions.BuiltinFunctions;
import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.exceptions.SourceLocation;
import org.apache.hyracks.data.std.api.IPointable;
import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;

/**
 * Global (merging) stage of the SQL-flavoured standard deviation aggregate: each step consumes
 * a partial result rather than a raw value, and {@link #processNull()} does nothing.
 */
public class GlobalSqlStddevAggregateFunction extends AbstractSingleVarStatisticsAggregateFunction {

    /**
     * @param args      evaluator factories for the aggregate's arguments, forwarded to the base class
     * @param context   task context, forwarded to the base class
     * @param sourceLoc source location, forwarded to the base class
     * @throws HyracksDataException if the base-class constructor fails
     */
    public GlobalSqlStddevAggregateFunction(IScalarEvaluatorFactory[] args, IHyracksTaskContext context,
            SourceLocation sourceLoc) throws HyracksDataException {
        super(args, context, sourceLoc);
    }

    /** Merges the partial result carried by {@code tuple} via {@code processPartialResults}. */
    @Override
    public void step(IFrameTupleReference tuple) throws HyracksDataException {
        processPartialResults(tuple);
    }

    /** Writes the final standard deviation to {@code result}. */
    @Override
    public void finish(IPointable result) throws HyracksDataException {
        finishStddevFinalResults(result);
    }

    /** Writes the still-mergeable partial result to {@code result}. */
    @Override
    public void finishPartial(IPointable result) throws HyracksDataException {
        finishPartialResults(result);
    }

    @Override
    protected void processNull() {
        // Deliberately a no-op: a NULL partial result leaves the accumulated state untouched.
    }

    /** @return {@link BuiltinFunctions#STDDEV} */
    @Override
    protected FunctionIdentifier getFunctionIdentifier() {
        return BuiltinFunctions.STDDEV;
    }

}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ab4bed0c/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/GlobalStddevAggregateDescriptor.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/GlobalStddevAggregateDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/GlobalStddevAggregateDescriptor.java new file mode 100644 index 0000000..3ab204c --- 
/dev/null +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/GlobalStddevAggregateDescriptor.java @@ -0,0 +1,62 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.asterix.runtime.aggregates.std;

import org.apache.asterix.om.functions.BuiltinFunctions;
import org.apache.asterix.om.functions.IFunctionDescriptor;
import org.apache.asterix.om.functions.IFunctionDescriptorFactory;
import org.apache.asterix.runtime.aggregates.base.AbstractAggregateFunctionDynamicDescriptor;
import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
import org.apache.hyracks.algebricks.runtime.base.IAggregateEvaluator;
import org.apache.hyracks.algebricks.runtime.base.IAggregateEvaluatorFactory;
import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;

/**
 * Function descriptor that binds {@link BuiltinFunctions#GLOBAL_STDDEV} to
 * {@link GlobalStddevAggregateFunction}.
 */
public class GlobalStddevAggregateDescriptor extends AbstractAggregateFunctionDynamicDescriptor {

    private static final long serialVersionUID = 1L;

    /** Factory handing out a fresh descriptor instance on every call. */
    public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
        @Override
        public IFunctionDescriptor createFunctionDescriptor() {
            return new GlobalStddevAggregateDescriptor();
        }
    };

    /** @return {@link BuiltinFunctions#GLOBAL_STDDEV} */
    @Override
    public FunctionIdentifier getIdentifier() {
        return BuiltinFunctions.GLOBAL_STDDEV;
    }

    /**
     * Builds the factory that creates the runtime evaluator for this aggregate.
     *
     * @param args evaluator factories for the aggregate's arguments; captured by the returned factory
     * @return a factory whose evaluators are {@link GlobalStddevAggregateFunction} instances
     */
    @Override
    public IAggregateEvaluatorFactory createAggregateEvaluatorFactory(final IScalarEvaluatorFactory[] args) {
        return new IAggregateEvaluatorFactory() {
            private static final long serialVersionUID = 1L;

            @Override
            public IAggregateEvaluator createAggregateEvaluator(final IHyracksTaskContext ctx)
                    throws HyracksDataException {
                // sourceLoc is not declared in this class; it comes from the enclosing
                // descriptor's superclass hierarchy.
                return new GlobalStddevAggregateFunction(args, ctx, sourceLoc);
            }
        };
    }

}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ab4bed0c/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/GlobalStddevAggregateFunction.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/GlobalStddevAggregateFunction.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/GlobalStddevAggregateFunction.java new file mode 100644 index 0000000..5ceea4b --- /dev/null +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/GlobalStddevAggregateFunction.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.asterix.runtime.aggregates.std; + +import org.apache.asterix.om.functions.BuiltinFunctions; +import org.apache.asterix.om.types.ATypeTag; +import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier; +import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory; +import org.apache.hyracks.api.context.IHyracksTaskContext; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.exceptions.SourceLocation; +import org.apache.hyracks.data.std.api.IPointable; +import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference; + +public class GlobalStddevAggregateFunction extends AbstractSingleVarStatisticsAggregateFunction { + + public GlobalStddevAggregateFunction(IScalarEvaluatorFactory[] args, IHyracksTaskContext context, + SourceLocation sourceLoc) throws HyracksDataException { + super(args, context, sourceLoc); + } + + @Override + public void step(IFrameTupleReference tuple) throws HyracksDataException { + processPartialResults(tuple); + } + + @Override + public void finish(IPointable result) throws HyracksDataException { + finishStddevFinalResults(result); + } + + @Override + public void finishPartial(IPointable result) throws HyracksDataException { + finishPartialResults(result); + } + + @Override + protected void processNull() { + aggType = ATypeTag.NULL; + } + + @Override + protected boolean skipStep() { + return aggType == ATypeTag.NULL; + } + + @Override + protected FunctionIdentifier getFunctionIdentifier() { + return 
BuiltinFunctions.STDDEV; + } + +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ab4bed0c/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/IntermediateSqlStddevAggregateDescriptor.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/IntermediateSqlStddevAggregateDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/IntermediateSqlStddevAggregateDescriptor.java new file mode 100644 index 0000000..f96a757 --- /dev/null +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/IntermediateSqlStddevAggregateDescriptor.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.asterix.runtime.aggregates.std; + +import org.apache.asterix.om.functions.BuiltinFunctions; +import org.apache.asterix.om.functions.IFunctionDescriptor; +import org.apache.asterix.om.functions.IFunctionDescriptorFactory; +import org.apache.asterix.runtime.aggregates.base.AbstractAggregateFunctionDynamicDescriptor; +import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier; +import org.apache.hyracks.algebricks.runtime.base.IAggregateEvaluator; +import org.apache.hyracks.algebricks.runtime.base.IAggregateEvaluatorFactory; +import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory; +import org.apache.hyracks.api.context.IHyracksTaskContext; +import org.apache.hyracks.api.exceptions.HyracksDataException; + +public class IntermediateSqlStddevAggregateDescriptor extends AbstractAggregateFunctionDynamicDescriptor { + + private static final long serialVersionUID = 1L; + public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() { + @Override + public IFunctionDescriptor createFunctionDescriptor() { + return new IntermediateSqlStddevAggregateDescriptor(); + } + }; + + @Override + public FunctionIdentifier getIdentifier() { + return BuiltinFunctions.INTERMEDIATE_SQL_STDDEV; + } + + @Override + public IAggregateEvaluatorFactory createAggregateEvaluatorFactory(final IScalarEvaluatorFactory[] args) { + return new IAggregateEvaluatorFactory() { + private static final long serialVersionUID = 1L; + + @Override + public IAggregateEvaluator createAggregateEvaluator(final IHyracksTaskContext ctx) + throws HyracksDataException { + return new IntermediateSqlStddevAggregateFunction(args, ctx, sourceLoc); + } + }; + } + +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ab4bed0c/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/IntermediateSqlStddevAggregateFunction.java ---------------------------------------------------------------------- diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/IntermediateSqlStddevAggregateFunction.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/IntermediateSqlStddevAggregateFunction.java new file mode 100644 index 0000000..44a0315 --- /dev/null +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/IntermediateSqlStddevAggregateFunction.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.asterix.runtime.aggregates.std; + +import org.apache.asterix.om.functions.BuiltinFunctions; +import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier; +import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory; +import org.apache.hyracks.api.context.IHyracksTaskContext; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.exceptions.SourceLocation; +import org.apache.hyracks.data.std.api.IPointable; +import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference; + +public class IntermediateSqlStddevAggregateFunction extends AbstractSingleVarStatisticsAggregateFunction { + + public IntermediateSqlStddevAggregateFunction(IScalarEvaluatorFactory[] args, IHyracksTaskContext context, + SourceLocation sourceLoc) throws HyracksDataException { + super(args, context, sourceLoc); + } + + @Override + public void step(IFrameTupleReference tuple) throws HyracksDataException { + processPartialResults(tuple); + } + + @Override + public void finish(IPointable result) throws HyracksDataException { + finishPartialResults(result); + } + + @Override + public void finishPartial(IPointable result) throws HyracksDataException { + finishPartialResults(result); + } + + @Override + protected void processNull() { + } + + @Override + protected FunctionIdentifier getFunctionIdentifier() { + return BuiltinFunctions.STDDEV; + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ab4bed0c/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/IntermediateStddevAggregateDescriptor.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/IntermediateStddevAggregateDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/IntermediateStddevAggregateDescriptor.java new file mode 100644 index 
0000000..503072b --- /dev/null +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/IntermediateStddevAggregateDescriptor.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.asterix.runtime.aggregates.std; + +import org.apache.asterix.om.functions.BuiltinFunctions; +import org.apache.asterix.om.functions.IFunctionDescriptor; +import org.apache.asterix.om.functions.IFunctionDescriptorFactory; +import org.apache.asterix.runtime.aggregates.base.AbstractAggregateFunctionDynamicDescriptor; +import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier; +import org.apache.hyracks.algebricks.runtime.base.IAggregateEvaluator; +import org.apache.hyracks.algebricks.runtime.base.IAggregateEvaluatorFactory; +import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory; +import org.apache.hyracks.api.context.IHyracksTaskContext; +import org.apache.hyracks.api.exceptions.HyracksDataException; + +public class IntermediateStddevAggregateDescriptor extends AbstractAggregateFunctionDynamicDescriptor { + + private static final long serialVersionUID = 1L; + + public static final IFunctionDescriptorFactory FACTORY = new 
IFunctionDescriptorFactory() { + @Override + public IFunctionDescriptor createFunctionDescriptor() { + return new IntermediateStddevAggregateDescriptor(); + } + }; + + @Override + public FunctionIdentifier getIdentifier() { + return BuiltinFunctions.INTERMEDIATE_STDDEV; + } + + @Override + public IAggregateEvaluatorFactory createAggregateEvaluatorFactory(final IScalarEvaluatorFactory[] args) { + return new IAggregateEvaluatorFactory() { + private static final long serialVersionUID = 1L; + + @Override + public IAggregateEvaluator createAggregateEvaluator(final IHyracksTaskContext ctx) + throws HyracksDataException { + return new IntermediateStddevAggregateFunction(args, ctx, sourceLoc); + } + }; + } + +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ab4bed0c/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/IntermediateStddevAggregateFunction.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/IntermediateStddevAggregateFunction.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/IntermediateStddevAggregateFunction.java new file mode 100644 index 0000000..238a523 --- /dev/null +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/IntermediateStddevAggregateFunction.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.asterix.runtime.aggregates.std; + +import org.apache.asterix.om.functions.BuiltinFunctions; +import org.apache.asterix.om.types.ATypeTag; +import org.apache.asterix.runtime.aggregates.serializable.std.AbstractSerializableSingleVariableStatisticsAggregateFunction; +import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier; +import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory; +import org.apache.hyracks.api.context.IHyracksTaskContext; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.exceptions.SourceLocation; +import org.apache.hyracks.data.std.api.IPointable; +import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference; + +public class IntermediateStddevAggregateFunction extends AbstractSingleVarStatisticsAggregateFunction { + + public IntermediateStddevAggregateFunction(IScalarEvaluatorFactory[] args, IHyracksTaskContext context, + SourceLocation sourceLoc) throws HyracksDataException { + super(args, context, sourceLoc); + } + + @Override + public void step(IFrameTupleReference tuple) throws HyracksDataException { + processPartialResults(tuple); + } + + @Override + public void finish(IPointable result) throws HyracksDataException { + finishPartialResults(result); + } + + @Override + public void finishPartial(IPointable result) throws HyracksDataException { + finishPartialResults(result); + } + + @Override + protected void processNull() { + aggType = ATypeTag.NULL; + } + + @Override + protected boolean skipStep() { + 
return aggType == ATypeTag.NULL; + } + + @Override + protected FunctionIdentifier getFunctionIdentifier() { + return BuiltinFunctions.STDDEV; + } + +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ab4bed0c/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/LocalSqlStddevAggregateDescriptor.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/LocalSqlStddevAggregateDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/LocalSqlStddevAggregateDescriptor.java new file mode 100644 index 0000000..c89e71f --- /dev/null +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/LocalSqlStddevAggregateDescriptor.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.asterix.runtime.aggregates.std; + +import org.apache.asterix.om.functions.BuiltinFunctions; +import org.apache.asterix.om.functions.IFunctionDescriptor; +import org.apache.asterix.om.functions.IFunctionDescriptorFactory; +import org.apache.asterix.runtime.aggregates.base.AbstractAggregateFunctionDynamicDescriptor; +import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier; +import org.apache.hyracks.algebricks.runtime.base.IAggregateEvaluator; +import org.apache.hyracks.algebricks.runtime.base.IAggregateEvaluatorFactory; +import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory; +import org.apache.hyracks.api.context.IHyracksTaskContext; +import org.apache.hyracks.api.exceptions.HyracksDataException; + +public class LocalSqlStddevAggregateDescriptor extends AbstractAggregateFunctionDynamicDescriptor { + + private static final long serialVersionUID = 1L; + public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() { + @Override + public IFunctionDescriptor createFunctionDescriptor() { + return new LocalSqlStddevAggregateDescriptor(); + } + }; + + @Override + public FunctionIdentifier getIdentifier() { + return BuiltinFunctions.LOCAL_SQL_STDDEV; + } + + @Override + public IAggregateEvaluatorFactory createAggregateEvaluatorFactory(final IScalarEvaluatorFactory[] args) { + return new IAggregateEvaluatorFactory() { + private static final long serialVersionUID = 1L; + + @Override + public IAggregateEvaluator createAggregateEvaluator(final IHyracksTaskContext ctx) + throws HyracksDataException { + return new LocalSqlStddevAggregateFunction(args, ctx, sourceLoc); + } + }; + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ab4bed0c/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/LocalSqlStddevAggregateFunction.java ---------------------------------------------------------------------- diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/LocalSqlStddevAggregateFunction.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/LocalSqlStddevAggregateFunction.java new file mode 100644 index 0000000..b030b8e --- /dev/null +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/LocalSqlStddevAggregateFunction.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.asterix.runtime.aggregates.std; + +import org.apache.asterix.om.functions.BuiltinFunctions; +import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier; +import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory; +import org.apache.hyracks.api.context.IHyracksTaskContext; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.exceptions.SourceLocation; +import org.apache.hyracks.data.std.api.IPointable; +import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference; + +public class LocalSqlStddevAggregateFunction extends AbstractSingleVarStatisticsAggregateFunction { + + public LocalSqlStddevAggregateFunction(IScalarEvaluatorFactory[] args, IHyracksTaskContext context, + SourceLocation sourceLoc) throws HyracksDataException { + super(args, context, sourceLoc); + } + + @Override + public void step(IFrameTupleReference tuple) throws HyracksDataException { + processDataValues(tuple); + } + + @Override + public void finish(IPointable result) throws HyracksDataException { + finishPartialResults(result); + } + + @Override + public void finishPartial(IPointable result) throws HyracksDataException { + finish(result); + } + + @Override + protected void processNull() { + } + + @Override + protected FunctionIdentifier getFunctionIdentifier() { + return BuiltinFunctions.STDDEV; + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ab4bed0c/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/LocalStddevAggregateDescriptor.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/LocalStddevAggregateDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/LocalStddevAggregateDescriptor.java new file mode 100644 index 0000000..79bea48 --- /dev/null +++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/LocalStddevAggregateDescriptor.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.asterix.runtime.aggregates.std; + +import org.apache.asterix.om.functions.BuiltinFunctions; +import org.apache.asterix.om.functions.IFunctionDescriptor; +import org.apache.asterix.om.functions.IFunctionDescriptorFactory; +import org.apache.asterix.runtime.aggregates.base.AbstractAggregateFunctionDynamicDescriptor; +import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier; +import org.apache.hyracks.algebricks.runtime.base.IAggregateEvaluator; +import org.apache.hyracks.algebricks.runtime.base.IAggregateEvaluatorFactory; +import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory; +import org.apache.hyracks.api.context.IHyracksTaskContext; +import org.apache.hyracks.api.exceptions.HyracksDataException; + +public class LocalStddevAggregateDescriptor extends AbstractAggregateFunctionDynamicDescriptor { + + private static final long serialVersionUID = 1L; + public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() { + @Override + public 
IFunctionDescriptor createFunctionDescriptor() { + return new LocalStddevAggregateDescriptor(); + } + }; + + @Override + public FunctionIdentifier getIdentifier() { + return BuiltinFunctions.LOCAL_STDDEV; + } + + @Override + public IAggregateEvaluatorFactory createAggregateEvaluatorFactory(final IScalarEvaluatorFactory[] args) { + return new IAggregateEvaluatorFactory() { + private static final long serialVersionUID = 1L; + + @Override + public IAggregateEvaluator createAggregateEvaluator(final IHyracksTaskContext ctx) + throws HyracksDataException { + return new LocalStddevAggregateFunction(args, ctx, sourceLoc); + } + }; + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ab4bed0c/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/LocalStddevAggregateFunction.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/LocalStddevAggregateFunction.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/LocalStddevAggregateFunction.java new file mode 100644 index 0000000..7ebdb1f --- /dev/null +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/LocalStddevAggregateFunction.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.asterix.runtime.aggregates.std; + +import org.apache.asterix.om.functions.BuiltinFunctions; +import org.apache.asterix.om.types.ATypeTag; +import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier; +import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory; +import org.apache.hyracks.api.context.IHyracksTaskContext; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.exceptions.SourceLocation; +import org.apache.hyracks.data.std.api.IPointable; +import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference; + +public class LocalStddevAggregateFunction extends AbstractSingleVarStatisticsAggregateFunction { + + public LocalStddevAggregateFunction(IScalarEvaluatorFactory[] args, IHyracksTaskContext context, + SourceLocation sourceLoc) throws HyracksDataException { + super(args, context, sourceLoc); + } + + @Override + public void step(IFrameTupleReference tuple) throws HyracksDataException { + processDataValues(tuple); + } + + @Override + public void finish(IPointable result) throws HyracksDataException { + finishPartialResults(result); + } + + @Override + public void finishPartial(IPointable result) throws HyracksDataException { + finish(result); + } + + @Override + protected void processNull() { + aggType = ATypeTag.NULL; + } + + @Override + protected boolean skipStep() { + return aggType == ATypeTag.NULL; + } + + @Override + protected FunctionIdentifier getFunctionIdentifier() { + return BuiltinFunctions.STDDEV; + } + +} 
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ab4bed0c/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/SqlStddevAggregateDescriptor.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/SqlStddevAggregateDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/SqlStddevAggregateDescriptor.java new file mode 100644 index 0000000..41cc3ee --- /dev/null +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/SqlStddevAggregateDescriptor.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.asterix.runtime.aggregates.std; + +import org.apache.asterix.om.functions.BuiltinFunctions; +import org.apache.asterix.om.functions.IFunctionDescriptor; +import org.apache.asterix.om.functions.IFunctionDescriptorFactory; +import org.apache.asterix.runtime.aggregates.base.AbstractAggregateFunctionDynamicDescriptor; +import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier; +import org.apache.hyracks.algebricks.runtime.base.IAggregateEvaluator; +import org.apache.hyracks.algebricks.runtime.base.IAggregateEvaluatorFactory; +import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory; +import org.apache.hyracks.api.context.IHyracksTaskContext; +import org.apache.hyracks.api.exceptions.HyracksDataException; + +public class SqlStddevAggregateDescriptor extends AbstractAggregateFunctionDynamicDescriptor { + + private static final long serialVersionUID = 1L; + public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() { + @Override + public IFunctionDescriptor createFunctionDescriptor() { + return new SqlStddevAggregateDescriptor(); + } + }; + + @Override + public FunctionIdentifier getIdentifier() { + return BuiltinFunctions.SQL_STDDEV; + } + + @Override + public IAggregateEvaluatorFactory createAggregateEvaluatorFactory(final IScalarEvaluatorFactory[] args) { + return new IAggregateEvaluatorFactory() { + private static final long serialVersionUID = 1L; + + @Override + public IAggregateEvaluator createAggregateEvaluator(final IHyracksTaskContext ctx) + throws HyracksDataException { + return new SqlStddevAggregateFunction(args, ctx, sourceLoc); + } + }; + } + +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ab4bed0c/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/SqlStddevAggregateFunction.java ---------------------------------------------------------------------- diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/SqlStddevAggregateFunction.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/SqlStddevAggregateFunction.java new file mode 100644 index 0000000..962d351 --- /dev/null +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/SqlStddevAggregateFunction.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.asterix.runtime.aggregates.std; + +import org.apache.asterix.om.functions.BuiltinFunctions; +import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier; +import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory; +import org.apache.hyracks.api.context.IHyracksTaskContext; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.exceptions.SourceLocation; +import org.apache.hyracks.data.std.api.IPointable; +import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference; + /** Aggregate evaluator that forwards every phase to helpers not defined in this class: step calls processDataValues, finish calls finishStddevFinalResults, and finishPartial calls finishPartialResults. */ +public class SqlStddevAggregateFunction extends AbstractSingleVarStatisticsAggregateFunction { + + public SqlStddevAggregateFunction(IScalarEvaluatorFactory[] args, IHyracksTaskContext context, + SourceLocation sourceLoc) throws HyracksDataException { + super(args, context, sourceLoc); + } + + @Override + public void step(IFrameTupleReference tuple) throws HyracksDataException { + processDataValues(tuple); + } + + @Override + public void finish(IPointable result) throws HyracksDataException { + finishStddevFinalResults(result); + } + + @Override + public void finishPartial(IPointable result) throws HyracksDataException { + finishPartialResults(result); + } + /** Intentionally empty: this variant changes no state when processNull is invoked. */ + @Override + protected void processNull() { + } + /** NOTE(review): reports BuiltinFunctions.STDDEV although SqlStddevAggregateDescriptor registers under SQL_STDDEV - confirm this is intended. */ + @Override + protected FunctionIdentifier getFunctionIdentifier() { + return BuiltinFunctions.STDDEV; + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ab4bed0c/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/StddevAggregateDescriptor.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/StddevAggregateDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/StddevAggregateDescriptor.java new file mode 100644 index 0000000..0b6bb5b --- /dev/null +++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/StddevAggregateDescriptor.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.asterix.runtime.aggregates.std; + +import org.apache.asterix.om.functions.BuiltinFunctions; +import org.apache.asterix.om.functions.IFunctionDescriptor; +import org.apache.asterix.om.functions.IFunctionDescriptorFactory; +import org.apache.asterix.runtime.aggregates.base.AbstractAggregateFunctionDynamicDescriptor; +import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier; +import org.apache.hyracks.algebricks.runtime.base.IAggregateEvaluator; +import org.apache.hyracks.algebricks.runtime.base.IAggregateEvaluatorFactory; +import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory; +import org.apache.hyracks.api.context.IHyracksTaskContext; +import org.apache.hyracks.api.exceptions.HyracksDataException; + /** Descriptor for the aggregate function identified by BuiltinFunctions.STDDEV; FACTORY creates instances of this descriptor, and the evaluator factory it returns builds a StddevAggregateFunction for each task context it is given. */ +public class StddevAggregateDescriptor extends AbstractAggregateFunctionDynamicDescriptor { + + private static final long serialVersionUID = 1L; + public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() { + @Override + public 
IFunctionDescriptor createFunctionDescriptor() { + return new StddevAggregateDescriptor(); + } + }; + + @Override + public FunctionIdentifier getIdentifier() { + return BuiltinFunctions.STDDEV; + } + /** Returns a factory that captures args and passes them, the task context and sourceLoc to the StddevAggregateFunction constructor. NOTE(review): sourceLoc is not declared in this class; presumably inherited from the superclass - confirm. */ + @Override + public IAggregateEvaluatorFactory createAggregateEvaluatorFactory(final IScalarEvaluatorFactory[] args) { + return new IAggregateEvaluatorFactory() { + private static final long serialVersionUID = 1L; + + @Override + public IAggregateEvaluator createAggregateEvaluator(final IHyracksTaskContext ctx) + throws HyracksDataException { + return new StddevAggregateFunction(args, ctx, sourceLoc); + } + }; + } + +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ab4bed0c/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/StddevAggregateFunction.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/StddevAggregateFunction.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/StddevAggregateFunction.java new file mode 100644 index 0000000..7b5e423 --- /dev/null +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/StddevAggregateFunction.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.asterix.runtime.aggregates.std; + +import org.apache.asterix.om.functions.BuiltinFunctions; +import org.apache.asterix.om.types.ATypeTag; +import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier; +import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory; +import org.apache.hyracks.api.context.IHyracksTaskContext; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.exceptions.SourceLocation; +import org.apache.hyracks.data.std.api.IPointable; +import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference; + /** Aggregate evaluator that forwards to helpers not defined in this class: step calls processDataValues and finish calls finishStddevFinalResults; finishPartial simply calls finish, so partial and final output take the same path. */ +public class StddevAggregateFunction extends AbstractSingleVarStatisticsAggregateFunction { + + public StddevAggregateFunction(IScalarEvaluatorFactory[] args, IHyracksTaskContext context, + SourceLocation sourceLoc) throws HyracksDataException { + super(args, context, sourceLoc); + } + + @Override + public void step(IFrameTupleReference tuple) throws HyracksDataException { + processDataValues(tuple); + } + + @Override + public void finish(IPointable result) throws HyracksDataException { + finishStddevFinalResults(result); + } + + @Override + public void finishPartial(IPointable result) throws HyracksDataException { + finish(result); + } + /** Records the null by setting aggType to ATypeTag.NULL; skipStep below then returns true for as long as aggType stays NULL. */ + @Override + protected void processNull() { + aggType = ATypeTag.NULL; + } + + @Override + protected boolean skipStep() { + return aggType == ATypeTag.NULL; + } + + @Override + protected FunctionIdentifier getFunctionIdentifier() { + return BuiltinFunctions.STDDEV; + } + +} 
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ab4bed0c/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/utils/SingleVarFunctionsUtil.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/utils/SingleVarFunctionsUtil.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/utils/SingleVarFunctionsUtil.java new file mode 100644 index 0000000..3ce6a39 --- /dev/null +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/utils/SingleVarFunctionsUtil.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.runtime.aggregates.utils; + +/** +The central moments of a data sample are useful for calculating statistics (such as stddev and +variance) in a single pass. + +This class is based on a paper written by Philippe Pébay: Formulas for Robust, One-Pass Parallel +Computation of Covariances and Arbitrary-Order Statistical Moments, 2008, Technical Report +SAND2008-6212, Sandia National Laboratories. 
/**
 * Single-pass accumulator for the first two moments of a data sample: the mean ({@code m1}), the
 * second moment ({@code m2}) and the number of samples. Values can be added one at a time with
 * {@link #push(double)}, and the moments of another sample can be folded in with
 * {@link #combine(double, double, long)}.
 *
 * The update formulas follow Philippe P&eacute;bay, "Formulas for Robust, One-Pass Parallel
 * Computation of Covariances and Arbitrary-Order Statistical Moments", 2008, Technical Report
 * SAND2008-6212, Sandia National Laboratories.
 */
public class SingleVarFunctionsUtil {
    private double m1;
    private double m2;
    private long count;

    /** Creates an accumulator for an empty sample (both moments and the count are zero). */
    public SingleVarFunctionsUtil() {
        m1 = 0.0;
        m2 = 0.0;
        count = 0;
    }

    /**
     * Overwrites the accumulated state with the given moments.
     *
     * @param moment1 first moment (mean) of the data sample
     * @param moment2 second moment of the data sample
     * @param cnt number of samples
     */
    public void set(double moment1, double moment2, long cnt) {
        m1 = moment1;
        m2 = moment2;
        count = cnt;
    }

    /**
     * Updates the moments after adding {@code val} to the data sample.
     *
     * @param val value to add to the data sample
     */
    public void push(double val) {
        count++;
        double delta = val - m1;
        double deltaN = delta / count;
        // (count - 1) is the sample size before this value was added.
        double term1 = delta * deltaN * (count - 1);
        m1 += deltaN;
        m2 += term1;
    }

    /**
     * Folds the moments of another data sample into this accumulator.
     *
     * An empty incoming sample ({@code cnt == 0}) leaves the state untouched, and an empty
     * accumulator adopts the incoming moments exactly. Without these guards, combining two empty
     * samples would evaluate {@code 0.0 / 0} and turn the mean into NaN, and the trivial cases
     * would pick up avoidable rounding error.
     *
     * @param moment1 first moment (mean) of the other data sample
     * @param moment2 second moment of the other data sample
     * @param cnt number of samples in the other data sample
     */
    public void combine(double moment1, double moment2, long cnt) {
        if (cnt == 0) {
            // Nothing to merge; also covers the 0/0 case where both sides are empty.
            return;
        }
        if (count == 0) {
            // No data accumulated yet: take over the incoming moments verbatim.
            m1 = moment1;
            m2 = moment2;
            count = cnt;
            return;
        }
        double delta = moment1 - m1;
        long combinedCount = count + cnt;
        m1 = (count * m1 + cnt * moment1) / combinedCount;
        m2 += moment2 + delta * delta * count * cnt / combinedCount;
        count = combinedCount;
    }

    /** @return the accumulated first moment (mean) */
    public double getM1() {
        return m1;
    }

    /** @return the accumulated second moment */
    public double getM2() {
        return m2;
    }

    /** @return the number of samples accumulated so far */
    public long getCount() {
        return count;
    }
}