Ali Alsuliman has uploaded this change for review. ( https://asterix-gerrit.ics.uci.edu/3391
Change subject: [ASTERIXDB-2564][RT] Too many objects created in min() and max() ...................................................................... [ASTERIXDB-2564][RT] Too many objects created in min() and max() - user model changes: no - storage format changes: no - interface changes: no Details: During min() and max() aggregation, the functions keep track of the aggregation type in order to handle heterogeneous lists. It promotes the aggregation type if needed (e.g. encountered double). Don't switch to new aggregation type and create a new comparator when the new input value type is the same as the previously aggregated values. That is because canPromote(agg_type, new_val_type) will always return true for same types. Change-Id: I0bb9f0715985ae555de00bbf3173c80371d8968b --- M asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/AbstractMinMaxAggregateFunction.java 1 file changed, 40 insertions(+), 54 deletions(-) git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/91/3391/1 diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/AbstractMinMaxAggregateFunction.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/AbstractMinMaxAggregateFunction.java index 86ae924..616bb5a 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/AbstractMinMaxAggregateFunction.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/AbstractMinMaxAggregateFunction.java @@ -30,7 +30,6 @@ import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory; import org.apache.hyracks.api.context.IHyracksTaskContext; import org.apache.hyracks.api.dataflow.value.IBinaryComparator; -import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.exceptions.SourceLocation; import org.apache.hyracks.data.std.api.IPointable; @@ -39,18 +38,16 @@ import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference; public abstract class AbstractMinMaxAggregateFunction extends AbstractAggregateFunction { - private IPointable inputVal = new VoidPointable(); - private ArrayBackedValueStorage outputVal = new ArrayBackedValueStorage(); - private ArrayBackedValueStorage tempValForCasting = new ArrayBackedValueStorage(); - - protected ArrayBackedValueStorage resultStorage = new ArrayBackedValueStorage(); - private IScalarEvaluator eval; + protected final ArrayBackedValueStorage resultStorage = new ArrayBackedValueStorage(); + private final IPointable inputVal = new VoidPointable(); + private final ArrayBackedValueStorage outputVal = new ArrayBackedValueStorage(); + private final ArrayBackedValueStorage tempValForCasting = new ArrayBackedValueStorage(); + private final IScalarEvaluator eval; + private final boolean isMin; protected ATypeTag aggType; private IBinaryComparator cmp; - private ITypeConvertComputer tpc; - private final boolean isMin; - public AbstractMinMaxAggregateFunction(IScalarEvaluatorFactory[] args, IHyracksTaskContext context, boolean isMin, + AbstractMinMaxAggregateFunction(IScalarEvaluatorFactory[] args, IHyracksTaskContext context, boolean isMin, SourceLocation sourceLoc) throws HyracksDataException { super(sourceLoc); eval = args[0].createScalarEvaluator(context); @@ -82,9 +79,8 @@ // First value encountered. Set type, comparator, and initial value. aggType = typeTag; // Set comparator. - IBinaryComparatorFactory cmpFactory = - BinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(aggType, isMin); - cmp = cmpFactory.createBinaryComparator(); + cmp = BinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(aggType, isMin) + .createBinaryComparator(); // Initialize min value. outputVal.assign(inputVal); } else if (typeTag != ATypeTag.SYSTEM_NULL && !ATypeHierarchy.isCompatible(typeTag, aggType)) { @@ -94,56 +90,27 @@ throw new IncompatibleTypeException(sourceLoc, "min/max", aggType.serialize(), typeTag.serialize()); } } else { - // If a system_null is encountered locally, it would be an error; otherwise if it is seen // by a global aggregator, it is simple ignored. if (typeTag == ATypeTag.SYSTEM_NULL) { processSystemNull(); return; } - + if (aggType == typeTag) { + compareAndUpdate(cmp, inputVal, outputVal); + return; + } if (ATypeHierarchy.canPromote(aggType, typeTag)) { - tpc = ATypeHierarchy.getTypePromoteComputer(aggType, typeTag); - aggType = typeTag; - cmp = BinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(aggType, isMin) + // switch to new comp & aggregation type (i.e. current min/max is int and new input is double) + cmp = BinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(typeTag, isMin) .createBinaryComparator(); - if (tpc != null) { - tempValForCasting.reset(); - try { - tpc.convertType(outputVal.getByteArray(), outputVal.getStartOffset() + 1, - outputVal.getLength() - 1, tempValForCasting.getDataOutput()); - } catch (IOException e) { - throw HyracksDataException.create(e); - } - outputVal.assign(tempValForCasting); - } - if (cmp.compare(inputVal.getByteArray(), inputVal.getStartOffset(), inputVal.getLength(), - outputVal.getByteArray(), outputVal.getStartOffset(), outputVal.getLength()) < 0) { - outputVal.assign(inputVal); - } - + castValue(ATypeHierarchy.getTypePromoteComputer(aggType, typeTag), outputVal, tempValForCasting); + outputVal.assign(tempValForCasting); + compareAndUpdate(cmp, inputVal, outputVal); + aggType = typeTag; } else { - tpc = ATypeHierarchy.getTypePromoteComputer(typeTag, aggType); - if (tpc != null) { - tempValForCasting.reset(); - try { - tpc.convertType(inputVal.getByteArray(), inputVal.getStartOffset() + 1, - inputVal.getLength() - 1, tempValForCasting.getDataOutput()); - } catch (IOException e) { - throw HyracksDataException.create(e); - } - if (cmp.compare(tempValForCasting.getByteArray(), tempValForCasting.getStartOffset(), - tempValForCasting.getLength(), outputVal.getByteArray(), outputVal.getStartOffset(), - outputVal.getLength()) < 0) { - outputVal.assign(tempValForCasting); - } - } else { - if (cmp.compare(inputVal.getByteArray(), inputVal.getStartOffset(), inputVal.getLength(), - outputVal.getByteArray(), outputVal.getStartOffset(), outputVal.getLength()) < 0) { - outputVal.assign(inputVal); - } - } - + castValue(ATypeHierarchy.getTypePromoteComputer(typeTag, aggType), inputVal, tempValForCasting); + compareAndUpdate(cmp, tempValForCasting, outputVal); } } } @@ -187,4 +154,23 @@ protected abstract void processSystemNull() throws HyracksDataException; protected abstract void finishSystemNull() throws IOException; + + private static void compareAndUpdate(IBinaryComparator comp, IPointable newVal, ArrayBackedValueStorage oldVal) + throws HyracksDataException { + if (comp.compare(newVal.getByteArray(), newVal.getStartOffset(), newVal.getLength(), oldVal.getByteArray(), + oldVal.getStartOffset(), oldVal.getLength()) < 0) { + oldVal.assign(newVal); + } + } + + private static void castValue(ITypeConvertComputer typeConverter, IPointable inputValue, + ArrayBackedValueStorage tempValForCasting) throws HyracksDataException { + tempValForCasting.reset(); + try { + typeConverter.convertType(inputValue.getByteArray(), inputValue.getStartOffset() + 1, + inputValue.getLength() - 1, tempValForCasting.getDataOutput()); + } catch (IOException e) { + throw HyracksDataException.create(e); + } + } } -- To view, visit https://asterix-gerrit.ics.uci.edu/3391 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-MessageType: newchange Gerrit-Change-Id: I0bb9f0715985ae555de00bbf3173c80371d8968b Gerrit-Change-Number: 3391 Gerrit-PatchSet: 1 Gerrit-Owner: Ali Alsuliman <ali.al.solai...@gmail.com>