http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MemTableScanner.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MemTableScanner.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MemTableScanner.java new file mode 100644 index 0000000..7bd6a70 --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MemTableScanner.java @@ -0,0 +1,128 @@ +/*** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.engine.planner.physical; + +import com.google.common.base.Preconditions; +import org.apache.tajo.annotation.NotNull; +import org.apache.tajo.catalog.Column; +import org.apache.tajo.catalog.Schema; +import org.apache.tajo.catalog.statistics.TableStats; +import org.apache.tajo.storage.Scanner; +import org.apache.tajo.storage.Tuple; +import org.apache.tajo.storage.VTuple; + +import java.io.IOException; +import java.util.Collection; +import java.util.Iterator; + +class MemTableScanner implements Scanner { + Iterable<Tuple> iterable; + Iterator<Tuple> iterator; + long inputBytes; + + // for input stats + float scannerProgress; + int numRecords; + int totalRecords; + TableStats scannerTableStats; + + public MemTableScanner(@NotNull Collection<Tuple> iterable, long inputBytes) { + Preconditions.checkNotNull(iterable); + this.iterable = iterable; + totalRecords = iterable.size(); + this.inputBytes = inputBytes; + } + + @Override + public void init() throws IOException { + scannerProgress = 0.0f; + numRecords = 0; + + // it will be returned as the final stats + scannerTableStats = new TableStats(); + scannerTableStats.setNumBytes(inputBytes); + scannerTableStats.setReadBytes(inputBytes); + scannerTableStats.setNumRows(totalRecords); + + iterator = iterable.iterator(); + } + + @Override + public Tuple next() throws IOException { + if (iterator.hasNext()) { + numRecords++; + return new VTuple(iterator.next()); + } else { + return null; + } + } + + @Override + public void reset() throws IOException { + init(); + } + + @Override + public void close() throws IOException { + scannerProgress = 1.0f; + } + + @Override + public boolean isProjectable() { + return false; + } + + @Override + public void setTarget(Column[] targets) { + } + + @Override + public boolean isSelectable() { + return false; + } + + @Override + public void setSearchCondition(Object expr) { + } + + @Override + public boolean isSplittable() { + return false; + } + + @Override + 
public Schema getSchema() { + return null; + } + + @Override + public float getProgress() { + if (numRecords > 0) { + return (float)numRecords / (float)totalRecords; + + } else { // if an input is empty + return scannerProgress; + } + } + + @Override + public TableStats getInputStats() { + return scannerTableStats; + } +}
http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MergeFullOuterJoinExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MergeFullOuterJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MergeFullOuterJoinExec.java index e1cc6a8..bf95d19 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MergeFullOuterJoinExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MergeFullOuterJoinExec.java @@ -25,9 +25,9 @@ import org.apache.tajo.engine.planner.PlannerUtil; import org.apache.tajo.engine.planner.Projector; import org.apache.tajo.engine.planner.logical.JoinNode; import org.apache.tajo.engine.utils.TupleUtil; +import org.apache.tajo.storage.BaseTupleComparator; import org.apache.tajo.storage.FrameTuple; import org.apache.tajo.storage.Tuple; -import org.apache.tajo.storage.TupleComparator; import org.apache.tajo.storage.VTuple; import org.apache.tajo.worker.TaskAttemptContext; @@ -52,7 +52,7 @@ public class MergeFullOuterJoinExec extends BinaryPhysicalExec { private List<Tuple> rightTupleSlots; private JoinTupleComparator joincomparator = null; - private TupleComparator[] tupleComparator = null; + private BaseTupleComparator[] tupleComparator = null; private final static int INITIAL_TUPLE_SLOT = 10000; http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MergeJoinExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MergeJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MergeJoinExec.java index bbfe973..d68597d 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MergeJoinExec.java +++ 
b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MergeJoinExec.java @@ -24,9 +24,9 @@ import org.apache.tajo.engine.eval.EvalNode; import org.apache.tajo.engine.planner.PlannerUtil; import org.apache.tajo.engine.planner.Projector; import org.apache.tajo.engine.planner.logical.JoinNode; +import org.apache.tajo.storage.BaseTupleComparator; import org.apache.tajo.storage.FrameTuple; import org.apache.tajo.storage.Tuple; -import org.apache.tajo.storage.TupleComparator; import org.apache.tajo.storage.VTuple; import org.apache.tajo.worker.TaskAttemptContext; @@ -53,7 +53,7 @@ public class MergeJoinExec extends BinaryPhysicalExec { private Iterator<Tuple> innerIterator; private JoinTupleComparator joincomparator = null; - private TupleComparator[] tupleComparator = null; + private BaseTupleComparator[] tupleComparator = null; private final static int INITIAL_TUPLE_SLOT = 10000;
http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PairWiseMerger.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PairWiseMerger.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PairWiseMerger.java
new file mode 100644
index 0000000..2ac8662
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PairWiseMerger.java
@@ -0,0 +1,248 @@
+/***
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.storage.Scanner;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.VTuple;
+
+import java.io.IOException;
+import java.util.Comparator;
+
+/**
+ * Two-way merging scanner that reads two input scanners and emits a single stream of tuples
+ * ordered by the given comparator. Assumes each input is already sorted by that comparator,
+ * since the merge only ever compares the two buffered head tuples.
+ */
+class PairWiseMerger implements Scanner {
+  private static final Log LOG = LogFactory.getLog(PairWiseMerger.class);
+
+  private Scanner leftScan;
+  private Scanner rightScan;
+
+  private VTuple outTuple;
+  private VTuple leftTuple;
+  private VTuple rightTuple;
+
+  private final Schema schema;
+  private final Comparator<Tuple> comparator;
+
+  private float mergerProgress;
+  private TableStats mergerInputStats;
+
+  private static enum State {
+    NEW,
+    INITED,
+    CLOSED
+  }
+
+  private State state = State.NEW;
+
+  /**
+   * @param schema schema returned by getSchema()
+   * @param leftScanner first input
+   * @param rightScanner second input
+   * @param comparator order in which tuples are emitted
+   */
+  public PairWiseMerger(Schema schema, Scanner leftScanner, Scanner rightScanner, Comparator<Tuple> comparator)
+      throws IOException {
+    this.schema = schema;
+    this.leftScan = leftScanner;
+    this.rightScan = rightScanner;
+    this.comparator = comparator;
+  }
+
+  private void setState(State state) {
+    this.state = state;
+  }
+
+  /**
+   * Initializes both inputs and buffers the head tuple of each.
+   *
+   * @throws IllegalStateException if the merger is not in the NEW state
+   */
+  @Override
+  public void init() throws IOException {
+    if (state != State.NEW) {
+      throw new IllegalStateException("Illegal State: init() is not allowed in " + state.name());
+    }
+
+    leftScan.init();
+    rightScan.init();
+
+    prepareTuplesForFirstComparison();
+
+    mergerInputStats = new TableStats();
+    mergerProgress = 0.0f;
+
+    setState(State.INITED);
+  }
+
+  /** Copies a scanner result into a new VTuple; a null (exhausted input) stays null. */
+  private static VTuple copyOrNull(Tuple tuple) {
+    // TODO - missed free (note carried over from the original inline null branches)
+    return tuple != null ? new VTuple(tuple) : null;
+  }
+
+  /** Buffers the first tuple of each input so next() has something to compare. */
+  private void prepareTuplesForFirstComparison() throws IOException {
+    leftTuple = copyOrNull(leftScan.next());
+    rightTuple = copyOrNull(rightScan.next());
+  }
+
+  /**
+   * Emits the smaller of the two buffered head tuples and refills that side.
+   *
+   * @return a copy of the next merged tuple, or null once both inputs are exhausted
+   */
+  @Override
+  public Tuple next() throws IOException {
+    if (leftTuple != null && rightTuple != null) {
+      if (comparator.compare(leftTuple, rightTuple) < 0) {
+        outTuple = new VTuple(leftTuple);
+        leftTuple = copyOrNull(leftScan.next());
+      } else {
+        // equal keys are taken from the right input
+        outTuple = new VTuple(rightTuple);
+        rightTuple = copyOrNull(rightScan.next());
+      }
+      return outTuple;
+    }
+
+    if (leftTuple == null) {
+      // left is exhausted: drain the right side; outTuple is null once right is empty too
+      outTuple = rightTuple != null ? new VTuple(rightTuple) : null;
+      rightTuple = copyOrNull(rightScan.next());
+    } else {
+      // only the right side is exhausted, so leftTuple is known to be non-null here
+      outTuple = new VTuple(leftTuple);
+      leftTuple = copyOrNull(leftScan.next());
+    }
+    return outTuple;
+  }
+
+  /**
+   * Resets both inputs and re-buffers their head tuples.
+   *
+   * @throws IllegalStateException if the merger is not in the INITED state
+   */
+  @Override
+  public void reset() throws IOException {
+    if (state != State.INITED) {
+      throw new IllegalStateException("Illegal State: reset() is not allowed in " + state.name());
+    }
+
+    leftScan.reset();
+    rightScan.reset();
+
+    outTuple = null;
+    leftTuple = null;
+    rightTuple = null;
+
+    prepareTuplesForFirstComparison();
+  }
+
+  /** Closes both inputs, keeps their last reported stats, and marks progress as complete. */
+  @Override
+  public void close() throws IOException {
+    IOUtils.cleanup(PairWiseMerger.LOG, leftScan, rightScan);
+    // refresh mergerInputStats while the scanners are still referenced
+    getInputStats();
+    leftScan = null;
+    rightScan = null;
+    mergerProgress = 1.0f;
+    setState(State.CLOSED);
+  }
+
+  @Override
+  public boolean isProjectable() {
+    return false;
+  }
+
+  @Override
+  public void setTarget(Column[] targets) {
+  }
+
+  @Override
+  public boolean isSelectable() {
+    return false;
+  }
+
+  @Override
+  public void setSearchCondition(Object expr) {
+  }
+
+  @Override
+  public boolean isSplittable() {
+    return false;
+  }
+
+  @Override
+  public Schema getSchema() {
+    return schema;
+  }
+
+  /** Average of the two inputs' progress; the stored value once the inputs are released. */
+  @Override
+  public float getProgress() {
+    if (leftScan == null) {
+      return mergerProgress;
+    }
+    return leftScan.getProgress() * 0.5f + rightScan.getProgress() * 0.5f;
+  }
+
+  /**
+   * Sums the byte and row counters of both inputs into one stats object.
+   *
+   * @return the combined stats, or the last stored stats once the inputs are released
+   */
+  @Override
+  public TableStats getInputStats() {
+    if (leftScan == null) {
+      return mergerInputStats;
+    }
+    TableStats leftInputStats = leftScan.getInputStats();
+    if (mergerInputStats == null) {
+      mergerInputStats = new TableStats();
+    }
+    mergerInputStats.setNumBytes(0);
+    mergerInputStats.setReadBytes(0);
+    mergerInputStats.setNumRows(0);
+
+    if (leftInputStats != null) {
+      mergerInputStats.setNumBytes(leftInputStats.getNumBytes());
+      mergerInputStats.setReadBytes(leftInputStats.getReadBytes());
+      mergerInputStats.setNumRows(leftInputStats.getNumRows());
+    }
+
+    TableStats rightInputStats = rightScan.getInputStats();
+    if (rightInputStats != null) {
+      mergerInputStats.setNumBytes(mergerInputStats.getNumBytes() + rightInputStats.getNumBytes());
+      mergerInputStats.setReadBytes(mergerInputStats.getReadBytes() + rightInputStats.getReadBytes());
+      mergerInputStats.setNumRows(mergerInputStats.getNumRows() + rightInputStats.getNumRows());
+    }
+
+    return mergerInputStats;
+  }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RangeShuffleFileWriteExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RangeShuffleFileWriteExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RangeShuffleFileWriteExec.java index 68379d1..786c726 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RangeShuffleFileWriteExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RangeShuffleFileWriteExec.java @@ -47,7 +47,7 @@ public class RangeShuffleFileWriteExec extends UnaryPhysicalExec { private Schema keySchema; private BSTIndex.BSTIndexWriter indexWriter; - private TupleComparator comp; + private BaseTupleComparator comp; private FileAppender appender; private TableMeta meta; @@ -71,7 +71,7 @@ public class RangeShuffleFileWriteExec extends UnaryPhysicalExec { } BSTIndex bst = new BSTIndex(new TajoConf()); - this.comp = new TupleComparator(keySchema, sortSpecs); + this.comp = new BaseTupleComparator(keySchema, sortSpecs); Path
storeTablePath = new Path(context.getWorkDir(), "output"); LOG.info("Output data directory: " + storeTablePath); this.meta = CatalogUtil.newTableMeta(context.getDataChannel() != null ? http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RightOuterMergeJoinExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RightOuterMergeJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RightOuterMergeJoinExec.java index 5d4dad5..bc945de 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RightOuterMergeJoinExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RightOuterMergeJoinExec.java @@ -26,9 +26,9 @@ import org.apache.tajo.engine.planner.PlannerUtil; import org.apache.tajo.engine.planner.Projector; import org.apache.tajo.engine.planner.logical.JoinNode; import org.apache.tajo.engine.utils.TupleUtil; +import org.apache.tajo.storage.BaseTupleComparator; import org.apache.tajo.storage.FrameTuple; import org.apache.tajo.storage.Tuple; -import org.apache.tajo.storage.TupleComparator; import org.apache.tajo.storage.VTuple; import org.apache.tajo.worker.TaskAttemptContext; @@ -52,7 +52,7 @@ public class RightOuterMergeJoinExec extends BinaryPhysicalExec { private List<Tuple> innerTupleSlots; private JoinTupleComparator joinComparator = null; - private TupleComparator[] tupleComparator = null; + private BaseTupleComparator[] tupleComparator = null; private final static int INITIAL_TUPLE_SLOT = 10000; http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java 
b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java index 2f0c12f..122d4f3 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java @@ -34,7 +34,7 @@ import org.apache.tajo.engine.eval.FieldEval; import org.apache.tajo.engine.planner.Projector; import org.apache.tajo.engine.planner.Target; import org.apache.tajo.engine.planner.logical.ScanNode; -import org.apache.tajo.engine.utils.SchemaUtil; +import org.apache.tajo.catalog.SchemaUtil; import org.apache.tajo.engine.utils.TupleCache; import org.apache.tajo.engine.utils.TupleCacheKey; import org.apache.tajo.engine.utils.TupleUtil; http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortExec.java index a4a8d37..e261e0c 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortExec.java @@ -18,11 +18,12 @@ package org.apache.tajo.engine.planner.physical; +import org.apache.tajo.SessionVars; +import org.apache.tajo.storage.BaseTupleComparator; import org.apache.tajo.worker.TaskAttemptContext; import org.apache.tajo.catalog.Schema; import org.apache.tajo.catalog.SortSpec; import org.apache.tajo.storage.Tuple; -import org.apache.tajo.storage.TupleComparator; import java.io.IOException; import java.util.Comparator; @@ -35,7 +36,13 @@ public abstract class SortExec extends UnaryPhysicalExec { Schema outSchema, PhysicalExec child, SortSpec [] sortSpecs) { super(context, inSchema, outSchema, child); this.sortSpecs = sortSpecs; - this.comparator = new 
TupleComparator(inSchema, sortSpecs); + + BaseTupleComparator comp = new BaseTupleComparator(inSchema, sortSpecs); + if (context.getQueryContext().getBool(SessionVars.CODEGEN)) { + this.comparator = context.getSharedResource().getCompiledComparator(inSchema, comp); + } else { + this.comparator = comp; + } } public SortSpec[] getSortSpecs() { http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java index fd0c04f..3199b56 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java @@ -68,10 +68,7 @@ public class StoreTableExec extends UnaryPhysicalExec { } PhysicalPlanUtil.setNullCharIfNecessary(context.getQueryContext(), plan, meta); - - if (context.getQueryContext().containsKey(SessionVars.MAX_OUTPUT_FILE_SIZE)) { - maxPerFileSize = context.getQueryContext().getLong(SessionVars.MAX_OUTPUT_FILE_SIZE) * StorageUnit.MB; - } + maxPerFileSize = context.getQueryContext().getLong(SessionVars.MAX_OUTPUT_FILE_SIZE) * StorageUnit.MB; openNewFile(writtenFileNum); sumStats = new TableStats(); http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/WindowAggExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/WindowAggExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/WindowAggExec.java index 7aeed13..2945185 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/WindowAggExec.java +++ 
b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/WindowAggExec.java @@ -27,8 +27,8 @@ import org.apache.tajo.engine.eval.WindowFunctionEval; import org.apache.tajo.engine.function.FunctionContext; import org.apache.tajo.engine.planner.logical.WindowAggNode; import org.apache.tajo.engine.planner.logical.WindowSpec; +import org.apache.tajo.storage.BaseTupleComparator; import org.apache.tajo.storage.Tuple; -import org.apache.tajo.storage.TupleComparator; import org.apache.tajo.storage.VTuple; import org.apache.tajo.worker.TaskAttemptContext; @@ -265,7 +265,7 @@ public class WindowAggExec extends UnaryPhysicalExec { } private void evaluationWindowFrame() { - TupleComparator comp; + BaseTupleComparator comp; evaluatedTuples = new ArrayList<Tuple>(); @@ -285,9 +285,9 @@ public class WindowAggExec extends UnaryPhysicalExec { for (int idx = 0; idx < functions.length; idx++) { if (orderedFuncFlags[idx]) { - comp = new TupleComparator(inSchema, functions[idx].getSortSpecs()); + comp = new BaseTupleComparator(inSchema, functions[idx].getSortSpecs()); Collections.sort(accumulatedInTuples, comp); - comp = new TupleComparator(schemaForOrderBy, functions[idx].getSortSpecs()); + comp = new BaseTupleComparator(schemaForOrderBy, functions[idx].getSortSpecs()); Collections.sort(evaluatedTuples, comp); } http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-core/src/main/java/org/apache/tajo/engine/planner/rewrite/ProjectionPushDownRule.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/rewrite/ProjectionPushDownRule.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/rewrite/ProjectionPushDownRule.java index ec5df04..7e26a22 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/rewrite/ProjectionPushDownRule.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/rewrite/ProjectionPushDownRule.java @@ -29,7 +29,7 @@ import 
org.apache.tajo.engine.eval.*; import org.apache.tajo.engine.planner.*; import org.apache.tajo.engine.planner.LogicalPlan.QueryBlock; import org.apache.tajo.engine.planner.logical.*; -import org.apache.tajo.engine.utils.SchemaUtil; +import org.apache.tajo.catalog.SchemaUtil; import org.apache.tajo.util.TUtil; import java.util.*; http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-core/src/main/java/org/apache/tajo/engine/utils/SchemaUtil.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/utils/SchemaUtil.java b/tajo-core/src/main/java/org/apache/tajo/engine/utils/SchemaUtil.java deleted file mode 100644 index 981b572..0000000 --- a/tajo-core/src/main/java/org/apache/tajo/engine/utils/SchemaUtil.java +++ /dev/null @@ -1,88 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.engine.utils; - -import org.apache.tajo.catalog.Column; -import org.apache.tajo.catalog.Schema; -import org.apache.tajo.catalog.TableDesc; - -public class SchemaUtil { - // See TAJO-914 bug. - // - // Its essential problem is that constant value is evaluated multiple times at each scan. 
- // As a result, join nodes can take the child nodes which have the same named fields. - // Because current schema does not allow the same name and ignore the duplicated schema, - // it finally causes the in-out schema mismatch between the parent and child nodes. - // - // tmpColumnSeq is a hack to avoid the above problem by keeping duplicated constant values as different name fields. - // The essential solution would be https://issues.apache.org/jira/browse/TAJO-895. - static int tmpColumnSeq = 0; - public static Schema merge(Schema left, Schema right) { - Schema merged = new Schema(); - for(Column col : left.getColumns()) { - if (!merged.containsByQualifiedName(col.getQualifiedName())) { - merged.addColumn(col); - } - } - for(Column col : right.getColumns()) { - if (merged.containsByQualifiedName(col.getQualifiedName())) { - merged.addColumn("?fake" + (tmpColumnSeq++), col.getDataType()); - } else { - merged.addColumn(col); - } - } - - // if overflow - if (tmpColumnSeq < 0) { - tmpColumnSeq = 0; - } - return merged; - } - - /** - * Get common columns to be used as join keys of natural joins. 
- */ - public static Schema getNaturalJoinColumns(Schema left, Schema right) { - Schema common = new Schema(); - for (Column outer : left.getColumns()) { - if (!common.containsByName(outer.getSimpleName()) && right.containsByName(outer.getSimpleName())) { - common.addColumn(new Column(outer.getSimpleName(), outer.getDataType())); - } - } - - return common; - } - - public static Schema getQualifiedLogicalSchema(TableDesc tableDesc, String tableName) { - Schema logicalSchema = new Schema(tableDesc.getLogicalSchema()); - if (tableName != null) { - logicalSchema.setQualifier(tableName); - } - return logicalSchema; - } - - public static <T extends Schema> T clone(Schema schema) { - try { - T copy = (T) schema.clone(); - return copy; - } catch (CloneNotSupportedException e) { - throw new RuntimeException(e); - } - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockSharedResource.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockSharedResource.java b/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockSharedResource.java index e77e265..4b02777 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockSharedResource.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockSharedResource.java @@ -30,6 +30,8 @@ import org.apache.tajo.engine.json.CoreGsonHelper; import org.apache.tajo.engine.planner.PlanningException; import org.apache.tajo.engine.planner.logical.LogicalNode; import org.apache.tajo.engine.query.QueryContext; +import org.apache.tajo.storage.BaseTupleComparator; +import org.apache.tajo.storage.TupleComparator; import org.apache.tajo.util.Pair; import java.util.concurrent.atomic.AtomicBoolean; @@ -85,10 +87,10 @@ public class ExecutionBlockSharedResource { } public EvalNode compileEval(Schema schema, EvalNode eval) { - return 
compilationContext.getCompiler().compile(schema, eval); + return compilationContext.getEvalCompiler().compile(schema, eval); } - public EvalNode getPreCompiledEval(Schema schema, EvalNode eval) { + public EvalNode getCompiledEval(Schema schema, EvalNode eval) { if (codeGenEnabled) { Pair<Schema, EvalNode> key = new Pair<Schema, EvalNode>(schema, eval); @@ -96,7 +98,7 @@ public class ExecutionBlockSharedResource { return compilationContext.getPrecompiedEvals().get(key); } else { try { - LOG.warn(eval.toString() + " does not exists. Immediately compile it: " + eval); + LOG.warn(eval.toString() + " does not exist. Compiling it immediately."); return compileEval(schema, eval); } catch (Throwable t) { LOG.warn(t); @@ -104,10 +106,37 @@ public class ExecutionBlockSharedResource { } } } else { - throw new IllegalStateException("CodeGen is disabled"); + throw new IllegalStateException("CODEGEN is disabled"); } } + public TupleComparator compileComparator(Schema schema, BaseTupleComparator comp) { + return compilationContext.getComparatorCompiler().compile(comp, false); + } + + public TupleComparator getCompiledComparator(Schema schema, BaseTupleComparator comp) { + if (codeGenEnabled) { + Pair<Schema, BaseTupleComparator> key = new Pair<Schema, BaseTupleComparator>(schema, comp); + if (compilationContext.getPrecompiedComparators().containsKey(key)) { + return compilationContext.getPrecompiedComparators().get(key); + } else { + try { + LOG.warn(comp + " does not exist. 
Compiling it immediately"); + return compileComparator(schema, comp); + } catch (Throwable t) { + LOG.warn(t); + return comp; + } + } + } else { + throw new IllegalStateException("CODEGEN is disabled"); + } + } + + public TajoClassLoader getClassLoader() { + return classLoader; + } + public void release() { compilationContext = null; http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-core/src/main/java/org/apache/tajo/worker/RangeRetrieverHandler.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/RangeRetrieverHandler.java b/tajo-core/src/main/java/org/apache/tajo/worker/RangeRetrieverHandler.java index be33a12..2b58196 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/RangeRetrieverHandler.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/RangeRetrieverHandler.java @@ -26,10 +26,10 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.tajo.catalog.Schema; import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.storage.BaseTupleComparator; import org.apache.tajo.storage.RowStoreUtil; import org.apache.tajo.storage.RowStoreUtil.RowStoreDecoder; import org.apache.tajo.storage.Tuple; -import org.apache.tajo.storage.TupleComparator; import org.apache.tajo.storage.index.bst.BSTIndex; import org.apache.tajo.worker.dataserver.retriever.FileChunk; import org.apache.tajo.worker.dataserver.retriever.RetrieverHandler; @@ -57,10 +57,10 @@ public class RangeRetrieverHandler implements RetrieverHandler { private final File file; private final BSTIndex.BSTIndexReader idxReader; private final Schema schema; - private final TupleComparator comp; + private final BaseTupleComparator comp; private final RowStoreDecoder decoder; - public RangeRetrieverHandler(File outDir, Schema schema, TupleComparator comp) throws IOException { + public RangeRetrieverHandler(File outDir, Schema schema, BaseTupleComparator comp) throws 
IOException { this.file = outDir; BSTIndex index = new BSTIndex(new TajoConf()); this.schema = schema; http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-core/src/main/java/org/apache/tajo/worker/Task.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java index 7b4cbe1..d2cb60d 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java @@ -49,8 +49,9 @@ import org.apache.tajo.ipc.QueryMasterProtocol; import org.apache.tajo.ipc.TajoWorkerProtocol.*; import org.apache.tajo.ipc.TajoWorkerProtocol.EnforceProperty.EnforceType; import org.apache.tajo.rpc.NullCallback; +import org.apache.tajo.storage.BaseTupleComparator; +import org.apache.tajo.storage.RawFile; import org.apache.tajo.storage.StorageUtil; -import org.apache.tajo.storage.TupleComparator; import org.apache.tajo.storage.fragment.FileFragment; import org.jboss.netty.channel.socket.ClientSocketChannelFactory; @@ -94,7 +95,9 @@ public class Task { // TODO - to be refactored private ShuffleType shuffleType = null; private Schema finalSchema = null; - private TupleComparator sortComp = null; + + private BaseTupleComparator sortComp = null; + private ClientSocketChannelFactory channelFactory = null; static final String OUTPUT_FILE_PREFIX="part-"; static final ThreadLocal<NumberFormat> OUTPUT_FILE_FORMAT_SUBQUERY = @@ -175,7 +178,7 @@ public class Task { if (shuffleType == ShuffleType.RANGE_SHUFFLE) { SortNode sortNode = PlannerUtil.findTopNode(plan, NodeType.SORT); this.finalSchema = PlannerUtil.sortSpecsToSchema(sortNode.getSortKeys()); - this.sortComp = new TupleComparator(finalSchema, sortNode.getSortKeys()); + this.sortComp = new BaseTupleComparator(finalSchema, sortNode.getSortKeys()); } } else { // The final result of a task will be written in a file named part-ss-nnnnnnn, @@ 
-685,7 +688,7 @@ public class Task { if (!storeDir.exists()) { storeDir.mkdirs(); } - storeFile = new File(storeDir, "in_" + i); + storeFile = new File(storeDir, "in_" + i + "." + RawFile.FILE_EXTENSION); Fetcher fetcher = new Fetcher(systemConf, uri, storeFile, channelFactory); runnerList.add(fetcher); i++; http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java index 422ec2b..4973ff8 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java @@ -133,6 +133,12 @@ public class TaskAttemptContext { final Fragment [] fragments, final Path workDir) { this(queryContext, null, queryId, FragmentConvertor.toFragmentProtoArray(fragments), workDir); } + @VisibleForTesting + public TaskAttemptContext(final QueryContext queryContext, final QueryUnitAttemptId queryId, + final Fragment [] fragments, final Path workDir, ExecutionBlockSharedResource resource) { + this(queryContext, null, queryId, FragmentConvertor.toFragmentProtoArray(fragments), workDir); + this.sharedResource = resource; + } public TajoConf getConf() { return queryContext.getConf(); @@ -167,13 +173,9 @@ public class TaskAttemptContext { return sharedResource; } - public EvalNode compileEval(Schema schema, EvalNode eval) { - return sharedResource.compileEval(schema, eval); - } - public EvalNode getPrecompiledEval(Schema schema, EvalNode eval) { if (sharedResource != null) { - return sharedResource.getPreCompiledEval(schema, eval); + return sharedResource.getCompiledEval(schema, eval); } else { LOG.debug("Shared resource is not initialized. 
It is NORMAL in unit tests"); return eval; http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-core/src/test/java/org/apache/tajo/engine/codegen/TestTupleComparerCompiler.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/codegen/TestTupleComparerCompiler.java b/tajo-core/src/test/java/org/apache/tajo/engine/codegen/TestTupleComparerCompiler.java new file mode 100644 index 0000000..6d9b135 --- /dev/null +++ b/tajo-core/src/test/java/org/apache/tajo/engine/codegen/TestTupleComparerCompiler.java @@ -0,0 +1,352 @@ +/*** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.engine.codegen; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import org.apache.tajo.catalog.Column; +import org.apache.tajo.catalog.SortSpec; +import org.apache.tajo.datum.DatumFactory; +import org.apache.tajo.datum.NullDatum; +import org.apache.tajo.storage.BaseTupleComparator; +import org.apache.tajo.storage.Tuple; +import org.apache.tajo.storage.TupleComparator; +import org.apache.tajo.storage.VTuple; +import org.apache.tajo.tuple.offheap.*; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.util.List; + +import static org.apache.tajo.common.TajoDataTypes.Type.*; +import static org.apache.tajo.tuple.offheap.TestOffHeapRowBlock.schema; +import static org.junit.Assert.assertTrue; + +public class TestTupleComparerCompiler { + private static TajoClassLoader classLoader; + private static TupleComparerCompiler compiler; + + @BeforeClass + public static void setUp() { + classLoader = new TajoClassLoader(); + compiler = new TupleComparerCompiler(classLoader); + } + + @AfterClass + public static void tearDown() throws Throwable { + compiler = null; + + classLoader.clean(); + classLoader = null; + } + + private SortSpec [][] createSortSpecs(String columnName) { + Column column = schema.getColumn(columnName); + SortSpec [][] sortSpecList = new SortSpec[4][]; + sortSpecList[0] = new SortSpec[] {new SortSpec(column, true, false)}; + sortSpecList[1] = new SortSpec[] {new SortSpec(column, true, true)}; + sortSpecList[2] = new SortSpec[] {new SortSpec(column, false, false)}; + sortSpecList[3] = new SortSpec[] {new SortSpec(column, false, true)}; + return sortSpecList; + } + + private TupleComparator [] createComparators(SortSpec [][] sortSpecs, boolean unsafeTuple) { + TupleComparator [] comps = new TupleComparator[sortSpecs.length]; + for (int i = 0; i < sortSpecs.length; i++) { + BaseTupleComparator compImpl = new BaseTupleComparator(schema, 
sortSpecs[i]); + comps[i] = compiler.compile(compImpl, unsafeTuple); + } + + return comps; + } + + private void assertCompareAll(TupleComparator [] comps, SortSpec [][] sortSpecs, Tuple...tuples) { + Preconditions.checkArgument(comps.length == sortSpecs.length); + Preconditions.checkArgument(tuples.length == 5, "The number of tuples must be 5"); + + for (int i = 0; i < comps.length; i++) { + assertCompare(comps[i], sortSpecs[i], tuples); + } + } + + /** + * First two tuples must be the same values for equality check. + * @param tuples + */ + private void assertCompare(TupleComparator comp, SortSpec [] sortSpecs, Tuple...tuples) { + Preconditions.checkArgument(tuples.length == 5, "The number of tuples must be 5"); + + if (sortSpecs[0].isAscending()) { + assertTrue("Checking Equality", comp.compare(tuples[0], tuples[1]) == 0); + assertTrue("Checking Less Than", comp.compare(tuples[0], tuples[2]) < 0); + assertTrue("Checking Greater Than", comp.compare(tuples[2], tuples[0]) > 0); + } else { + assertTrue("Checking Equality", comp.compare(tuples[0], tuples[1]) == 0); + assertTrue("Checking Less Than", comp.compare(tuples[0], tuples[2]) > 0); + assertTrue("Checking Greater Than", comp.compare(tuples[2], tuples[0]) < 0); + } + + if (sortSpecs[0].isNullFirst()) { + assertTrue("Checking Greater Than", comp.compare(tuples[0], tuples[3]) > 0); + assertTrue("Checking Greater Than", comp.compare(tuples[3], tuples[0]) < 0); + } else { + assertTrue("Checking Greater Than", comp.compare(tuples[0], tuples[3]) < 0); + assertTrue("Checking Greater Than", comp.compare(tuples[3], tuples[0]) > 0); + } + + assertTrue("Checking Null Equality", comp.compare(tuples[3], tuples[4]) == 0); + assertTrue("Checking Null Equality", comp.compare(tuples[4], tuples[3]) == 0); + } + + @Test + public void testCompareOneBool() throws Exception { + SortSpec [][] sortSpecs = createSortSpecs("col0"); + TupleComparator [] comps = createComparators(sortSpecs, false); + + Tuple t1 = new VTuple(schema.size()); 
+ t1.put(0, DatumFactory.createBool(true)); + + Tuple t2 = new VTuple(schema.size()); + t2.put(0, DatumFactory.createBool(true)); + + Tuple t3 = new VTuple(schema.size()); + t3.put(0, DatumFactory.createBool(false)); + + Tuple t4 = new VTuple(schema.size()); + t4.put(0, NullDatum.get()); + + assertCompareAll(comps, sortSpecs, t1, t2, t3, t4, t4); + } + + @Test + public void testCompareOneInt() throws Exception { + SortSpec [][] sortSpecs = createSortSpecs("col2"); + TupleComparator [] comps = createComparators(sortSpecs, false); + + Tuple t1 = new VTuple(schema.size()); + t1.put(2, DatumFactory.createInt2((short) 1)); + + Tuple t2 = new VTuple(schema.size()); + t2.put(2, DatumFactory.createInt2((short) 1)); + + Tuple t3 = new VTuple(schema.size()); + t3.put(2, DatumFactory.createInt2((short) 2)); + + Tuple t4 = new VTuple(schema.size()); + t4.put(2, NullDatum.get()); + + Tuple t5 = new VTuple(schema.size()); + t5.put(2, NullDatum.get()); + + assertCompareAll(comps, sortSpecs, t1, t2, t3, t4, t5); + } + + @Test + public void testCompareTwoInts() throws Exception { + SortSpec[] sortSpecs = new SortSpec[] { + new SortSpec(new Column("col2", INT2)), + new SortSpec(new Column("col3", INT4))}; + + + BaseTupleComparator comparator = new BaseTupleComparator(schema, sortSpecs); + + TajoClassLoader classLoader = new TajoClassLoader(); + + TupleComparerCompiler compiler = new TupleComparerCompiler(classLoader); + TupleComparator compiled = compiler.compile(comparator, false); + + Tuple t1 = new VTuple(schema.size()); + t1.put(2, DatumFactory.createInt2((short) 1)); + t1.put(3, DatumFactory.createInt4(1)); + + Tuple t2 = new VTuple(schema.size()); + t2.put(2, DatumFactory.createInt2((short) 1)); + t2.put(3, DatumFactory.createInt4(1)); + + Tuple t3 = new VTuple(schema.size()); + t3.put(2, DatumFactory.createInt2((short) 2)); + t3.put(3, DatumFactory.createInt4(1)); + + Tuple t4 = new VTuple(schema.size()); + t4.put(2, DatumFactory.createInt2((short) 1)); + t4.put(3, 
DatumFactory.createInt4(2)); + + Tuple t5 = new VTuple(schema.size()); + t5.put(2, NullDatum.get()); + t5.put(3, DatumFactory.createInt4(2)); + + Tuple t6 = new VTuple(schema.size()); + t6.put(2, DatumFactory.createInt2((short) 1)); + t6.put(3, NullDatum.get()); + + assertCompare(compiled, sortSpecs, t1, t2, t3, t5, t5); + assertCompare(compiled, sortSpecs, t1, t2, t4, t5, t5); + assertCompare(compiled, sortSpecs, t1, t2, t5, t6, t6); + } + + @Test + public void testCompareOneFloat4() throws Exception { + SortSpec [][] sortSpecs = createSortSpecs("col4"); + TupleComparator comps [] = createComparators(sortSpecs, false); + + Tuple t1 = new VTuple(schema.size()); + t1.put(4, DatumFactory.createFloat4(1.0f)); + + Tuple t2 = new VTuple(schema.size()); + t2.put(4, DatumFactory.createFloat4(1.0f)); + + Tuple t3 = new VTuple(schema.size()); + t3.put(4, DatumFactory.createFloat4(2.0f)); + + Tuple t4 = new VTuple(schema.size()); + t4.put(4, NullDatum.get()); + + assertCompareAll(comps, sortSpecs, t1, t2, t3, t4, t4); + } + + @Test + public void testCompareFloat4Float8() throws Exception { + SortSpec[] sortSpecs = new SortSpec[] { + new SortSpec(new Column("col4", FLOAT4)), + new SortSpec(new Column("col5", FLOAT8))}; + + BaseTupleComparator comparator = new BaseTupleComparator(schema, sortSpecs); + + TajoClassLoader classLoader = new TajoClassLoader(); + TupleComparerCompiler compiler = new TupleComparerCompiler(classLoader); + TupleComparator compiled = compiler.compile(comparator, false); + + Tuple t1 = new VTuple(schema.size()); + t1.put(4, DatumFactory.createFloat4(1.0f)); + t1.put(5, DatumFactory.createFloat8(1.0f)); + + Tuple t2 = new VTuple(schema.size()); + t2.put(4, DatumFactory.createFloat4(1.0f)); + t2.put(5, DatumFactory.createFloat8(1.0f)); + + Tuple t3 = new VTuple(schema.size()); + t3.put(4, DatumFactory.createFloat4(1.0f)); + t3.put(5, DatumFactory.createFloat8(2.0f)); + + Tuple t4 = new VTuple(schema.size()); + t4.put(4, DatumFactory.createFloat4(2.0f)); + 
t4.put(5, DatumFactory.createFloat8(1.0f)); + + Tuple t5 = new VTuple(schema.size()); + t5.put(4, DatumFactory.createFloat4(2.0f)); + t5.put(5, NullDatum.get()); + + Tuple t6 = new VTuple(schema.size()); + t6.put(4, NullDatum.get()); + t6.put(5, DatumFactory.createFloat8(1.0f)); + + assertCompare(compiled, sortSpecs, t1, t2, t3, t5, t5); + assertCompare(compiled, sortSpecs, t1, t2, t4, t5, t5); + assertCompare(compiled, sortSpecs, t1, t2, t5, t6, t6); + } + + @Test + public void testCompareText() throws Exception { + SortSpec [][] sortSpecs = createSortSpecs("col6"); + TupleComparator [] comps = createComparators(sortSpecs, false); + + Tuple t1 = new VTuple(schema.size()); + t1.put(6, DatumFactory.createText("tajo")); + + Tuple t2 = new VTuple(schema.size()); + t2.put(6, DatumFactory.createText("tajo")); + + Tuple t3 = new VTuple(schema.size()); + t3.put(6, DatumFactory.createText("tazo")); + + Tuple t4 = new VTuple(schema.size()); + t4.put(6, NullDatum.get()); + + assertCompareAll(comps, sortSpecs, t1, t2, t3, t4, t4); + } + + @Test + public void testCompareTextWithNull() throws Exception { + SortSpec[] sortSpecs = new SortSpec[] { + new SortSpec(new Column("col5", FLOAT8)), + new SortSpec(new Column("col6", TEXT))}; + BaseTupleComparator compImpl = new BaseTupleComparator(schema, sortSpecs); + TupleComparator comp = compiler.compile(compImpl, false); + + Tuple t1 = new VTuple(schema.size()); + t1.put(5, NullDatum.get()); + t1.put(6, DatumFactory.createText("ARGENTINA")); + + Tuple t2 = new VTuple(schema.size()); + t2.put(5, NullDatum.get()); + t2.put(6, DatumFactory.createText("ARGENTINA")); + + Tuple t3 = new VTuple(schema.size()); + t3.put(5, NullDatum.get()); + t3.put(6, DatumFactory.createText("CANADA")); + + Tuple t4 = new VTuple(schema.size()); + t4.put(5, NullDatum.get()); + t4.put(6, NullDatum.get()); + + assertCompare(comp, sortSpecs, t1, t2, t3, t4, t4); + } + + private void fillTextColumnToRowBlock(OffHeapRowBlock rowBlock, String text) { + RowWriter 
writer = rowBlock.getWriter(); + writer.startRow(); + writer.skipField(); // 0 + writer.skipField(); // 1 + writer.skipField(); // 2 + writer.skipField(); // 3 + writer.skipField(); // 4 + writer.skipField(); // 5 + if (text != null) { + writer.putText(text); + } + writer.endRow(); + } + + @Test + public void testCompareTextInUnSafeTuple() throws Exception { + SortSpec [][] sortSpecs = createSortSpecs("col6"); + TupleComparator [] comps = createComparators(sortSpecs, true); + + OffHeapRowBlock rowBlock = new OffHeapRowBlock(schema, 640); + fillTextColumnToRowBlock(rowBlock, "tajo"); + fillTextColumnToRowBlock(rowBlock, "tajo"); + fillTextColumnToRowBlock(rowBlock, "tazo"); + fillTextColumnToRowBlock(rowBlock, null); + + List<UnSafeTuple> tuples = Lists.newArrayList(); + + OffHeapRowBlockReader reader = new OffHeapRowBlockReader(rowBlock); + + reader.reset(); + ZeroCopyTuple zcTuple = new ZeroCopyTuple(); + while(reader.next(zcTuple)) { + tuples.add(zcTuple); + zcTuple = new ZeroCopyTuple(); + } + + assertCompareAll(comps, sortSpecs, tuples.get(0), tuples.get(1), tuples.get(2), tuples.get(3), tuples.get(3)); + rowBlock.release(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-core/src/test/java/org/apache/tajo/engine/eval/ExprTestBase.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/eval/ExprTestBase.java b/tajo-core/src/test/java/org/apache/tajo/engine/eval/ExprTestBase.java index 4cbdddd..9964a38 100644 --- a/tajo-core/src/test/java/org/apache/tajo/engine/eval/ExprTestBase.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/eval/ExprTestBase.java @@ -40,7 +40,7 @@ import org.apache.tajo.engine.plan.EvalTreeProtoSerializer; import org.apache.tajo.engine.plan.proto.PlanProto; import org.apache.tajo.engine.planner.*; import org.apache.tajo.engine.query.QueryContext; -import org.apache.tajo.engine.utils.SchemaUtil; +import 
org.apache.tajo.catalog.SchemaUtil; import org.apache.tajo.master.TajoMaster; import org.apache.tajo.storage.LazyTuple; import org.apache.tajo.storage.Tuple; http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestPlannerUtil.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestPlannerUtil.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestPlannerUtil.java index b370be7..7116905 100644 --- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestPlannerUtil.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestPlannerUtil.java @@ -31,8 +31,8 @@ import org.apache.tajo.engine.eval.*; import org.apache.tajo.engine.function.builtin.SumInt; import org.apache.tajo.engine.parser.SQLAnalyzer; import org.apache.tajo.engine.planner.logical.*; +import org.apache.tajo.storage.BaseTupleComparator; import org.apache.tajo.storage.Tuple; -import org.apache.tajo.storage.TupleComparator; import org.apache.tajo.storage.VTuple; import org.apache.tajo.util.CommonTestingUtil; import org.apache.tajo.util.KeyValueSet; @@ -287,7 +287,7 @@ public class TestPlannerUtil { FieldEval f4 = new FieldEval("people.fid2", CatalogUtil.newSimpleDataType(Type.INT4)); EvalNode joinQual = new BinaryEval(EvalType.EQUAL, f1, f2); - TupleComparator [] comparators = PlannerUtil.getComparatorsFromJoinQual(joinQual, outerSchema, innerSchema); + BaseTupleComparator[] comparators = PlannerUtil.getComparatorsFromJoinQual(joinQual, outerSchema, innerSchema); Tuple t1 = new VTuple(2); t1.put(0, DatumFactory.createInt4(1)); @@ -297,11 +297,11 @@ public class TestPlannerUtil { t2.put(0, DatumFactory.createInt4(2)); t2.put(1, DatumFactory.createInt4(3)); - TupleComparator outerComparator = comparators[0]; + BaseTupleComparator outerComparator = comparators[0]; assertTrue(outerComparator.compare(t1, t2) < 0); 
assertTrue(outerComparator.compare(t2, t1) > 0); - TupleComparator innerComparator = comparators[1]; + BaseTupleComparator innerComparator = comparators[1]; assertTrue(innerComparator.compare(t1, t2) < 0); assertTrue(innerComparator.compare(t2, t1) > 0); http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestUniformRangePartition.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestUniformRangePartition.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestUniformRangePartition.java index 2294424..12f8892 100644 --- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestUniformRangePartition.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestUniformRangePartition.java @@ -22,8 +22,8 @@ import org.apache.tajo.catalog.Schema; import org.apache.tajo.catalog.SortSpec; import org.apache.tajo.common.TajoDataTypes.Type; import org.apache.tajo.datum.DatumFactory; +import org.apache.tajo.storage.BaseTupleComparator; import org.apache.tajo.storage.Tuple; -import org.apache.tajo.storage.TupleComparator; import org.apache.tajo.storage.TupleRange; import org.apache.tajo.storage.VTuple; import org.junit.Test; @@ -483,7 +483,7 @@ public class TestUniformRangePartition { TupleRange expected = new TupleRange(sortSpecs, s, e); UniformRangePartition partitioner = new UniformRangePartition(expected, sortSpecs); - TupleComparator comp = new TupleComparator(schema, sortSpecs); + BaseTupleComparator comp = new BaseTupleComparator(schema, sortSpecs); Tuple tuple = s; Tuple prevTuple = null; http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java 
b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java index f817776..5b4c696 100644 --- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java @@ -68,7 +68,7 @@ public class TestBSTIndexExec { private LogicalOptimizer optimizer; private AbstractStorageManager sm; private Schema idxSchema; - private TupleComparator comp; + private BaseTupleComparator comp; private BSTIndex.BSTIndexWriter writer; private HashMap<Integer , Integer> randomValues ; private int rndKey = -1; @@ -104,7 +104,7 @@ public class TestBSTIndexExec { idxSchema.addColumn("managerid", Type.INT4); SortSpec[] sortKeys = new SortSpec[1]; sortKeys[0] = new SortSpec(idxSchema.getColumn("managerid"), true, false); - this.comp = new TupleComparator(idxSchema, sortKeys); + this.comp = new BaseTupleComparator(idxSchema, sortKeys); this.writer = new BSTIndex(conf).getIndexWriter(idxPath, BSTIndex.TWO_LEVEL_INDEX, this.idxSchema, this.comp); http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java index e7aac3c..eb02bfc 100644 --- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java @@ -18,8 +18,13 @@ package org.apache.tajo.engine.planner.physical; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import 
org.apache.tajo.LocalTajoTestingUtility; +import org.apache.tajo.SessionVars; import org.apache.tajo.TajoConstants; import org.apache.tajo.TajoTestingCluster; import org.apache.tajo.algebra.Expr; @@ -27,29 +32,38 @@ import org.apache.tajo.catalog.*; import org.apache.tajo.catalog.proto.CatalogProtos.StoreType; import org.apache.tajo.common.TajoDataTypes.Type; import org.apache.tajo.conf.TajoConf; -import org.apache.tajo.datum.Datum; -import org.apache.tajo.datum.DatumFactory; import org.apache.tajo.engine.parser.SQLAnalyzer; import org.apache.tajo.engine.planner.*; import org.apache.tajo.engine.planner.enforce.Enforcer; import org.apache.tajo.engine.planner.logical.LogicalNode; import org.apache.tajo.engine.query.QueryContext; import org.apache.tajo.storage.*; +import org.apache.tajo.tuple.offheap.OffHeapRowBlock; +import org.apache.tajo.tuple.offheap.OffHeapRowBlockUtils; +import org.apache.tajo.tuple.offheap.TestOffHeapRowBlock; import org.apache.tajo.storage.fragment.FileFragment; +import org.apache.tajo.storage.raw.TestDirectRawFile; +import org.apache.tajo.storage.rawfile.DirectRawFileScanner; +import org.apache.tajo.storage.rawfile.DirectRawFileWriter; import org.apache.tajo.util.CommonTestingUtil; +import org.apache.tajo.util.FileUtil; +import org.apache.tajo.worker.ExecutionBlockSharedResource; import org.apache.tajo.worker.TaskAttemptContext; import org.junit.After; import org.junit.Before; import org.junit.Test; import java.io.IOException; -import java.util.Random; +import java.util.List; import static org.apache.tajo.TajoConstants.DEFAULT_TABLESPACE_NAME; +import static org.apache.tajo.tuple.offheap.TestOffHeapRowBlock.schema; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; public class TestExternalSortExec { + private static final Log LOG = LogFactory.getLog(TestExternalSortExec.class); + private TajoConf conf; private TajoTestingCluster util; private final String TEST_PATH = 
"target/test-data/TestExternalSortExec"; @@ -60,8 +74,6 @@ public class TestExternalSortExec { private Path testDir; private final int numTuple = 100000; - private Random rnd = new Random(System.currentTimeMillis()); - private TableDesc employee; @Before @@ -75,30 +87,22 @@ public class TestExternalSortExec { conf.setVar(TajoConf.ConfVars.WORKER_TEMPORAL_DIR, testDir.toString()); sm = StorageManagerFactory.getStorageManager(conf, testDir); - Schema schema = new Schema(); - schema.addColumn("managerid", Type.INT4); - schema.addColumn("empid", Type.INT4); - schema.addColumn("deptname", Type.TEXT); - - TableMeta employeeMeta = CatalogUtil.newTableMeta(StoreType.CSV); - Path employeePath = new Path(testDir, "employee.csv"); - Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(employeeMeta, schema, employeePath); - appender.enableStats(); - appender.init(); - Tuple tuple = new VTuple(schema.size()); - for (int i = 0; i < numTuple; i++) { - tuple.put(new Datum[] { - DatumFactory.createInt4(rnd.nextInt(50)), - DatumFactory.createInt4(rnd.nextInt(100)), - DatumFactory.createText("dept_" + i), - }); - appender.addTuple(tuple); - } - appender.flush(); - appender.close(); + OffHeapRowBlock rowBlock = TestOffHeapRowBlock.createRowBlock(numTuple); + TableMeta employeeMeta = CatalogUtil.newTableMeta(StoreType.DIRECTRAW); + + Path outFile = new Path(TEST_PATH, "output.draw"); + + long startTime = System.currentTimeMillis(); + Path employeePath = TestDirectRawFile.writeRowBlock(conf, employeeMeta, rowBlock, outFile); + long endTime = System.currentTimeMillis(); + rowBlock.release(); - System.out.println(appender.getStats().getNumRows() + " rows (" + (appender.getStats().getNumBytes() / 1048576) + - " MB)"); + FileSystem fs = FileSystem.getLocal(conf); + FileStatus status = fs.getFileStatus(employeePath); + LOG.info("============================================================"); + LOG.info("Written file size: " + 
FileUtil.humanReadableByteCount(status.getLen(), false) + " " + + (endTime - startTime) + " msec"); + LOG.info("============================================================"); employee = new TableDesc("default.employee", schema, employeeMeta, employeePath); catalog.createTable(employee); @@ -113,35 +117,31 @@ public class TestExternalSortExec { } String[] QUERIES = { - "select managerId, empId from employee order by managerId, empId" + "select col2, col3, col4 from employee order by col2, col3" }; @Test - public final void testNext() throws IOException, PlanningException { + public final void testNext() throws IOException, PlanningException, InterruptedException { + QueryContext queryContext = LocalTajoTestingUtility.createDummyContext(conf); + queryContext.setBool(SessionVars.CODEGEN, true); + queryContext.setLong(SessionVars.EXTSORT_BUFFER_SIZE, 100); + + Expr expr = analyzer.parse(QUERIES[0]); + LogicalPlan plan = planner.createPlan(queryContext, expr); + LogicalNode rootNode = plan.getRootBlock().getRoot(); + + ExecutionBlockSharedResource resource = new ExecutionBlockSharedResource(); + resource.initialize(queryContext, rootNode.toJson()); + FileFragment[] frags = StorageManager.splitNG(conf, "default.employee", employee.getMeta(), employee.getPath(), Integer.MAX_VALUE); Path workDir = new Path(testDir, TestExternalSortExec.class.getName()); - TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf), - LocalTajoTestingUtility.newQueryUnitAttemptId(), new FileFragment[] { frags[0] }, workDir); - ctx.setEnforcer(new Enforcer()); - Expr expr = analyzer.parse(QUERIES[0]); - LogicalPlan plan = planner.createPlan(LocalTajoTestingUtility.createDummyContext(conf), expr); - LogicalNode rootNode = plan.getRootBlock().getRoot(); + TaskAttemptContext taskContext = new TaskAttemptContext(queryContext, + LocalTajoTestingUtility.newQueryUnitAttemptId(), new FileFragment[] { frags[0] }, workDir, resource); + taskContext.setEnforcer(new Enforcer()); 
PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm); - PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode); - - ProjectionExec proj = (ProjectionExec) exec; - - // TODO - should be planed with user's optimization hint - if (!(proj.getChild() instanceof ExternalSortExec)) { - UnaryPhysicalExec sortExec = proj.getChild(); - SeqScanExec scan = sortExec.getChild(); - - ExternalSortExec extSort = new ExternalSortExec(ctx, sm, - ((MemSortExec)sortExec).getPlan(), scan); - proj.setChild(extSort); - } + PhysicalExec exec = phyPlanner.createPlan(taskContext, rootNode); Tuple tuple; Tuple preVal = null; @@ -149,10 +149,10 @@ public class TestExternalSortExec { int cnt = 0; exec.init(); long start = System.currentTimeMillis(); - TupleComparator comparator = new TupleComparator(proj.getSchema(), + BaseTupleComparator comparator = new BaseTupleComparator(exec.getSchema(), new SortSpec[]{ - new SortSpec(new Column("managerid", Type.INT4)), - new SortSpec(new Column("empid", Type.INT4)) + new SortSpec(new Column("col2", Type.INT4)), + new SortSpec(new Column("col3", Type.INT8)) }); while ((tuple = exec.next()) != null) { @@ -180,6 +180,26 @@ public class TestExternalSortExec { } assertEquals(numTuple, cnt); exec.close(); - System.out.println("Sort Time: " + (end - start) + " msc"); + System.out.println("Sort and final write time: " + (end - start) + " msc"); + } + + public static DirectRawFileScanner createSortedScanner(TajoConf conf, TableMeta meta, int rowNum, + BaseTupleComparator comparator) + throws IOException { + Path testDir = CommonTestingUtil.getTestDir(); + Path outFile = new Path(testDir, "file1.out"); + + OffHeapRowBlock rowBlock = TestOffHeapRowBlock.createRowBlock(rowNum); + + List<Tuple> tupleList = OffHeapRowBlockUtils.sort(rowBlock, comparator); + + DirectRawFileWriter writer1 = new DirectRawFileWriter(conf, schema, meta, outFile); + writer1.init(); + for (Tuple t:tupleList) { + writer1.addTuple(t); + } + writer1.close(); + + return new 
DirectRawFileScanner(conf, schema, meta, outFile); } }
http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPairWiseMerger.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPairWiseMerger.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPairWiseMerger.java
new file mode 100644
index 0000000..4f87513
--- /dev/null
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPairWiseMerger.java
@@ -0,0 +1,233 @@
+/***
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import org.apache.tajo.catalog.CatalogUtil;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.SortSpec;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.common.TajoDataTypes;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.storage.BaseTupleComparator;
+import org.apache.tajo.storage.Scanner;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.tuple.offheap.TestOffHeapRowBlock;
+import org.apache.tajo.storage.rawfile.DirectRawFileScanner;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.apache.tajo.tuple.offheap.TestOffHeapRowBlock.schema;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Checks that {@link PairWiseMerger} trees built over several scanners return as many tuples as
+ * their leaves were created with, in comparator order, after {@code init()} and after
+ * {@code reset()}.
+ */
+public class TestPairWiseMerger {
+  static TajoConf conf;
+  private static BaseTupleComparator comparator;
+  static {
+    conf = new TajoConf();
+
+    comparator = new BaseTupleComparator(TestOffHeapRowBlock.schema,
+        new SortSpec[] {
+            new SortSpec(new Column("col2", TajoDataTypes.Type.INT4)),
+            new SortSpec(new Column("col3", TajoDataTypes.Type.INT8))
+        });
+  }
+
+  @Test
+  public void testPairWiseMergerWithTwoLeaves() throws IOException {
+    assertMergedInOrder(new int[] {500, 1000});
+  }
+
+  @Test
+  public void testPairWiseMergerWithThreeLeaves1() throws IOException {
+    assertMergedInOrder(new int[] {1, 1, 1});
+  }
+
+  @Test
+  public void testPairWiseMergerWithThreeLeaves2() throws IOException {
+    assertMergedInOrder(new int[] {0, 0, 1});
+  }
+
+  @Test
+  public void testPairWiseMergerWithThreeLeaves3() throws IOException {
+    assertMergedInOrder(new int[] {0, 1, 0});
+  }
+
+  @Test
+  public void testPairWiseMergerWithThreeLeaves4() throws IOException {
+    assertMergedInOrder(new int[] {1, 0, 0});
+  }
+
+  @Test
+  public void testPairWiseMergerWithThreeLeaves5() throws IOException {
+    assertMergedInOrder(new int[] {1, 0, 1});
+  }
+
+  @Test
+  public void testPairWiseMergerWithThreeLeaves6() throws IOException {
+    assertMergedInOrder(new int[] {1, 1, 0});
+  }
+
+  @Test
+  public void testPairWiseMergerWithThreeLeaves7() throws IOException {
+    // {1, 0, 0} is already covered by ThreeLeaves4; {0, 1, 1} is the one mix of empty and
+    // one-row leaves that the other three-leaf tests leave out.
+    assertMergedInOrder(new int[] {0, 1, 1});
+  }
+
+  @Test
+  public void testThreeLevelPairWiseMerger1() throws IOException {
+    assertMergedInOrder(new int[] {500, 501, 499, 498, 489, 450, 431, 429});
+  }
+
+  /**
+   * Runs the same checks on a left-deep and then on a right-deep merger tree; both trees are
+   * built over one set of scanners created for {@code rowNums}.
+   *
+   * @param rowNums number of rows requested for each leaf scanner
+   * @throws IOException if a scanner or a merger fails
+   */
+  private static void assertMergedInOrder(int[] rowNums) throws IOException {
+    Scanner[] scanners = createScanners(rowNums);
+    assertInitAndReset(createLeftDeepMerger(scanners), rowNums);
+    assertInitAndReset(createRightDeepMerger(scanners), rowNums);
+  }
+
+  /**
+   * Checks the merger output after {@code init()} and again after {@code reset()}, then closes
+   * the merger.
+   *
+   * @param merger merger under test
+   * @param rowNums row counts of the leaf scanners that feed {@code merger}
+   * @throws IOException if the merger fails
+   */
+  private static void assertInitAndReset(PairWiseMerger merger, int[] rowNums) throws IOException {
+    merger.init();
+    assertSortResult(rowNums, merger, comparator);
+    merger.reset();
+    assertSortResult(rowNums, merger, comparator);
+    merger.close();
+  }
+
+  /**
+   * Builds a merger chain in which each merger built so far is the left input of the next one.
+   *
+   * @param scanners leaf scanners; at least two are required
+   * @return the last merger of the chain
+   * @throws IllegalArgumentException if fewer than two scanners are given
+   * @throws IOException if a merger cannot be created
+   */
+  private static PairWiseMerger createLeftDeepMerger(Scanner [] scanners) throws IOException {
+    checkEnoughScanners(scanners);
+    PairWiseMerger root = new PairWiseMerger(schema, scanners[0], scanners[1], comparator); // initial one
+    for (int i = 2; i < scanners.length; i++) {
+      root = new PairWiseMerger(schema, root, scanners[i], comparator);
+    }
+    return root;
+  }
+
+  /**
+   * Builds a merger chain in which each merger built so far is the right input of the next one.
+   *
+   * @param scanners leaf scanners; at least two are required
+   * @return the last merger of the chain
+   * @throws IllegalArgumentException if fewer than two scanners are given
+   * @throws IOException if a merger cannot be created
+   */
+  private static PairWiseMerger createRightDeepMerger(Scanner [] scanners) throws IOException {
+    checkEnoughScanners(scanners);
+    PairWiseMerger root = new PairWiseMerger(schema, scanners[0], scanners[1], comparator); // initial one
+    for (int i = 2; i < scanners.length; i++) {
+      root = new PairWiseMerger(schema, scanners[i], root, comparator);
+    }
+    return root;
+  }
+
+  /**
+   * Fails fast instead of letting the factory methods return {@code null}.
+   *
+   * @param scanners leaf scanners handed to a factory method
+   * @throws IllegalArgumentException if fewer than two scanners are given
+   */
+  private static void checkEnoughScanners(Scanner[] scanners) {
+    if (scanners.length < 2) {
+      throw new IllegalArgumentException(
+          "A pair-wise merger needs at least two scanners, but got " + scanners.length);
+    }
+  }
+
+  /**
+   * Creates one scanner per entry of {@code rowNums} through
+   * {@code TestExternalSortExec.createSortedScanner}.
+   *
+   * @param rowNums row count passed on for each scanner
+   * @return the scanners, in the order of {@code rowNums}
+   * @throws IOException if a scanner cannot be created
+   */
+  private static Scanner [] createScanners(int [] rowNums) throws IOException {
+    DirectRawFileScanner[] scanners = new DirectRawFileScanner[rowNums.length];
+
+    TableMeta meta = CatalogUtil.newTableMeta(CatalogProtos.StoreType.DIRECTRAW);
+    for (int i = 0; i < rowNums.length; i++) {
+      scanners[i] = TestExternalSortExec.createSortedScanner(conf, meta, rowNums[i], comparator);
+    }
+
+    assertEquals(rowNums.length, scanners.length);
+    return scanners;
+  }
+
+  /**
+   * Reads {@code scanner} to its end and asserts that no tuple compares lower than the one
+   * before it and that the number of tuples read equals the sum of {@code rowNums}.
+   *
+   * @param rowNums row counts whose sum is the expected number of tuples
+   * @param scanner scanner to read
+   * @param comparator ordering the tuples must follow
+   * @throws IOException if reading from the scanner fails
+   */
+  private static void assertSortResult(int[] rowNums, Scanner scanner, BaseTupleComparator comparator) throws IOException {
+    Tuple tuple;
+    Tuple curVal;
+    Tuple preVal = null;
+    int idx = 0;
+    while ((tuple = scanner.next()) != null) {
+      curVal = tuple;
+      if (preVal != null) {
+        assertTrue(idx + "th, prev: " + preVal + ", but cur: " + curVal, comparator.compare(preVal, curVal) <= 0);
+      }
+      // NOTE(review): keeps a reference, not a copy; if the scanner reuses tuple instances the
+      // comparison above would compare a tuple with itself -- confirm.
+      preVal = curVal;
+      idx++;
+    }
+
+    int totalRowNum = 0;
+    for (int rowNum : rowNums) {
+      totalRowNum += rowNum;
+    }
+    assertEquals(totalRowNum, idx);
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java index 5d809f8..0987a78 100644 --- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java @@ -1071,7 +1071,7 @@ public class TestPhysicalPlanner { keySchema.addColumn("?empId", Type.INT4); SortSpec[] sortSpec = new SortSpec[1]; sortSpec[0] = new SortSpec(keySchema.getColumn(0), true, false); - TupleComparator comp = new TupleComparator(keySchema, sortSpec); + BaseTupleComparator comp = new BaseTupleComparator(keySchema, sortSpec); BSTIndex bst = new BSTIndex(conf); BSTIndex.BSTIndexReader reader = bst.getIndexReader(new Path(workDir, "output/index"), keySchema, comp); http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestProgressExternalSortExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestProgressExternalSortExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestProgressExternalSortExec.java index f649dac..8f130de 100644 --- 
a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestProgressExternalSortExec.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestProgressExternalSortExec.java @@ -87,7 +87,7 @@ public class TestProgressExternalSortExec { schema.addColumn("empid", TajoDataTypes.Type.INT4); schema.addColumn("deptname", TajoDataTypes.Type.TEXT); - TableMeta employeeMeta = CatalogUtil.newTableMeta(CatalogProtos.StoreType.RAW); + TableMeta employeeMeta = CatalogUtil.newTableMeta(CatalogProtos.StoreType.DIRECTRAW); Path employeePath = new Path(testDir, "employee.csv"); Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(employeeMeta, schema, employeePath); appender.enableStats(); @@ -170,7 +170,7 @@ public class TestProgressExternalSortExec { Tuple curVal; int cnt = 0; exec.init(); - TupleComparator comparator = new TupleComparator(proj.getSchema(), + BaseTupleComparator comparator = new BaseTupleComparator(proj.getSchema(), new SortSpec[]{ new SortSpec(new Column("managerid", TajoDataTypes.Type.INT4)), new SortSpec(new Column("empid", TajoDataTypes.Type.INT4)) http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestSortExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestSortExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestSortExec.java index 4d4cc3d..5610a60 100644 --- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestSortExec.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestSortExec.java @@ -83,7 +83,7 @@ public class TestSortExec { Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(employeeMeta, schema, tablePath); appender.init(); Tuple tuple = new VTuple(schema.size()); - for (int i = 0; i < 100; i++) { + for (int i = 0; i < 
100000; i++) { tuple.put(new Datum[] { DatumFactory.createInt4(rnd.nextInt(5)), DatumFactory.createInt4(rnd.nextInt(10)), http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-core/src/test/java/org/apache/tajo/engine/util/TestTupleUtil.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/util/TestTupleUtil.java b/tajo-core/src/test/java/org/apache/tajo/engine/util/TestTupleUtil.java index cecb281..78cce64 100644 --- a/tajo-core/src/test/java/org/apache/tajo/engine/util/TestTupleUtil.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/util/TestTupleUtil.java @@ -112,7 +112,7 @@ public class TestTupleUtil { sortSpecs); TupleRange [] ranges = partitioner.partition(5); assertTrue(5 <= ranges.length); - TupleComparator comp = new TupleComparator(schema, PlannerUtil.schemaToSortSpecs(schema)); + BaseTupleComparator comp = new BaseTupleComparator(schema, PlannerUtil.schemaToSortSpecs(schema)); TupleRange prev = ranges[0]; for (int i = 1; i < ranges.length; i++) { assertTrue(comp.compare(prev.getStart(), ranges[i].getStart()) < 0); http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-core/src/test/java/org/apache/tajo/worker/TestRangeRetrieverHandler.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/worker/TestRangeRetrieverHandler.java b/tajo-core/src/test/java/org/apache/tajo/worker/TestRangeRetrieverHandler.java index 5f8efe7..ccd2cde 100644 --- a/tajo-core/src/test/java/org/apache/tajo/worker/TestRangeRetrieverHandler.java +++ b/tajo-core/src/test/java/org/apache/tajo/worker/TestRangeRetrieverHandler.java @@ -174,7 +174,7 @@ public class TestRangeRetrieverHandler { exec.close(); Schema keySchema = PlannerUtil.sortSpecsToSchema(sortSpecs); - TupleComparator comp = new TupleComparator(keySchema, sortSpecs); + BaseTupleComparator comp = new BaseTupleComparator(keySchema, sortSpecs); 
BSTIndex bst = new BSTIndex(conf); BSTIndex.BSTIndexReader reader = bst.getIndexReader( new Path(testDir, "output/index"), keySchema, comp); @@ -298,7 +298,7 @@ public class TestRangeRetrieverHandler { exec.close(); Schema keySchema = PlannerUtil.sortSpecsToSchema(sortSpecs); - TupleComparator comp = new TupleComparator(keySchema, sortSpecs); + BaseTupleComparator comp = new BaseTupleComparator(keySchema, sortSpecs); BSTIndex bst = new BSTIndex(conf); BSTIndex.BSTIndexReader reader = bst.getIndexReader( new Path(testDir, "output/index"), keySchema, comp); http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-dist/pom.xml ---------------------------------------------------------------------- diff --git a/tajo-dist/pom.xml b/tajo-dist/pom.xml index a0cbe9f..5a14795 100644 --- a/tajo-dist/pom.xml +++ b/tajo-dist/pom.xml @@ -132,6 +132,7 @@ run cp -r $ROOT/tajo-catalog/target/tajo-catalog-${project.version}/* . run cp -r $ROOT/tajo-storage/target/tajo-storage-${project.version}/* . run cp -r $ROOT/tajo-yarn-pullserver/target/tajo-yarn-pullserver-${project.version}.jar . + run cp -r $ROOT/tajo-thirdparty/asm/target/tajo-thirdparty-asm-${project.version}.jar . run cp -r $ROOT/tajo-core/target/tajo-core-${project.version}.jar . run cp -r $ROOT/tajo-core/target/lib . run cp -r ${project.basedir}/src/main/bin . 
http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/MetaDataTuple.java ---------------------------------------------------------------------- diff --git a/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/MetaDataTuple.java b/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/MetaDataTuple.java index 5dae67e..6c8ef5d 100644 --- a/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/MetaDataTuple.java +++ b/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/MetaDataTuple.java @@ -17,6 +17,7 @@ package org.apache.tajo.jdbc; /** */ import org.apache.tajo.datum.Datum; +import org.apache.tajo.datum.IntervalDatum; import org.apache.tajo.datum.NullDatum; import org.apache.tajo.datum.ProtobufDatum; import org.apache.tajo.exception.UnsupportedException; @@ -47,7 +48,12 @@ public class MetaDataTuple implements Tuple { @Override public boolean isNull(int fieldid) { - return values.get(fieldid) == null || values.get(fieldid) instanceof NullDatum; + return values.get(fieldid) == null || values.get(fieldid).isNull(); + } + + @Override + public boolean isNotNull(int fieldid) { + return !isNull(fieldid); } @Override @@ -142,7 +148,12 @@ public class MetaDataTuple implements Tuple { @Override public ProtobufDatum getProtobufDatum(int fieldId) { - throw new UnsupportedException(); + throw new UnsupportedException("getProtobufDatum"); + } + + @Override + public IntervalDatum getInterval(int fieldId) { + throw new UnsupportedException("getInterval"); } @Override @@ -157,6 +168,6 @@ public class MetaDataTuple implements Tuple { @Override public Datum[] getValues(){ - throw new UnsupportedException(); + throw new UnsupportedException("getValues"); } } http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-project/pom.xml ---------------------------------------------------------------------- diff --git a/tajo-project/pom.xml b/tajo-project/pom.xml index fe5366e..e65d844 100644 --- a/tajo-project/pom.xml +++ b/tajo-project/pom.xml @@ -736,6 +736,13 @@ 
</dependency> <dependency> <groupId>org.apache.tajo</groupId> + <artifactId>tajo-storage</artifactId> + <type>test-jar</type> + <scope>test</scope> + <version>${tajo.version}</version> + </dependency> + <dependency> + <groupId>org.apache.tajo</groupId> <artifactId>tajo-yarn-pullserver</artifactId> <version>${tajo.version}</version> </dependency>
