http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/asterixdb/asterix-app/src/test/resources/runtimets/results_parser_sqlpp/misc/prefix-search/prefix-search.3.ast ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results_parser_sqlpp/misc/prefix-search/prefix-search.3.ast b/asterixdb/asterix-app/src/test/resources/runtimets/results_parser_sqlpp/misc/prefix-search/prefix-search.3.ast index 1dc730f..edea622 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/results_parser_sqlpp/misc/prefix-search/prefix-search.3.ast +++ b/asterixdb/asterix-app/src/test/resources/runtimets/results_parser_sqlpp/misc/prefix-search/prefix-search.3.ast @@ -51,4 +51,9 @@ Orderby Field=l_linenumber ] ASC + FieldAccessor [ + Variable [ Name=$l ] + Field=l_orderkey + ] + ASC
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/asterixdb/asterix-app/src/test/resources/runtimets/results_parser_sqlpp/misc/stable_sort/stable_sort.3.ast ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results_parser_sqlpp/misc/stable_sort/stable_sort.3.ast b/asterixdb/asterix-app/src/test/resources/runtimets/results_parser_sqlpp/misc/stable_sort/stable_sort.3.ast index 87320ab..0460dc9 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/results_parser_sqlpp/misc/stable_sort/stable_sort.3.ast +++ b/asterixdb/asterix-app/src/test/resources/runtimets/results_parser_sqlpp/misc/stable_sort/stable_sort.3.ast @@ -14,4 +14,14 @@ Orderby Field=l_partkey ] DESC + FieldAccessor [ + Variable [ Name=$i ] + Field=l_orderkey + ] + ASC + FieldAccessor [ + Variable [ Name=$i ] + Field=l_linenumber + ] + ASC http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/asterixdb/asterix-app/src/test/resources/runtimets/results_parser_sqlpp/temporal/overlap_bins_gby_0/overlap_bins_gby_0.3.ast ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results_parser_sqlpp/temporal/overlap_bins_gby_0/overlap_bins_gby_0.3.ast b/asterixdb/asterix-app/src/test/resources/runtimets/results_parser_sqlpp/temporal/overlap_bins_gby_0/overlap_bins_gby_0.3.ast index 36fb0a1..9ea33d5 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/results_parser_sqlpp/temporal/overlap_bins_gby_0/overlap_bins_gby_0.3.ast +++ b/asterixdb/asterix-app/src/test/resources/runtimets/results_parser_sqlpp/temporal/overlap_bins_gby_0/overlap_bins_gby_0.3.ast @@ -137,4 +137,14 @@ Orderby ] ] ASC + Variable [ Name=$itv ] + ASC + FunctionCall test.get-overlapping-interval@2[ + FieldAccessor [ + Variable [ Name=$gen0 ] + Field=bin + ] + Variable [ Name=$itv ] + ] + ASC http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml index 3236e62..f235d54 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml +++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml @@ -4339,6 +4339,16 @@ <output-dir compare="Text">uuid</output-dir> </compilation-unit> </test-case> + <test-case FilePath="misc"> + <compilation-unit name="p_sort_seq_merge"> + <output-dir compare="Text">p_sort_seq_merge</output-dir> + </compilation-unit> + </test-case> + <test-case FilePath="misc"> + <compilation-unit name="p_sort_num_samples"> + <output-dir compare="Text">p_sort_num_samples</output-dir> + </compilation-unit> + </test-case> </test-group> <test-group name="index"> <test-group name="index/validations"> http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CompilerProperties.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CompilerProperties.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CompilerProperties.java index 6d89ff5..4c58ad7 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CompilerProperties.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CompilerProperties.java @@ -22,6 +22,7 @@ import static org.apache.hyracks.control.common.config.OptionTypes.*; import static org.apache.hyracks.util.StorageUtil.StorageUnit.KILOBYTE; import static org.apache.hyracks.util.StorageUtil.StorageUnit.MEGABYTE; +import org.apache.hyracks.algebricks.core.config.AlgebricksConfig; import org.apache.hyracks.api.config.IOption; import org.apache.hyracks.api.config.IOptionType; import org.apache.hyracks.api.config.Section; @@ -58,7 +59,12 @@ public class CompilerProperties extends AbstractProperties { + "other integer values dictate the number of query execution parallel partitions. The system will " + "fall back to use the number of all available CPU cores in the cluster as the degree of parallelism " + "if the number set by a user is too large or too small"), - COMPILER_STRINGOFFSET(INTEGER, 0, "Position of a first character in a String/Binary (0 or 1)"); + COMPILER_STRINGOFFSET(INTEGER, 0, "Position of a first character in a String/Binary (0 or 1)"), + COMPILER_SORT_PARALLEL(BOOLEAN, AlgebricksConfig.SORT_PARALLEL, "Enabling/Disabling full parallel sort"), + COMPILER_SORT_SAMPLES( + INTEGER, + AlgebricksConfig.SORT_SAMPLES, + "The number of samples parallel sorting should " + "take from each partition"); private final IOptionType type; private final Object defaultValue; @@ -106,6 +112,10 @@ public class CompilerProperties extends AbstractProperties { public static final String COMPILER_PARALLELISM_KEY = Option.COMPILER_PARALLELISM.ini(); + public static final String COMPILER_SORT_PARALLEL_KEY = Option.COMPILER_SORT_PARALLEL.ini(); + + public static final String COMPILER_SORT_SAMPLES_KEY = Option.COMPILER_SORT_SAMPLES.ini(); + public static final int COMPILER_PARALLELISM_AS_STORAGE = 0; public CompilerProperties(PropertiesAccessor accessor) { @@ -140,4 +150,13 @@ public class CompilerProperties extends AbstractProperties { int value = accessor.getInt(Option.COMPILER_STRINGOFFSET); return value > 0 ? 1 : 0; } + + public boolean getSortParallel() { + return accessor.getBoolean(Option.COMPILER_SORT_PARALLEL); + } + + public int getSortSamples() { + int numSamples = accessor.getInt(Option.COMPILER_SORT_SAMPLES); + return numSamples > 0 ? numSamples : AlgebricksConfig.SORT_SAMPLES; + } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java index dda6f7b..1bf2447 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java @@ -179,6 +179,7 @@ public class ErrorCode { public static final int COMPILATION_TYPE_MISMATCH_GENERIC = 1091; public static final int ILLEGAL_SET_PARAMETER = 1092; public static final int COMPILATION_TRANSLATION_ERROR = 1093; + public static final int RANGE_MAP_ERROR = 1094; // Feed errors public static final int DATAFLOW_ILLEGAL_STATE = 3001; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties index 4c8c866..5629b97 100644 --- a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties +++ b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties @@ -166,6 +166,7 @@ 1090 = Field %1$s must be of an array of type %2$s but found to contain an item of type %3$s 1092 = Parameter %1$s cannot be set 1093 = A parser error has occurred. The detail exception: %1$s +1094 = Cannot parse range map: %1$s # Feed Errors 3001 = Illegal state. http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/java/base/ByteArrayAccessibleDataInputStream.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/java/base/ByteArrayAccessibleDataInputStream.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/java/base/ByteArrayAccessibleDataInputStream.java deleted file mode 100644 index b92aa6c..0000000 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/java/base/ByteArrayAccessibleDataInputStream.java +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.asterix.external.library.java.base; - -import java.io.DataInputStream; - -public class ByteArrayAccessibleDataInputStream extends DataInputStream { - - public ByteArrayAccessibleDataInputStream(ByteArrayAccessibleInputStream in) { - super(in); - } - - public ByteArrayAccessibleInputStream getInputStream() { - return (ByteArrayAccessibleInputStream) in; - } - -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/java/base/ByteArrayAccessibleInputStream.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/java/base/ByteArrayAccessibleInputStream.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/java/base/ByteArrayAccessibleInputStream.java deleted file mode 100644 index 62e354b..0000000 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/java/base/ByteArrayAccessibleInputStream.java +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.asterix.external.library.java.base; - -import java.io.ByteArrayInputStream; - -public class ByteArrayAccessibleInputStream extends ByteArrayInputStream { - - public ByteArrayAccessibleInputStream(byte[] buf, int offset, int length) { - super(buf, offset, length); - } - - public void setContent(byte[] buf, int offset, int length) { - this.buf = buf; - this.pos = offset; - this.count = Math.min(offset + length, buf.length); - this.mark = offset; - } - - public byte[] getArray() { - return buf; - } - - public int getPosition() { - return pos; - } - - public int getCount() { - return count; - } - -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/asterixdb/asterix-lang-aql/pom.xml ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-lang-aql/pom.xml b/asterixdb/asterix-lang-aql/pom.xml index 6181fe6..f202192 100644 --- a/asterixdb/asterix-lang-aql/pom.xml +++ b/asterixdb/asterix-lang-aql/pom.xml @@ -158,10 +158,6 @@ <artifactId>commons-io</artifactId> </dependency> <dependency> - <groupId>org.apache.hyracks</groupId> - <artifactId>hyracks-dataflow-common</artifactId> - </dependency> - <dependency> <groupId>org.apache.asterix</groupId> <artifactId>asterix-om</artifactId> <version>${project.version}</version> @@ -184,10 +180,6 @@ <groupId>org.apache.hyracks</groupId> <artifactId>hyracks-api</artifactId> </dependency> - <dependency> - <groupId>org.apache.hyracks</groupId> - <artifactId>hyracks-data-std</artifactId> - </dependency> </dependencies> </project> http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/asterixdb/asterix-lang-aql/src/main/java/org/apache/asterix/lang/aql/util/RangeMapBuilder.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-lang-aql/src/main/java/org/apache/asterix/lang/aql/util/RangeMapBuilder.java b/asterixdb/asterix-lang-aql/src/main/java/org/apache/asterix/lang/aql/util/RangeMapBuilder.java deleted file mode 100644 index 23b0066..0000000 --- a/asterixdb/asterix-lang-aql/src/main/java/org/apache/asterix/lang/aql/util/RangeMapBuilder.java +++ /dev/null @@ -1,177 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.asterix.lang.aql.util; - -import java.io.DataOutput; -import java.util.List; - -import org.apache.asterix.common.exceptions.CompilationException; -import org.apache.asterix.formats.nontagged.BinaryComparatorFactoryProvider; -import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider; -import org.apache.asterix.lang.aql.parser.AQLParserFactory; -import org.apache.asterix.lang.common.base.Expression; -import org.apache.asterix.lang.common.base.Expression.Kind; -import org.apache.asterix.lang.common.base.IParser; -import org.apache.asterix.lang.common.base.IParserFactory; -import org.apache.asterix.lang.common.base.Literal; -import org.apache.asterix.lang.common.base.Statement; -import org.apache.asterix.lang.common.expression.ListConstructor; -import org.apache.asterix.lang.common.expression.LiteralExpr; -import org.apache.asterix.lang.common.literal.DoubleLiteral; -import org.apache.asterix.lang.common.literal.FloatLiteral; -import org.apache.asterix.lang.common.literal.IntegerLiteral; -import org.apache.asterix.lang.common.literal.LongIntegerLiteral; -import org.apache.asterix.lang.common.literal.StringLiteral; -import org.apache.asterix.lang.common.statement.Query; -import org.apache.asterix.om.base.AMutableDouble; -import org.apache.asterix.om.base.AMutableFloat; -import org.apache.asterix.om.base.AMutableInt32; -import org.apache.asterix.om.base.AMutableInt64; -import org.apache.asterix.om.base.AMutableString; -import org.apache.asterix.om.types.ATypeTag; -import org.apache.asterix.om.types.BuiltinType; -import org.apache.hyracks.algebricks.common.exceptions.NotImplementedException; -import org.apache.hyracks.api.dataflow.value.IBinaryComparator; -import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory; -import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer; -import org.apache.hyracks.api.exceptions.HyracksDataException; -import org.apache.hyracks.data.std.util.ArrayBackedValueStorage; -import org.apache.hyracks.dataflow.common.data.partition.range.IRangeMap; -import org.apache.hyracks.dataflow.common.data.partition.range.RangeMap; - -public abstract class RangeMapBuilder { - private static final IParserFactory parserFactory = new AQLParserFactory(); - - public static IRangeMap parseHint(Object hint) throws CompilationException { - ArrayBackedValueStorage abvs = new ArrayBackedValueStorage(); - DataOutput out = abvs.getDataOutput(); - abvs.reset(); - - IParser parser = parserFactory.createParser((String) hint); - List<Statement> hintStatements = parser.parse(); - if (hintStatements.size() != 1) { - throw new CompilationException("Only one range statement is allowed for the range hint."); - } - - // Translate the query into a Range Map - if (hintStatements.get(0).getKind() != Statement.Kind.QUERY) { - throw new CompilationException("Not a proper query for the range hint."); - } - Query q = (Query) hintStatements.get(0); - - if (q.getBody().getKind() != Kind.LIST_CONSTRUCTOR_EXPRESSION) { - throw new CompilationException("The range hint must be a list."); - } - List<Expression> el = ((ListConstructor) q.getBody()).getExprList(); - int offsets[] = new int[el.size()]; - - // Loop over list of literals - for (int i = 0; i < el.size(); ++i) { - Expression item = el.get(i); - if (item.getKind() == Kind.LITERAL_EXPRESSION) { - parseLiteralToBytes(item, out); - offsets[i] = abvs.getLength(); - } - // TODO Add support for composite fields. - } - - return new RangeMap(1, abvs.getByteArray(), offsets); - } - - @SuppressWarnings("unchecked") - private static void parseLiteralToBytes(Expression item, DataOutput out) throws CompilationException { - AMutableDouble aDouble = new AMutableDouble(0); - AMutableFloat aFloat = new AMutableFloat(0); - AMutableInt64 aInt64 = new AMutableInt64(0); - AMutableInt32 aInt32 = new AMutableInt32(0); - AMutableString aString = new AMutableString(""); - @SuppressWarnings("rawtypes") - ISerializerDeserializer serde; - - Literal l = ((LiteralExpr) item).getValue(); - try { - switch (l.getLiteralType()) { - case DOUBLE: - DoubleLiteral dl = (DoubleLiteral) l; - serde = SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ADOUBLE); - aDouble.setValue(dl.getValue()); - serde.serialize(aDouble, out); - break; - case FLOAT: - FloatLiteral fl = (FloatLiteral) l; - serde = SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AFLOAT); - aFloat.setValue(fl.getValue()); - serde.serialize(aFloat, out); - break; - case INTEGER: - IntegerLiteral il = (IntegerLiteral) l; - serde = SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT32); - aInt32.setValue(il.getValue()); - serde.serialize(aInt32, out); - break; - case LONG: - LongIntegerLiteral lil = (LongIntegerLiteral) l; - serde = SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT64); - aInt64.setValue(lil.getValue()); - serde.serialize(aInt64, out); - break; - case STRING: - StringLiteral sl = (StringLiteral) l; - serde = SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ASTRING); - aString.setValue(sl.getValue()); - serde.serialize(aString, out); - break; - default: - throw new NotImplementedException("The range map builder has not been implemented for " - + item.getKind() + " type of expressions."); - } - } catch (HyracksDataException e) { - throw new CompilationException(e.getMessage()); - } - } - - public static void verifyRangeOrder(IRangeMap rangeMap, boolean ascending) throws CompilationException { - // TODO Add support for composite fields. - int fieldIndex = 0; - int fieldType = rangeMap.getTag(0, 0); - BinaryComparatorFactoryProvider comparatorFactory = BinaryComparatorFactoryProvider.INSTANCE; - IBinaryComparatorFactory bcf = - comparatorFactory.getBinaryComparatorFactory(ATypeTag.VALUE_TYPE_MAPPING[fieldType], ascending); - IBinaryComparator comparator = bcf.createBinaryComparator(); - int c = 0; - for (int split = 1; split < rangeMap.getSplitCount(); ++split) { - if (fieldType != rangeMap.getTag(fieldIndex, split)) { - throw new CompilationException("Range field contains more than a single type of items (" + fieldType - + " and " + rangeMap.getTag(fieldIndex, split) + ")."); - } - int previousSplit = split - 1; - try { - c = comparator.compare(rangeMap.getByteArray(fieldIndex, previousSplit), - rangeMap.getStartOffset(fieldIndex, previousSplit), - rangeMap.getLength(fieldIndex, previousSplit), rangeMap.getByteArray(fieldIndex, split), - rangeMap.getStartOffset(fieldIndex, split), rangeMap.getLength(fieldIndex, split)); - } catch (HyracksDataException e) { - throw new CompilationException(e); - } - if (c >= 0) { - throw new CompilationException("Range fields are not in sorted order."); - } - } - } -} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/asterixdb/asterix-lang-aql/src/main/javacc/AQL.jj ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-lang-aql/src/main/javacc/AQL.jj b/asterixdb/asterix-lang-aql/src/main/javacc/AQL.jj index 856073d..35c2ae8 100644 --- a/asterixdb/asterix-lang-aql/src/main/javacc/AQL.jj +++ b/asterixdb/asterix-lang-aql/src/main/javacc/AQL.jj @@ -67,12 +67,12 @@ import org.apache.asterix.lang.aql.clause.DistinctClause; import org.apache.asterix.lang.aql.clause.ForClause; import org.apache.asterix.lang.aql.expression.FLWOGRExpression; import org.apache.asterix.lang.aql.expression.UnionExpr; -import org.apache.asterix.lang.aql.util.RangeMapBuilder; import org.apache.asterix.lang.aql.util.AQLFormatPrintUtil; import org.apache.asterix.lang.common.base.Clause; import org.apache.asterix.lang.common.base.Expression; import org.apache.asterix.lang.common.base.ILangExpression; import org.apache.asterix.lang.common.base.IParser; +import org.apache.asterix.lang.common.base.IParserFactory; import org.apache.asterix.lang.common.base.Literal; import org.apache.asterix.lang.common.base.Statement; import org.apache.asterix.lang.common.clause.GroupbyClause; @@ -150,6 +150,7 @@ import org.apache.asterix.lang.common.struct.Identifier; import org.apache.asterix.lang.common.struct.OperatorType; import org.apache.asterix.lang.common.struct.QuantifiedPair; import org.apache.asterix.lang.common.struct.VarIdentifier; +import org.apache.asterix.lang.common.util.RangeMapBuilder; import org.apache.asterix.metadata.utils.MetadataConstants; import org.apache.hyracks.algebricks.common.utils.Pair; import org.apache.hyracks.algebricks.common.utils.Triple; @@ -212,6 +213,10 @@ class AQLParser extends ScopeChecker implements IParser { return s.substring(1).trim(); } + private static IParser createNewParser(String statement) { + return new AQLParser(statement); + } + private static void checkBindingVariable(Expression returnExpression, VariableExpr var, ILangExpression bodyExpression) throws ParseException { if (returnExpression != null && var == null) { @@ -2419,7 +2424,7 @@ Clause OrderbyClause()throws ParseException : } if (hint.startsWith(RANGE_HINT)) { try{ - oc.setRangeMap(RangeMapBuilder.parseHint(hint.substring(RANGE_HINT.length()))); + oc.setRangeMap(RangeMapBuilder.parseHint(createNewParser(hint.substring(RANGE_HINT.length())))); } catch (CompilationException e) { throw new ParseException(e.getMessage()); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/asterixdb/asterix-lang-common/pom.xml ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-lang-common/pom.xml b/asterixdb/asterix-lang-common/pom.xml index 7d68b8b..dde41e0 100644 --- a/asterixdb/asterix-lang-common/pom.xml +++ b/asterixdb/asterix-lang-common/pom.xml @@ -103,5 +103,9 @@ <groupId>org.apache.hyracks</groupId> <artifactId>hyracks-api</artifactId> </dependency> + <dependency> + <groupId>org.apache.hyracks</groupId> + <artifactId>hyracks-data-std</artifactId> + </dependency> </dependencies> </project> http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/clause/OrderbyClause.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/clause/OrderbyClause.java b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/clause/OrderbyClause.java index faefe1d..af8c725 100644 --- a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/clause/OrderbyClause.java +++ b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/clause/OrderbyClause.java @@ -25,12 +25,12 @@ import org.apache.asterix.common.exceptions.CompilationException; import org.apache.asterix.lang.common.base.AbstractClause; import org.apache.asterix.lang.common.base.Expression; import org.apache.asterix.lang.common.visitor.base.ILangVisitor; -import org.apache.hyracks.dataflow.common.data.partition.range.IRangeMap; +import org.apache.hyracks.dataflow.common.data.partition.range.RangeMap; public class OrderbyClause extends AbstractClause { private List<Expression> orderbyList; private List<OrderModifier> modifierList; - private IRangeMap rangeMap; + private RangeMap rangeMap; // can be null private int numFrames = -1; private int numTuples = -1; @@ -90,17 +90,17 @@ public class OrderbyClause extends AbstractClause { this.numTuples = numTuples; } - public IRangeMap getRangeMap() { + public RangeMap getRangeMap() { return rangeMap; } - public void setRangeMap(IRangeMap rangeMap) { + public void setRangeMap(RangeMap rangeMap) { this.rangeMap = rangeMap; } @Override public int hashCode() { - return Objects.hash(modifierList, numFrames, numTuples, orderbyList); + return Objects.hash(modifierList, numFrames, numTuples, orderbyList, rangeMap); } @Override @@ -113,6 +113,7 @@ public class OrderbyClause extends AbstractClause { } OrderbyClause target = (OrderbyClause) object; return Objects.equals(modifierList, target.modifierList) && numFrames == target.numFrames - && numTuples == target.numTuples && orderbyList.equals(target.orderbyList); + && numTuples == target.numTuples && orderbyList.equals(target.orderbyList) + && Objects.equals(rangeMap, target.rangeMap); } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/util/RangeMapBuilder.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/util/RangeMapBuilder.java b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/util/RangeMapBuilder.java new file mode 100644 index 0000000..c505c1c --- /dev/null +++ b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/util/RangeMapBuilder.java @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.lang.common.util; + +import java.io.DataOutput; +import java.util.List; + +import org.apache.asterix.common.exceptions.CompilationException; +import org.apache.asterix.common.exceptions.ErrorCode; +import org.apache.asterix.formats.nontagged.BinaryComparatorFactoryProvider; +import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider; +import org.apache.asterix.lang.common.base.Expression; +import org.apache.asterix.lang.common.base.Expression.Kind; +import org.apache.asterix.lang.common.base.IParser; +import org.apache.asterix.lang.common.base.Literal; +import org.apache.asterix.lang.common.base.Statement; +import org.apache.asterix.lang.common.expression.ListConstructor; +import org.apache.asterix.lang.common.expression.LiteralExpr; +import org.apache.asterix.lang.common.literal.DoubleLiteral; +import org.apache.asterix.lang.common.literal.FloatLiteral; +import org.apache.asterix.lang.common.literal.IntegerLiteral; +import org.apache.asterix.lang.common.literal.LongIntegerLiteral; +import org.apache.asterix.lang.common.literal.StringLiteral; +import org.apache.asterix.lang.common.statement.Query; +import org.apache.asterix.om.base.AMutableDouble; +import org.apache.asterix.om.base.AMutableFloat; +import org.apache.asterix.om.base.AMutableInt32; +import org.apache.asterix.om.base.AMutableInt64; +import org.apache.asterix.om.base.AMutableString; +import org.apache.asterix.om.types.ATypeTag; +import org.apache.asterix.om.types.BuiltinType; +import org.apache.hyracks.algebricks.common.exceptions.NotImplementedException; +import org.apache.hyracks.api.dataflow.value.IBinaryComparator; +import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory; +import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.data.std.util.ArrayBackedValueStorage; +import org.apache.hyracks.dataflow.common.data.partition.range.RangeMap; + +public class RangeMapBuilder { + + private RangeMapBuilder() { + } + + public static RangeMap parseHint(IParser parser) throws CompilationException { + ArrayBackedValueStorage abvs = new ArrayBackedValueStorage(); + DataOutput out = abvs.getDataOutput(); + abvs.reset(); + + List<Statement> hintStatements = parser.parse(); + if (hintStatements.size() != 1) { + throw new CompilationException("Only one range statement is allowed for the range hint."); + } + + // Translate the query into a Range Map + if (hintStatements.get(0).getKind() != Statement.Kind.QUERY) { + throw new CompilationException("Not a proper query for the range hint."); + } + Query q = (Query) hintStatements.get(0); + + if (q.getBody().getKind() != Kind.LIST_CONSTRUCTOR_EXPRESSION) { + throw new CompilationException("The range hint must be a list."); + } + List<Expression> el = ((ListConstructor) q.getBody()).getExprList(); + int[] offsets = new int[el.size()]; + + // Loop over list of literals + for (int i = 0; i < el.size(); ++i) { + Expression item = el.get(i); + if (item.getKind() == Kind.LITERAL_EXPRESSION) { + parseLiteralToBytes(item, out); + offsets[i] = abvs.getLength(); + } + // TODO Add support for composite fields. + } + + return new RangeMap(1, abvs.getByteArray(), offsets); + } + + @SuppressWarnings("unchecked") + private static void parseLiteralToBytes(Expression item, DataOutput out) throws CompilationException { + AMutableDouble aDouble = new AMutableDouble(0); + AMutableFloat aFloat = new AMutableFloat(0); + AMutableInt64 aInt64 = new AMutableInt64(0); + AMutableInt32 aInt32 = new AMutableInt32(0); + AMutableString aString = new AMutableString(""); + @SuppressWarnings("rawtypes") + ISerializerDeserializer serde; + + Literal l = ((LiteralExpr) item).getValue(); + try { + switch (l.getLiteralType()) { + case DOUBLE: + DoubleLiteral dl = (DoubleLiteral) l; + serde = SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ADOUBLE); + aDouble.setValue(dl.getValue()); + serde.serialize(aDouble, out); + break; + case FLOAT: + FloatLiteral fl = (FloatLiteral) l; + serde = SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AFLOAT); + aFloat.setValue(fl.getValue()); + serde.serialize(aFloat, out); + break; + case INTEGER: + IntegerLiteral il = (IntegerLiteral) l; + serde = SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT32); + aInt32.setValue(il.getValue()); + serde.serialize(aInt32, out); + break; + case LONG: + LongIntegerLiteral lil = (LongIntegerLiteral) l; + serde = SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT64); + aInt64.setValue(lil.getValue()); + serde.serialize(aInt64, out); + break; + case STRING: + StringLiteral sl = (StringLiteral) l; + serde = SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ASTRING); + aString.setValue(sl.getValue()); + serde.serialize(aString, out); + break; + default: + throw new NotImplementedException("The range map builder has not been implemented for " + + item.getKind() + " type of expressions."); + } + } catch (HyracksDataException e) { + throw new CompilationException(ErrorCode.RANGE_MAP_ERROR, e, item.getSourceLocation(), e.getMessage()); + } + } + + public static void verifyRangeOrder(RangeMap rangeMap, boolean ascending) throws CompilationException { + // TODO Add support for composite fields. + int fieldIndex = 0; + int fieldType = rangeMap.getTag(0, 0); + BinaryComparatorFactoryProvider comparatorFactory = BinaryComparatorFactoryProvider.INSTANCE; + IBinaryComparatorFactory bcf = + comparatorFactory.getBinaryComparatorFactory(ATypeTag.VALUE_TYPE_MAPPING[fieldType], ascending); + IBinaryComparator comparator = bcf.createBinaryComparator(); + int c = 0; + for (int split = 1; split < rangeMap.getSplitCount(); ++split) { + if (fieldType != rangeMap.getTag(fieldIndex, split)) { + throw new CompilationException("Range field contains more than a single type of items (" + fieldType + + " and " + rangeMap.getTag(fieldIndex, split) + ")."); + } + int previousSplit = split - 1; + try { + c = comparator.compare(rangeMap.getByteArray(), rangeMap.getStartOffset(fieldIndex, previousSplit), + rangeMap.getLength(fieldIndex, previousSplit), rangeMap.getByteArray(), + rangeMap.getStartOffset(fieldIndex, split), rangeMap.getLength(fieldIndex, split)); + } catch (HyracksDataException e) { + throw new CompilationException(e); + } + if (c >= 0) { + throw new CompilationException("Range fields are not in sorted order."); + } + } + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/asterixdb/asterix-lang-sqlpp/src/main/javacc/SQLPP.jj ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-lang-sqlpp/src/main/javacc/SQLPP.jj b/asterixdb/asterix-lang-sqlpp/src/main/javacc/SQLPP.jj index e2a8759..13d1f8d 100644 --- a/asterixdb/asterix-lang-sqlpp/src/main/javacc/SQLPP.jj +++ b/asterixdb/asterix-lang-sqlpp/src/main/javacc/SQLPP.jj @@ -73,6 +73,7 @@ import org.apache.asterix.lang.common.base.Expression; import org.apache.asterix.lang.common.base.Literal; import org.apache.asterix.lang.common.base.IParser; import org.apache.asterix.lang.common.base.ILangExpression; +import org.apache.asterix.lang.common.base.IParserFactory; import org.apache.asterix.lang.common.base.Statement; import org.apache.asterix.lang.common.clause.GroupbyClause; import org.apache.asterix.lang.common.clause.LetClause; @@ -149,6 +150,7 @@ import org.apache.asterix.lang.common.struct.Identifier; import org.apache.asterix.lang.common.struct.OperatorType; import org.apache.asterix.lang.common.struct.QuantifiedPair; import org.apache.asterix.lang.common.struct.VarIdentifier; +import org.apache.asterix.lang.common.util.RangeMapBuilder; import org.apache.asterix.lang.sqlpp.clause.AbstractBinaryCorrelateClause; import org.apache.asterix.lang.sqlpp.clause.FromClause; import org.apache.asterix.lang.sqlpp.clause.FromTerm; @@ -200,7 +202,6 @@ class SQLPPParser extends ScopeChecker implements IParser { private static final String SKIP_SECONDARY_INDEX_SEARCH_HINT = "skip-index"; private static final String VAL_FILE_HINT = "val-files"; private static final String VAL_FILE_SAME_INDEX_HINT = "val-file-same-idx"; - private static final String GEN_FIELDS_HINT = "gen-fields"; // data generator hints @@ -241,6 +242,10 @@ class SQLPPParser extends ScopeChecker implements IParser { return s.substring(1).trim(); } + private static IParser createNewParser(String statement) { + return new SQLPPParser(statement); + } + private Token getHintToken(Token t) { return t.specialToken; } @@ -3062,6 +3067,13 @@ OrderbyClause OrderbyClause() throws ParseException : oc.setNumFrames(numFrames); oc.setNumTuples(numTuples); } + if (hint.startsWith(RANGE_HINT)) { + try { + oc.setRangeMap(RangeMapBuilder.parseHint(createNewParser(hint.substring(RANGE_HINT.length())))); + } catch (CompilationException e) { + throw new SqlppParseException(getSourceLocation(getHintToken(token)), e.getMessage()); + } + } } } <BY> orderbyExpr = Expression() http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/BuiltinFunctions.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/BuiltinFunctions.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/BuiltinFunctions.java index 910c159..1ff6e57 100644 --- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/BuiltinFunctions.java +++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/BuiltinFunctions.java @@ -103,6 +103,7 @@ import org.apache.asterix.om.typecomputer.impl.OrderedListOfAIntervalTypeCompute import org.apache.asterix.om.typecomputer.impl.OrderedListOfAPointTypeComputer; import org.apache.asterix.om.typecomputer.impl.OrderedListOfAStringTypeComputer; import org.apache.asterix.om.typecomputer.impl.OrderedListOfAnyTypeComputer; +import org.apache.asterix.om.typecomputer.impl.ListOfSamplesTypeComputer; import org.apache.asterix.om.typecomputer.impl.PropagateTypeComputer; import org.apache.asterix.om.typecomputer.impl.RecordAddFieldsTypeComputer; import org.apache.asterix.om.typecomputer.impl.RecordMergeTypeComputer; @@ -477,6 +478,10 @@ public class BuiltinFunctions { new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "agg-intermediate-stddev", 1); public static final FunctionIdentifier LOCAL_STDDEV = new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "agg-local-stddev", 1); + public static final FunctionIdentifier LOCAL_SAMPLING = + new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "agg-local-sampling", FunctionIdentifier.VARARGS); + public static final FunctionIdentifier RANGE_MAP = + new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "agg-range-map", 1); public static final FunctionIdentifier SCALAR_AVG = new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "avg", 1); public static final FunctionIdentifier SCALAR_COUNT = @@ -1381,6 +1386,8 @@ public class BuiltinFunctions { addPrivateFunction(LOCAL_STDDEV, LocalSingleVarStatisticsTypeComputer.INSTANCE, true); addFunction(STDDEV, NullableDoubleTypeComputer.INSTANCE, true); addPrivateFunction(GLOBAL_STDDEV, NullableDoubleTypeComputer.INSTANCE, true); + addPrivateFunction(LOCAL_SAMPLING, ListOfSamplesTypeComputer.INSTANCE, true); + addPrivateFunction(RANGE_MAP, ABinaryTypeComputer.INSTANCE, true); addPrivateFunction(SERIAL_SQL_AVG, NullableDoubleTypeComputer.INSTANCE, true); addPrivateFunction(SERIAL_SQL_COUNT, AInt64TypeComputer.INSTANCE, true); @@ -1832,6 +1839,14 @@ public class BuiltinFunctions { addScalarAgg(FIRST_ELEMENT, SCALAR_FIRST_ELEMENT); + // RANGE_MAP + addAgg(RANGE_MAP); + addAgg(LOCAL_SAMPLING); + addLocalAgg(RANGE_MAP, LOCAL_SAMPLING); + addIntermediateAgg(LOCAL_SAMPLING, RANGE_MAP); + addIntermediateAgg(RANGE_MAP, RANGE_MAP); + addGlobalAgg(RANGE_MAP, RANGE_MAP); + // MIN addAgg(MIN); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/typecomputer/impl/ListOfSamplesTypeComputer.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/typecomputer/impl/ListOfSamplesTypeComputer.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/typecomputer/impl/ListOfSamplesTypeComputer.java new file mode 100644 index 0000000..1ae72e4 --- /dev/null +++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/typecomputer/impl/ListOfSamplesTypeComputer.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.om.typecomputer.impl; + +import org.apache.asterix.om.pointables.base.DefaultOpenFieldType; +import org.apache.asterix.om.typecomputer.base.AbstractResultTypeComputer; +import org.apache.asterix.om.types.AOrderedListType; +import org.apache.asterix.om.types.IAType; +import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; +import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression; + +/** + * List of samples type: [[ANY], [ANY],...]. Each inner list constitutes one sample. Inside the inner list (the sample), + * each item (or field) has its type tag. + */ +public class ListOfSamplesTypeComputer extends AbstractResultTypeComputer { + + public static final AOrderedListType TYPE = + new AOrderedListType(DefaultOpenFieldType.NESTED_OPEN_AORDERED_LIST_TYPE, null); + public static final ListOfSamplesTypeComputer INSTANCE = new ListOfSamplesTypeComputer(); + + private ListOfSamplesTypeComputer() { + } + + @Override + protected IAType getResultType(ILogicalExpression expr, IAType... strippedInputTypes) throws AlgebricksException { + return TYPE; + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/LocalSamplingAggregateDescriptor.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/LocalSamplingAggregateDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/LocalSamplingAggregateDescriptor.java new file mode 100644 index 0000000..55d381d --- /dev/null +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/LocalSamplingAggregateDescriptor.java @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.runtime.aggregates.std; + +import java.io.IOException; + +import org.apache.asterix.builders.IAsterixListBuilder; +import org.apache.asterix.builders.OrderedListBuilder; +import org.apache.asterix.common.config.CompilerProperties; +import org.apache.asterix.om.functions.BuiltinFunctions; +import org.apache.asterix.om.functions.IFunctionDescriptor; +import org.apache.asterix.om.functions.IFunctionDescriptorFactory; +import org.apache.asterix.om.functions.IFunctionTypeInferer; +import org.apache.asterix.om.typecomputer.impl.ListOfSamplesTypeComputer; +import org.apache.asterix.om.types.ATypeTag; +import org.apache.asterix.om.types.AbstractCollectionType; +import org.apache.asterix.runtime.aggregates.base.AbstractAggregateFunctionDynamicDescriptor; +import org.apache.asterix.runtime.functions.FunctionTypeInferers; +import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier; +import org.apache.hyracks.algebricks.runtime.base.IAggregateEvaluator; +import org.apache.hyracks.algebricks.runtime.base.IAggregateEvaluatorFactory; +import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator; +import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory; +import org.apache.hyracks.api.context.IHyracksTaskContext; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.data.std.api.IPointable; +import org.apache.hyracks.data.std.primitive.VoidPointable; +import org.apache.hyracks.data.std.util.ArrayBackedValueStorage; +import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference; + +public class LocalSamplingAggregateDescriptor extends AbstractAggregateFunctionDynamicDescriptor { + private static final long serialVersionUID = 1L; + private int numSamples; + + public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() { + @Override + public IFunctionDescriptor createFunctionDescriptor() { + return new LocalSamplingAggregateDescriptor(); + } + + @Override + public IFunctionTypeInferer createFunctionTypeInferer() { + return FunctionTypeInferers.SET_NUM_SAMPLES; + } + }; + + @Override + public FunctionIdentifier getIdentifier() { + return BuiltinFunctions.LOCAL_SAMPLING; + } + + @Override + public void setImmutableStates(Object... states) { + numSamples = (int) states[0]; + } + + @Override + public IAggregateEvaluatorFactory createAggregateEvaluatorFactory(final IScalarEvaluatorFactory[] args) { + return new IAggregateEvaluatorFactory() { + private static final long serialVersionUID = 1L; + + @Override + public IAggregateEvaluator createAggregateEvaluator(final IHyracksTaskContext ctx) + throws HyracksDataException { + return new LocalSamplingAggregateFunction(args, ctx, numSamples); + } + }; + } + + private class LocalSamplingAggregateFunction implements IAggregateEvaluator { + private final int numSamplesRequired; + private final ArrayBackedValueStorage storage; + private final IAsterixListBuilder listOfSamplesBuilder; + private final IAsterixListBuilder oneSampleBuilder; + private final IScalarEvaluator[] sampledFieldsEval; + private final IPointable inputFieldValue; + private int numSamplesTaken; + + /** + * @param args the fields that constitute a sample, e.g., $$1, $$2 + * @param context Hyracks task + * @throws HyracksDataException + */ + private LocalSamplingAggregateFunction(IScalarEvaluatorFactory[] args, IHyracksTaskContext context, + int numSamplesRequired) throws HyracksDataException { + storage = new ArrayBackedValueStorage(); + inputFieldValue = new VoidPointable(); + sampledFieldsEval = new IScalarEvaluator[args.length]; + for (int i = 0; i < args.length; i++) { + sampledFieldsEval[i] = args[i].createScalarEvaluator(context); + } + oneSampleBuilder = new OrderedListBuilder(); + listOfSamplesBuilder = new OrderedListBuilder(); + listOfSamplesBuilder.reset(ListOfSamplesTypeComputer.TYPE); + this.numSamplesRequired = numSamplesRequired > 0 ? numSamplesRequired + : (int) CompilerProperties.Option.COMPILER_SORT_SAMPLES.defaultValue(); + } + + @Override + public void init() throws HyracksDataException { + numSamplesTaken = 0; + listOfSamplesBuilder.reset(ListOfSamplesTypeComputer.TYPE); + } + + /** + * Receives data stream one tuple at a time from a data source and records samples. + * @param tuple one sample + * @throws HyracksDataException + */ + @Override + public void step(IFrameTupleReference tuple) throws HyracksDataException { + if (numSamplesTaken >= numSamplesRequired) { + return; + } + // start over for a new sample + oneSampleBuilder.reset((AbstractCollectionType) ListOfSamplesTypeComputer.TYPE.getItemType()); + + for (IScalarEvaluator fieldEval : sampledFieldsEval) { + // add fields to make up one sample + fieldEval.evaluate(tuple, inputFieldValue); + oneSampleBuilder.addItem(inputFieldValue); + } + // prepare the sample to add it to the list of samples + storage.reset(); + oneSampleBuilder.write(storage.getDataOutput(), true); + listOfSamplesBuilder.addItem(storage); + numSamplesTaken++; + } + + /** + * Sends the list of samples to the global range-map generator. + * @param result list of samples + * @throws HyracksDataException + */ + @Override + public void finish(IPointable result) throws HyracksDataException { + storage.reset(); + if (numSamplesTaken == 0) { + // empty partition? then send system null as an indication of empty partition. + try { + storage.getDataOutput().writeByte(ATypeTag.SERIALIZED_SYSTEM_NULL_TYPE_TAG); + result.set(storage); + } catch (IOException e) { + throw HyracksDataException.create(e); + } + } else { + listOfSamplesBuilder.write(storage.getDataOutput(), true); + result.set(storage); + } + } + + @Override + public void finishPartial(IPointable result) throws HyracksDataException { + finish(result); + } + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/RangeMapAggregateDescriptor.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/RangeMapAggregateDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/RangeMapAggregateDescriptor.java new file mode 100644 index 0000000..c967a94 --- /dev/null +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/RangeMapAggregateDescriptor.java @@ -0,0 +1,317 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.runtime.aggregates.std; + +import java.io.DataOutput; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Comparator; +import java.util.List; + +import org.apache.asterix.dataflow.data.nontagged.comparators.AObjectAscBinaryComparatorFactory; +import org.apache.asterix.dataflow.data.nontagged.comparators.AObjectDescBinaryComparatorFactory; +import org.apache.asterix.om.functions.BuiltinFunctions; +import org.apache.asterix.om.functions.IFunctionDescriptor; +import org.apache.asterix.om.functions.IFunctionDescriptorFactory; +import org.apache.asterix.om.functions.IFunctionTypeInferer; +import org.apache.asterix.om.types.ATypeTag; +import org.apache.asterix.runtime.aggregates.base.AbstractAggregateFunctionDynamicDescriptor; +import org.apache.asterix.runtime.evaluators.common.ListAccessor; +import org.apache.asterix.runtime.functions.FunctionTypeInferers; +import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier; +import org.apache.hyracks.algebricks.runtime.base.IAggregateEvaluator; +import org.apache.hyracks.algebricks.runtime.base.IAggregateEvaluatorFactory; +import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator; +import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory; +import org.apache.hyracks.api.context.IHyracksTaskContext; +import org.apache.hyracks.api.dataflow.value.IBinaryComparator; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.data.std.api.IPointable; +import org.apache.hyracks.data.std.primitive.VoidPointable; +import org.apache.hyracks.data.std.util.ArrayBackedValueStorage; +import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference; +import org.apache.hyracks.dataflow.common.data.marshalling.ByteArraySerializerDeserializer; +import org.apache.hyracks.dataflow.common.data.marshalling.IntArraySerializerDeserializer; +import org.apache.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer; + +/** + * <pre> + * {@see {@link org.apache.hyracks.dataflow.common.data.partition.range.RangeMap}} for some description of the range map + * structure that is produced by this function. Given a list of samples and a number of partitions "k", the algorithm + * of this function operates as follows (s = sample): + * It picks (k - 1) split points out of the samples by dividing num_samples/num_partitions. For 4 partitions, it's 3: + * s0,s1,s2,s3,s4,s5,s6,s7,s8,s9,s10,s11,s12,s13,s14,s15; 16/4 = 4; range map = [s3, s7, s11] + * | | | + * + * s0,s1,s2,s3,s4,s5,s6; 7/4 = 2; range map = [s1, s3, s5] + * | | | + * + * s0,s1,s2,s3,s4; 5/4 = 2; range map = [s1, s3, s4]; if we go out of bound for the last split, we pick the last item. + * | | | + * + * s0,s1,s2,s3; if #_samples <= #_partitions, we sweep from the beginning (should be rare). range map = [s0, s1, s2] + * | | | + * + * s0,s1; if there are way less samples, we sweep and repeat the last item; range map = [s0, s1, s1]; + * Note: a sample (and therefore also a split point) could be single-column or multi-column. + * </pre> + */ +public class RangeMapAggregateDescriptor extends AbstractAggregateFunctionDynamicDescriptor { + private static final long serialVersionUID = 1L; + private boolean[] ascendingFlags; + private int numOfPartitions; + private int numOrderFields; + + public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() { + @Override + public IFunctionDescriptor createFunctionDescriptor() { + return new RangeMapAggregateDescriptor(); + } + + @Override + public IFunctionTypeInferer createFunctionTypeInferer() { + return FunctionTypeInferers.SET_SORTING_PARAMETERS; + } + }; + + @Override + public FunctionIdentifier getIdentifier() { + return BuiltinFunctions.RANGE_MAP; + } + + /** + * The sampling function, which generates the splitting vector, needs to know the number of partitions in order to + * determine how many split points to pick out of the samples. It also needs to know the ascending/descending of + * each sort field so that it can sort the samples accordingly first and then pick the (number of partitions - 1) + * split points out of the sorted samples. + * @param states states[0]: number of partitions, states[1]: ascending flags + */ + @Override + public void setImmutableStates(Object... states) { + numOfPartitions = (int) states[0]; + ascendingFlags = (boolean[]) states[1]; + numOrderFields = ascendingFlags.length; + } + + @Override + public IAggregateEvaluatorFactory createAggregateEvaluatorFactory(final IScalarEvaluatorFactory[] args) { + return new IAggregateEvaluatorFactory() { + private static final long serialVersionUID = 1L; + + @Override + public IAggregateEvaluator createAggregateEvaluator(final IHyracksTaskContext ctx) + throws HyracksDataException { + return new GlobalSamplingAggregateFunction(args, ctx, ascendingFlags, numOfPartitions, numOrderFields); + } + }; + } + + private class GlobalSamplingAggregateFunction implements IAggregateEvaluator { + private final IScalarEvaluator localSamplesEval; + private final IPointable localSamples; + private final List<List<byte[]>> finalSamples; + private final Comparator<List<byte[]>> comparator; + private final int numOfPartitions; + private final int numOrderByFields; + private final ListAccessor listOfSamples; + private final ListAccessor oneSample; + private final IPointable oneSamplePointable; + private final ArrayBackedValueStorage oneSampleStorage; + private final IPointable field; + private final ArrayBackedValueStorage storage; + + @SuppressWarnings("unchecked") + private GlobalSamplingAggregateFunction(IScalarEvaluatorFactory[] args, IHyracksTaskContext context, + boolean[] ascending, int numOfPartitions, int numOrderByFields) throws HyracksDataException { + localSamples = new VoidPointable(); + localSamplesEval = args[0].createScalarEvaluator(context); + finalSamples = new ArrayList<>(); + comparator = createComparator(ascending); + this.numOfPartitions = numOfPartitions; + this.numOrderByFields = numOrderByFields; + listOfSamples = new ListAccessor(); + oneSample = new ListAccessor(); + oneSamplePointable = new VoidPointable(); + oneSampleStorage = new ArrayBackedValueStorage(); + field = new VoidPointable(); + storage = new ArrayBackedValueStorage(); + } + + @Override + public void init() throws HyracksDataException { + finalSamples.clear(); + } + + /** + * Receives the local samples and appends them to the final list of samples. + * @param tuple the partition's samples + * @throws HyracksDataException + */ + @Override + public void step(IFrameTupleReference tuple) throws HyracksDataException { + // check if empty stream (system_null), i.e. partition is empty, so no samples + localSamplesEval.evaluate(tuple, localSamples); + byte tag = localSamples.getByteArray()[localSamples.getStartOffset()]; + if (tag == ATypeTag.SERIALIZED_SYSTEM_NULL_TYPE_TAG) { + return; + } + // deserialize the samples received from the local partition + listOfSamples.reset(localSamples.getByteArray(), localSamples.getStartOffset()); + int numberOfSamples = listOfSamples.size(); + + // "sample" & "addedSample" are lists to support multi-column instead of one value, i.e. <3,"dept"> + List<byte[]> addedSample; + int numberOfFields; + // add the samples to the final samples + try { + for (int i = 0; i < numberOfSamples; i++) { + oneSampleStorage.reset(); + listOfSamples.getOrWriteItem(i, oneSamplePointable, oneSampleStorage); + oneSample.reset(oneSamplePointable.getByteArray(), oneSamplePointable.getStartOffset()); + numberOfFields = oneSample.size(); + addedSample = new ArrayList<>(numberOfFields); + for (int j = 0; j < numberOfFields; j++) { + storage.reset(); + oneSample.getOrWriteItem(j, field, storage); + addedSample.add(Arrays.copyOfRange(field.getByteArray(), field.getStartOffset(), + field.getStartOffset() + field.getLength())); + } + finalSamples.add(addedSample); + } + } catch (IOException e) { + throw HyracksDataException.create(e); + } + } + + /** + * Produces the range map out of the collected samples from each partition. The final list of samples is sorted + * first. Then, we select the split points by dividing the samples evenly. + * @param result contains the serialized range map. + * @throws HyracksDataException + */ + @Override + public void finish(IPointable result) throws HyracksDataException { + // storage == all serialized split values of all split points + storage.reset(); + DataOutput allSplitValuesOut = storage.getDataOutput(); + int[] endOffsets; + try { + // check if empty dataset, i.e. no samples have been received from any partition + if (finalSamples.isEmpty()) { + // a range map with null values + endOffsets = new int[numOrderByFields]; + for (int sortField = 0; sortField < numOrderByFields; sortField++) { + allSplitValuesOut.write(ATypeTag.SERIALIZED_NULL_TYPE_TAG); + endOffsets[sortField] = storage.getLength(); + } + } else { + finalSamples.sort(comparator); + // divide the samples evenly and pick the boundaries as split points + int nextSplitOffset = (int) Math.ceil(finalSamples.size() / (double) numOfPartitions); + int nextSplitIndex = nextSplitOffset - 1; + int endOffsetsCounter = 0; + int numRequiredSplits = numOfPartitions - 1; + endOffsets = new int[numRequiredSplits * numOrderByFields]; + List<byte[]> sample; + for (int split = 1; split <= numRequiredSplits; split++) { + // pick the split point from sorted samples (could be <3> or <4,"John"> if it's multi-column) + sample = finalSamples.get(nextSplitIndex); + for (int column = 0; column < sample.size(); column++) { + allSplitValuesOut.write(sample.get(column)); + endOffsets[endOffsetsCounter++] = storage.getLength(); + } + // go to the next split point + nextSplitIndex += nextSplitOffset; + // in case we go beyond the boundary of samples, we pick the last sample repeatedly + if (nextSplitIndex >= finalSamples.size()) { + nextSplitIndex = finalSamples.size() - 1; + } + } + } + } catch (IOException e) { + throw HyracksDataException.create(e); + } + + serializeRangemap(numOrderByFields, storage.getByteArray(), endOffsets, result); + } + + @Override + public void finishPartial(IPointable result) throws HyracksDataException { + finish(result); + } + + /** + * Creates the comparator that sorts all the collected samples before picking split points. + * @param ascending ascending or descending flag for each sort column. + * @return the described comparator + */ + private Comparator<List<byte[]>> createComparator(boolean[] ascending) { + // create the generic comparator for each sort field + IBinaryComparator[] fieldsComparators = new IBinaryComparator[ascending.length]; + for (int i = 0; i < ascending.length; i++) { + if (ascending[i]) { + fieldsComparators[i] = AObjectAscBinaryComparatorFactory.INSTANCE.createBinaryComparator(); + } else { + fieldsComparators[i] = AObjectDescBinaryComparatorFactory.INSTANCE.createBinaryComparator(); + } + } + + return (splitPoint1, splitPoint2) -> { + try { + // two split points must have the same num of fields + int numFields = splitPoint1.size(); + int result = 0; + byte[] field1; + byte[] field2; + for (int i = 0; i < numFields; i++) { + field1 = splitPoint1.get(i); + field2 = splitPoint2.get(i); + result = fieldsComparators[i].compare(field1, 0, field1.length, field2, 0, field2.length); + if (result != 0) { + return result; + } + } + return result; + } catch (HyracksDataException e) { + throw new IllegalStateException(e); + } + }; + } + + /** + * Serializes the range map object defined by the below attributes into the "result". The range map object is + * serialized as binary data. + * @param numberFields the number of order-by fields (the sort fields) + * @param splitValues the serialized split values stored one after the other + * @param endOffsets the end offsets of each split value + * @param result where the range map object is serialized + * @throws HyracksDataException + */ + private void serializeRangemap(int numberFields, byte[] splitValues, int[] endOffsets, IPointable result) + throws HyracksDataException { + ArrayBackedValueStorage serRangeMap = new ArrayBackedValueStorage(); + IntegerSerializerDeserializer.write(numberFields, serRangeMap.getDataOutput()); + ByteArraySerializerDeserializer.write(splitValues, serRangeMap.getDataOutput()); + IntArraySerializerDeserializer.write(endOffsets, serRangeMap.getDataOutput()); + + result.set(serRangeMap.getByteArray(), serRangeMap.getStartOffset(), serRangeMap.getLength()); + } + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/functions/FunctionCollection.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/functions/FunctionCollection.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/functions/FunctionCollection.java index 9541d69..5affbca 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/functions/FunctionCollection.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/functions/FunctionCollection.java @@ -68,6 +68,7 @@ import org.apache.asterix.runtime.aggregates.serializable.std.SerializableSumAgg import org.apache.asterix.runtime.aggregates.std.AvgAggregateDescriptor; import org.apache.asterix.runtime.aggregates.std.CountAggregateDescriptor; import org.apache.asterix.runtime.aggregates.std.GlobalAvgAggregateDescriptor; +import org.apache.asterix.runtime.aggregates.std.RangeMapAggregateDescriptor; import org.apache.asterix.runtime.aggregates.std.GlobalSqlAvgAggregateDescriptor; import org.apache.asterix.runtime.aggregates.std.GlobalSqlStddevAggregateDescriptor; import org.apache.asterix.runtime.aggregates.std.GlobalStddevAggregateDescriptor; @@ -78,6 +79,7 @@ import org.apache.asterix.runtime.aggregates.std.IntermediateStddevAggregateDesc import org.apache.asterix.runtime.aggregates.std.LocalAvgAggregateDescriptor; import org.apache.asterix.runtime.aggregates.std.LocalMaxAggregateDescriptor; import org.apache.asterix.runtime.aggregates.std.LocalMinAggregateDescriptor; +import org.apache.asterix.runtime.aggregates.std.LocalSamplingAggregateDescriptor; import org.apache.asterix.runtime.aggregates.std.LocalSqlAvgAggregateDescriptor; import org.apache.asterix.runtime.aggregates.std.LocalSqlMaxAggregateDescriptor; import org.apache.asterix.runtime.aggregates.std.LocalSqlMinAggregateDescriptor; @@ -458,6 +460,8 @@ public final class FunctionCollection implements IFunctionCollection { fc.add(LocalStddevAggregateDescriptor.FACTORY); fc.add(IntermediateStddevAggregateDescriptor.FACTORY); fc.add(GlobalStddevAggregateDescriptor.FACTORY); + fc.add(LocalSamplingAggregateDescriptor.FACTORY); + fc.add(RangeMapAggregateDescriptor.FACTORY); // serializable aggregates fc.add(SerializableCountAggregateDescriptor.FACTORY); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/functions/FunctionTypeInferers.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/functions/FunctionTypeInferers.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/functions/FunctionTypeInferers.java index b9c58c7..44e3eb7 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/functions/FunctionTypeInferers.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/functions/FunctionTypeInferers.java @@ -19,6 +19,9 @@ package org.apache.asterix.runtime.functions; +import java.util.ArrayList; +import java.util.List; + import org.apache.asterix.common.config.CompilerProperties; import org.apache.asterix.om.base.AOrderedList; import org.apache.asterix.om.base.AString; @@ -42,9 +45,6 @@ import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCa import org.apache.hyracks.algebricks.core.algebra.expressions.ConstantExpression; import org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment; -import java.util.ArrayList; -import java.util.List; - /** * Implementations of {@link IFunctionTypeInferer} for built-in functions */ @@ -95,6 +95,26 @@ public final class FunctionTypeInferers { } }; + public static final IFunctionTypeInferer SET_SORTING_PARAMETERS = new IFunctionTypeInferer() { + @Override + public void infer(ILogicalExpression expr, IFunctionDescriptor fd, IVariableTypeEnvironment context, + CompilerProperties compilerProps) throws AlgebricksException { + AbstractFunctionCallExpression funCallExpr = (AbstractFunctionCallExpression) expr; + Object[] sortingParameters = funCallExpr.getOpaqueParameters(); + fd.setImmutableStates(sortingParameters[0], sortingParameters[1]); + } + }; + + public static final IFunctionTypeInferer SET_NUM_SAMPLES = new IFunctionTypeInferer() { + @Override + public void infer(ILogicalExpression expr, IFunctionDescriptor fd, IVariableTypeEnvironment context, + CompilerProperties compilerProps) throws AlgebricksException { + AbstractFunctionCallExpression funCallExpr = (AbstractFunctionCallExpression) expr; + Object[] samplingParameters = funCallExpr.getOpaqueParameters(); + fd.setImmutableStates(samplingParameters[0]); + } + }; + public static final class CastTypeInferer implements IFunctionTypeInferer { @Override public void infer(ILogicalExpression expr, IFunctionDescriptor fd, IVariableTypeEnvironment context, http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/LogicalOperatorTag.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/LogicalOperatorTag.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/LogicalOperatorTag.java index 2d13baf..3794328 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/LogicalOperatorTag.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/LogicalOperatorTag.java @@ -28,6 +28,7 @@ public enum LogicalOperatorTag { EXCHANGE, DELEGATE_OPERATOR, EXTERNAL_LOOKUP, + FORWARD, GROUP, INDEX_INSERT_DELETE_UPSERT, INNERJOIN, http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/OperatorAnnotations.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/OperatorAnnotations.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/OperatorAnnotations.java index fc702ce..cbe0882 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/OperatorAnnotations.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/OperatorAnnotations.java @@ -22,7 +22,8 @@ public interface OperatorAnnotations { // hints public static final String USE_HASH_GROUP_BY = "USE_HASH_GROUP_BY"; // --> public static final String USE_EXTERNAL_GROUP_BY = "USE_EXTERNAL_GROUP_BY"; // --> - public static final String USE_RANGE_CONNECTOR = "USE_RANGE_CONNECTOR"; // --> + public static final String USE_STATIC_RANGE = "USE_STATIC_RANGE"; // --> + public static final String USE_DYNAMIC_RANGE = "USE_DYNAMIC_RANGE"; // Boolean public static final String CARDINALITY = "CARDINALITY"; // --> // Integer http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java index db9728b..ac1de5a 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java @@ -30,6 +30,7 @@ public enum PhysicalOperatorTag { DELEGATE_OPERATOR, EXTERNAL_GROUP_BY, EXTERNAL_LOOKUP, + FORWARD, HASH_GROUP_BY, HASH_PARTITION_EXCHANGE, HASH_PARTITION_MERGE_EXCHANGE, @@ -54,6 +55,7 @@ public enum PhysicalOperatorTag { RANDOM_MERGE_EXCHANGE, RANGE_PARTITION_EXCHANGE, RANGE_PARTITION_MERGE_EXCHANGE, + SEQUENTIAL_MERGE_EXCHANGE, REPLICATE, RTREE_SEARCH, RUNNING_AGGREGATE, http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/expressions/AbstractFunctionCallExpression.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/expressions/AbstractFunctionCallExpression.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/expressions/AbstractFunctionCallExpression.java index 80d6f95..1cc206f 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/expressions/AbstractFunctionCallExpression.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/expressions/AbstractFunctionCallExpression.java @@ -19,6 +19,7 @@ package org.apache.hyracks.algebricks.core.algebra.expressions; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.HashMap; import java.util.LinkedList; @@ -205,19 +206,7 @@ public abstract class AbstractFunctionCallExpression extends AbstractLogicalExpr return false; } } - if (opaqueParameters != null) { - if (opaqueParameters.length != fce.opaqueParameters.length) { - return false; - } - for (int i = 0; i < opaqueParameters.length; i++) { - Object opaqueParameter = opaqueParameters[i]; - Object fceOpaqueParameter = fce.opaqueParameters[i]; - if (!opaqueParameter.equals(fceOpaqueParameter)) { - return false; - } - } - } - return true; + return Arrays.deepEquals(opaqueParameters, fce.opaqueParameters); } } @@ -228,9 +217,7 @@ public abstract class AbstractFunctionCallExpression extends AbstractLogicalExpr h = h * 41 + e.getValue().hashCode(); } if (opaqueParameters != null) { - for (int i = 0; i < opaqueParameters.length; i++) { - h = h * 31 + opaqueParameters[i].hashCode(); - } + h = h * 31 + Arrays.deepHashCode(opaqueParameters); } return h; } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/expressions/AggregateFunctionCallExpression.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/expressions/AggregateFunctionCallExpression.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/expressions/AggregateFunctionCallExpression.java index bdd820e..97c4252 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/expressions/AggregateFunctionCallExpression.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/expressions/AggregateFunctionCallExpression.java @@ -60,6 +60,8 @@ public class AggregateFunctionCallExpression extends AbstractFunctionCallExpress fun.setStepTwoAggregate(stepTwoAggregate); fun.setStepOneAggregate(stepOneAggregate); fun.setSourceLocation(sourceLoc); + // opaqueParameters are not really cloned + fun.setOpaqueParameters(getOpaqueParameters()); return fun; } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractReplicateOperator.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractReplicateOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractReplicateOperator.java index 3bb0f47..62b6a2d 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractReplicateOperator.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractReplicateOperator.java @@ -23,6 +23,7 @@ import java.util.List; import org.apache.commons.lang3.mutable.Mutable; import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; +import org.apache.hyracks.algebricks.common.utils.Pair; import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator; import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable; import org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment; @@ -39,7 +40,7 @@ import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalExpressionRef public abstract class AbstractReplicateOperator extends AbstractLogicalOperator { private int outputArity; - protected boolean[] outputMaterializationFlags; + private boolean[] outputMaterializationFlags; private List<Mutable<ILogicalOperator>> outputs; public AbstractReplicateOperator(int outputArity) { @@ -95,6 +96,19 @@ public abstract class AbstractReplicateOperator extends AbstractLogicalOperator return outputs; } + public void setOutputs(List<Pair<Mutable<ILogicalOperator>, Boolean>> newOutputs) { + // shrinking or expanding num of outputs + if (outputMaterializationFlags.length != newOutputs.size()) { + outputMaterializationFlags = new boolean[newOutputs.size()]; + } + outputs.clear(); + for (int i = 0; i < newOutputs.size(); i++) { + outputs.add(newOutputs.get(i).first); + outputMaterializationFlags[i] = newOutputs.get(i).second; + } + outputArity = newOutputs.size(); + } + @Override public IVariableTypeEnvironment computeOutputTypeEnvironment(ITypingContext ctx) throws AlgebricksException { return createPropagatingAllInputsTypeEnvironment(ctx);