This is an automated email from the ASF dual-hosted git repository. xiangweiwei pushed a commit to branch aggregator in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 25b649bd0d0000db00cc29970018da6d8b20b636 Author: Alima777 <[email protected]> AuthorDate: Thu Apr 28 17:09:23 2022 +0800 modify seriesAggregateScanOperator using aggregator --- .../Accumulator.java} | 45 +-- .../db/mpp/operator/aggregation/Aggregator.java | 98 +++++++ .../AggregatorFactory.java} | 45 +-- .../mpp/operator/aggregation/AvgAccumulator.java | 86 ++++++ .../mpp/operator/aggregation/CountAccumulator.java | 84 ++++++ .../db/mpp/operator/process/AggregateOperator.java | 17 +- .../source/SeriesAggregateScanOperator.java | 111 +++---- .../db/mpp/sql/planner/LocalExecutionPlanner.java | 2 +- .../sql/planner/plan/parameter/InputLocation.java | 4 +- .../operator/SeriesAggregateScanOperatorTest.java | 5 +- .../iotdb/tsfile/read/common/block/TsBlock.java | 38 ++- .../read/common/block/column/BinaryColumn.java | 9 + .../read/common/block/column/BooleanColumn.java | 9 + .../tsfile/read/common/block/column/Column.java | 6 + .../read/common/block/column/DoubleColumn.java | 9 + .../read/common/block/column/FloatColumn.java | 8 + .../tsfile/read/common/block/column/IntColumn.java | 8 + .../read/common/block/column/LongColumn.java | 8 + .../block/column/RunLengthEncodedColumn.java | 8 + .../read/common/block/column/TimeColumn.java | 8 + .../iotdb/tsfile/common/block/TsBlockTest.java | 49 ++++ .../iotdb/tsfile/read/common/ColumnTest.java | 322 +++++++++++++++++++++ 22 files changed, 837 insertions(+), 142 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/AggregateOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/Accumulator.java similarity index 52% copy from server/src/main/java/org/apache/iotdb/db/mpp/operator/process/AggregateOperator.java copy to server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/Accumulator.java index 0eda188787..f6f268aad1 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/AggregateOperator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/Accumulator.java @@ -16,42 +16,29 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.iotdb.db.mpp.operator.process; +package org.apache.iotdb.db.mpp.operator.aggregation; -import org.apache.iotdb.db.mpp.operator.OperatorContext; -import org.apache.iotdb.tsfile.read.common.block.TsBlock; +import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics; +import org.apache.iotdb.tsfile.read.common.TimeRange; +import org.apache.iotdb.tsfile.read.common.block.column.Column; +import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder; -import com.google.common.util.concurrent.ListenableFuture; +public interface Accumulator { -public class AggregateOperator implements ProcessOperator { + // Column should be like: | Time | Value | + void addInput(Column[] column, TimeRange timeRange); - @Override - public OperatorContext getOperatorContext() { - return null; - } + void addIntermediate(Column[] partialResult); - @Override - public ListenableFuture<Void> isBlocked() { - return ProcessOperator.super.isBlocked(); - } + void addStatistics(Statistics statistics); - @Override - public TsBlock next() { - return null; - } + void setFinal(Column finalResult); - @Override - public boolean hasNext() { - return false; - } + void outputIntermediate(ColumnBuilder[] tsBlockBuilder); - @Override - public void close() throws Exception { - ProcessOperator.super.close(); - } + void outputFinal(ColumnBuilder tsBlockBuilder); - @Override - public boolean isFinished() { - return false; - } + void reset(); + + boolean hasFinalResult(); } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/Aggregator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/Aggregator.java new file mode 100644 index 0000000000..94fe9080cb --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/Aggregator.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.mpp.operator.aggregation; + +import org.apache.iotdb.db.mpp.sql.planner.plan.parameter.AggregationStep; +import org.apache.iotdb.db.mpp.sql.planner.plan.parameter.InputLocation; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics; +import org.apache.iotdb.tsfile.read.common.TimeRange; +import org.apache.iotdb.tsfile.read.common.block.TsBlock; +import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder; + +import java.util.List; + +public class Aggregator { + + private final Accumulator accumulator; + private final List<InputLocation> inputLocationList; + private final AggregationStep step; + private final TSDataType intermediateType; + private final TSDataType finalType; + + private TimeRange timeRange; + + public Aggregator( + Accumulator accumulator, + AggregationStep step, + List<InputLocation> inputLocationList, + TSDataType intermediateType, + TSDataType finalType) { + this.accumulator = accumulator; + this.step = step; + this.inputLocationList = inputLocationList; + this.intermediateType = intermediateType; + this.finalType = finalType; + } + + public void processTsBlock(TsBlock tsBlock) { + if (step.isInputRaw()) { + accumulator.addInput(tsBlock.getTimeAndValueColumn(0), timeRange); + } else { + accumulator.addIntermediate(tsBlock.getColumns(new int[] {0})); + } + } + + public void outputResult(ColumnBuilder[] columnBuilder) { + if (step.isOutputPartial()) { + accumulator.outputIntermediate(columnBuilder); + } else { + accumulator.outputFinal(columnBuilder[0]); + } + } + + public void processStatistics(Statistics statistics) { + accumulator.addStatistics(statistics); + } + + public TSDataType getOutputType() { + if (step.isOutputPartial()) { + return intermediateType; + } else { + return finalType; + } + } + + public void reset() { + accumulator.reset(); + } + + public boolean hasFinalResult() { + return accumulator.hasFinalResult(); + } + + public void setTimeRange(TimeRange timeRange) { + this.timeRange = timeRange; + } + + public TimeRange getTimeRange() { + return timeRange; + } +} diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/AggregateOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/AggregatorFactory.java similarity index 52% copy from server/src/main/java/org/apache/iotdb/db/mpp/operator/process/AggregateOperator.java copy to server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/AggregatorFactory.java index 0eda188787..ce493b3256 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/AggregateOperator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/AggregatorFactory.java @@ -16,42 +16,19 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.iotdb.db.mpp.operator.process; -import org.apache.iotdb.db.mpp.operator.OperatorContext; -import org.apache.iotdb.tsfile.read.common.block.TsBlock; +package org.apache.iotdb.db.mpp.operator.aggregation; -import com.google.common.util.concurrent.ListenableFuture; +public class AggregatorFactory { -public class AggregateOperator implements ProcessOperator { - - @Override - public OperatorContext getOperatorContext() { - return null; - } - - @Override - public ListenableFuture<Void> isBlocked() { - return ProcessOperator.super.isBlocked(); - } - - @Override - public TsBlock next() { - return null; - } - - @Override - public boolean hasNext() { - return false; - } - - @Override - public void close() throws Exception { - ProcessOperator.super.close(); - } - - @Override - public boolean isFinished() { - return false; + public static Aggregator createAggregator() { + Accumulator accumulator; + if (step.isInputRaw()) { + accumulator = accumulatorFactory.createAccumulator(lambdaProviders); + } else { + accumulator = accumulatorFactory.createIntermediateAccumulator(lambdaProviders); + } + return new Aggregator( + accumulator, step, intermediateType, finalType, inputChannels, maskChannel); } } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/AvgAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/AvgAccumulator.java new file mode 100644 index 0000000000..dc993a65c5 --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/AvgAccumulator.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.mpp.operator.aggregation; + +import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics; +import org.apache.iotdb.tsfile.read.common.TimeRange; +import org.apache.iotdb.tsfile.read.common.block.column.Column; +import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder; + +public class AvgAccumulator implements Accumulator { + + private TSDataType seriesDataType; + private long countValue; + private double sumValue; + + @Override + public void addInput(Column[] column, TimeRange timeRange) {} + + @Override + public void addIntermediate(Column[] partialResult) {} + + @Override + public void addStatistics(Statistics statistics) {} + + @Override + public void setFinal(Column finalResult) {} + + @Override + public void outputIntermediate(ColumnBuilder[] tsBlockBuilder) {} + + @Override + public void outputFinal(ColumnBuilder tsBlockBuilder) {} + + @Override + public void reset() { + this.countValue = 0; + this.sumValue = 0.0; + } + + @Override + public boolean hasFinalResult() { + return false; + } + + private void updateAvg(TSDataType type, Object sumVal) throws UnSupportedDataTypeException { + double val; + switch (type) { + case INT32: + val = (int) sumVal; + break; + case INT64: + val = (long) sumVal; + break; + case FLOAT: + val = (float) sumVal; + break; + case DOUBLE: + val = (double) sumVal; + break; + case TEXT: + case BOOLEAN: + default: + throw new UnSupportedDataTypeException( + String.format("Unsupported data type in aggregation AVG : %s", type)); + } + } +} diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/CountAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/CountAccumulator.java new file mode 100644 index 0000000000..db266b55e4 --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/CountAccumulator.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.mpp.operator.aggregation; + +import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics; +import org.apache.iotdb.tsfile.read.common.TimeRange; +import org.apache.iotdb.tsfile.read.common.block.column.Column; +import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder; +import org.apache.iotdb.tsfile.read.common.block.column.TimeColumn; + +public class CountAccumulator implements Accumulator { + + private long countValue = 0; + + // Column should be like: | Time | Value | + @Override + public void addInput(Column[] column, TimeRange timeRange) { + TimeColumn timeColumn = (TimeColumn) column[0]; + for (int i = 0; i < timeColumn.getPositionCount(); i++) { + long curTime = timeColumn.getLong(i); + if (curTime >= timeRange.getMax() || curTime < timeRange.getMin()) { + break; + } + countValue++; + } + } + + // partialResult should be like: | partialCountValue1 | partialCountValue2 | + @Override + public void addIntermediate(Column[] partialResult) { + for (int i = 0; i < partialResult.length; i++) { + countValue += partialResult[i].getLong(0); + } + } + + @Override + public void addStatistics(Statistics statistics) { + countValue += statistics.getCount(); + } + + // finalResult should be single column, like: | finalCountValue | + @Override + public void setFinal(Column finalResult) { + countValue = finalResult.getLong(0); + } + + // columnBuilder should be single in countAccumulator + @Override + public void outputIntermediate(ColumnBuilder[] columnBuilders) { + columnBuilders[0].writeLong(countValue); + } + + @Override + public void outputFinal(ColumnBuilder columnBuilder) { + columnBuilder.writeLong(countValue); + } + + @Override + public void reset() { + this.countValue = 0; + } + + @Override + public boolean hasFinalResult() { + return false; + } +} diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/AggregateOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/AggregateOperator.java index 0eda188787..e0cd04a5a4 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/AggregateOperator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/AggregateOperator.java @@ -18,16 +18,31 @@ */ package org.apache.iotdb.db.mpp.operator.process; +import org.apache.iotdb.db.mpp.operator.Operator; import org.apache.iotdb.db.mpp.operator.OperatorContext; +import org.apache.iotdb.db.mpp.operator.aggregation.Aggregator; import org.apache.iotdb.tsfile.read.common.block.TsBlock; import com.google.common.util.concurrent.ListenableFuture; +import java.util.List; + public class AggregateOperator implements ProcessOperator { + private final OperatorContext operatorContext; + private final List<Aggregator> aggregators; + private final List<Operator> children; + + public AggregateOperator( + OperatorContext operatorContext, List<Aggregator> aggregators, List<Operator> children) { + this.operatorContext = operatorContext; + this.aggregators = aggregators; + this.children = children; + } + @Override public OperatorContext getOperatorContext() { - return null; + return operatorContext; } @Override diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesAggregateScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesAggregateScanOperator.java index b94fadccc1..8966b1f57b 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesAggregateScanOperator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesAggregateScanOperator.java @@ -19,15 +19,11 @@ package org.apache.iotdb.db.mpp.operator.source; import org.apache.iotdb.db.engine.querycontext.QueryDataSource; -import org.apache.iotdb.db.exception.query.QueryProcessException; import org.apache.iotdb.db.metadata.path.PartialPath; import org.apache.iotdb.db.mpp.operator.OperatorContext; +import org.apache.iotdb.db.mpp.operator.aggregation.Aggregator; import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId; import org.apache.iotdb.db.mpp.sql.planner.plan.parameter.GroupByTimeParameter; -import org.apache.iotdb.db.query.aggregation.AggregateResult; -import org.apache.iotdb.db.query.aggregation.AggregationType; -import org.apache.iotdb.db.query.factory.AggregateResultFactory; -import org.apache.iotdb.db.utils.SchemaUtils; import org.apache.iotdb.db.utils.timerangeiterator.ITimeRangeIterator; import org.apache.iotdb.db.utils.timerangeiterator.SingleTimeWindowIterator; import org.apache.iotdb.db.utils.timerangeiterator.TimeRangeIteratorFactory; @@ -43,7 +39,6 @@ import org.apache.iotdb.tsfile.read.filter.basic.Filter; import com.google.common.util.concurrent.ListenableFuture; import java.io.IOException; -import java.util.ArrayList; import java.util.List; import java.util.Set; import java.util.stream.Collectors; @@ -62,15 +57,15 @@ public class SeriesAggregateScanOperator implements DataSourceOperator { private final PlanNodeId sourceId; private final SeriesScanUtil seriesScanUtil; private final boolean ascending; - private List<AggregateResult> aggregateResultList; + // We still think aggregator in SeriesAggregateScanOperator is a inputRaw step. + // But in facing of statistics, it will invoke another method processStatistics() + private List<Aggregator> aggregators; private ITimeRangeIterator timeRangeIterator; // current interval of aggregation window [curStartTime, curEndTime) private TimeRange curTimeRange; - private TsBlockSingleColumnIterator preCachedData; - // used for resetting the preCachedData to the last read index - private int lastReadIndex; + private TsBlock preCachedData; private TsBlockBuilder tsBlockBuilder; private TsBlock resultTsBlock; @@ -82,7 +77,7 @@ public class SeriesAggregateScanOperator implements DataSourceOperator { PartialPath seriesPath, Set<String> allSensors, OperatorContext context, - List<AggregationType> aggregateFuncList, + List<Aggregator> aggregators, Filter timeFilter, boolean ascending, GroupByTimeParameter groupByTimeParameter) { @@ -98,21 +93,10 @@ public class SeriesAggregateScanOperator implements DataSourceOperator { timeFilter, null, ascending); - aggregateResultList = new ArrayList<>(aggregateFuncList.size()); - for (AggregationType aggregationType : aggregateFuncList) { - aggregateResultList.add( - AggregateResultFactory.getAggrResultByType( - aggregationType, - seriesPath.getSeriesType(), - seriesScanUtil.getOrderUtils().getAscending())); - } + this.aggregators = aggregators; tsBlockBuilder = new TsBlockBuilder( - aggregateFuncList.stream() - .map( - functionType -> - SchemaUtils.getSeriesTypeByPath(seriesPath, functionType.name())) - .collect(Collectors.toList())); + aggregators.stream().map(Aggregator::getOutputType).collect(Collectors.toList())); this.timeRangeIterator = initTimeRangeIterator(groupByTimeParameter); } @@ -169,8 +153,9 @@ public class SeriesAggregateScanOperator implements DataSourceOperator { curTimeRange = timeRangeIterator.nextTimeRange(); // 1. Clear previous aggregation result - for (AggregateResult result : aggregateResultList) { - result.reset(); + for (Aggregator aggregator : aggregators) { + aggregator.reset(); + aggregator.setTimeRange(curTimeRange); } // 2. Calculate aggregation result based on current time window @@ -226,14 +211,15 @@ public class SeriesAggregateScanOperator implements DataSourceOperator { } private void updateResultTsBlockUsingAggregateResult() { - // TODO AVG tsBlockBuilder.reset(); TimeColumnBuilder timeColumnBuilder = tsBlockBuilder.getTimeColumnBuilder(); // Use start time of current time range as time column timeColumnBuilder.writeLong(curTimeRange.getMin()); ColumnBuilder[] columnBuilders = tsBlockBuilder.getValueColumnBuilders(); - for (int i = 0; i < aggregateResultList.size(); i++) { - columnBuilders[i].writeObject(aggregateResultList.get(i).getResult()); + for (int i = 0; i < aggregators.size(); i++) { + ColumnBuilder[] columnBuilder = new ColumnBuilder[1]; + columnBuilder[0] = columnBuilders[i]; + aggregators.get(i).outputResult(columnBuilder); } tsBlockBuilder.declarePosition(); resultTsBlock = tsBlockBuilder.build(); @@ -273,41 +259,33 @@ public class SeriesAggregateScanOperator implements DataSourceOperator { } @SuppressWarnings("squid:S3776") - private void calcFromBatch(TsBlockSingleColumnIterator blockIterator, TimeRange curTimeRange) - throws IOException { + private void calcFromBatch(TsBlock tsBlock, TimeRange curTimeRange) { // check if the batchData does not contain points in current interval - if (!satisfied(blockIterator, curTimeRange)) { + if (!satisfied(tsBlock, curTimeRange)) { return; } - for (AggregateResult result : aggregateResultList) { + // skip points that cannot be calculated + tsBlock = skipOutOfTimeRangePoints(tsBlock, curTimeRange); + + for (Aggregator aggregator : aggregators) { // current agg method has been calculated - if (result.hasFinalResult()) { + if (aggregator.hasFinalResult()) { continue; } - // lazy reset batch data for calculation - blockIterator.setRowIndex(lastReadIndex); - // skip points that cannot be calculated - skipOutOfTimeRangePoints(blockIterator, curTimeRange); - - if (blockIterator.hasNext()) { - result.updateResultFromPageData( - blockIterator, curTimeRange.getMin(), curTimeRange.getMax()); - } - } - // reset the last position to current Index - lastReadIndex = blockIterator.getRowIndex(); + aggregator.processTsBlock(tsBlock); + } // can calc for next interval - if (blockIterator.hasNext()) { - preCachedData = blockIterator; + if (tsBlock.getTsBlockSingleColumnIterator().hasNext()) { + preCachedData = tsBlock; } } // skip points that cannot be calculated - private void skipOutOfTimeRangePoints( - TsBlockSingleColumnIterator tsBlockIterator, TimeRange curTimeRange) { + private TsBlock skipOutOfTimeRangePoints(TsBlock tsBlock, TimeRange curTimeRange) { + TsBlockSingleColumnIterator tsBlockIterator = tsBlock.getTsBlockSingleColumnIterator(); if (ascending) { while (tsBlockIterator.hasNext() && tsBlockIterator.currentTime() < curTimeRange.getMin()) { tsBlockIterator.next(); @@ -317,9 +295,11 @@ public class SeriesAggregateScanOperator implements DataSourceOperator { tsBlockIterator.next(); } } + return tsBlock.subTsBlock(tsBlockIterator.getRowIndex()); } - private boolean satisfied(TsBlockSingleColumnIterator tsBlockIterator, TimeRange timeRange) { + private boolean satisfied(TsBlock tsBlock, TimeRange timeRange) { + TsBlockSingleColumnIterator tsBlockIterator = tsBlock.getTsBlockSingleColumnIterator(); if (tsBlockIterator == null || !tsBlockIterator.hasNext()) { return false; } @@ -332,15 +312,15 @@ public class SeriesAggregateScanOperator implements DataSourceOperator { if (!ascending && (tsBlockIterator.getStartTime() >= timeRange.getMax() || tsBlockIterator.currentTime() < timeRange.getMin())) { - preCachedData = tsBlockIterator; + preCachedData = tsBlock; return false; } return true; } private boolean isEndCalc() { - for (AggregateResult result : aggregateResultList) { - if (!result.hasFinalResult()) { + for (Aggregator aggregator : aggregators) { + if (!aggregator.hasFinalResult()) { return false; } } @@ -375,23 +355,23 @@ public class SeriesAggregateScanOperator implements DataSourceOperator { } // calc from page data - TsBlockSingleColumnIterator tsBlockIterator = - seriesScanUtil.nextPage().getTsBlockSingleColumnIterator(); + TsBlock tsBlock = seriesScanUtil.nextPage(); + TsBlockSingleColumnIterator tsBlockIterator = tsBlock.getTsBlockSingleColumnIterator(); if (tsBlockIterator == null || !tsBlockIterator.hasNext()) { continue; } // reset the last position to current Index - lastReadIndex = tsBlockIterator.getRowIndex(); + // lastReadIndex = tsBlockIterator.getRowIndex(); // stop calc and cached current batchData if (ascending && tsBlockIterator.currentTime() >= curTimeRange.getMax()) { - preCachedData = tsBlockIterator; + preCachedData = tsBlock; return true; } // calc from batch data - calcFromBatch(tsBlockIterator, curTimeRange); + calcFromBatch(tsBlock, curTimeRange); // judge whether the calculation finished if (isEndCalc() @@ -432,15 +412,12 @@ public class SeriesAggregateScanOperator implements DataSourceOperator { } private void calcFromStatistics(Statistics statistics) { - try { - for (AggregateResult result : aggregateResultList) { - if (result.hasFinalResult()) { - continue; - } - result.updateResultFromStatistics(statistics); + for (int i = 0; i < aggregators.size(); i++) { + Aggregator aggregator = aggregators.get(i); + if (aggregator.hasFinalResult()) { + continue; } - } catch (QueryProcessException e) { - throw new RuntimeException("Error while updating result using statistics", e); + aggregator.processStatistics(statistics); } } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LocalExecutionPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LocalExecutionPlanner.java index cfa593146a..bee6e9f5fe 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LocalExecutionPlanner.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LocalExecutionPlanner.java @@ -232,7 +232,7 @@ public class LocalExecutionPlanner { seriesPath, node.getAllSensors(), operatorContext, - node.getAggregateFuncList(), + null, node.getTimeFilter(), ascending, node.getGroupByTimeParameter()); diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/parameter/InputLocation.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/parameter/InputLocation.java index fda0f4a957..97d7bb591a 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/parameter/InputLocation.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/parameter/InputLocation.java @@ -24,9 +24,9 @@ import java.nio.ByteBuffer; import java.util.Objects; public class InputLocation { - // which input tsblock + // which input tsBlock private final int tsBlockIndex; - // which value column of that tsblock + // which value column of that tsBlock private final int valueColumnIndex; public InputLocation(int tsBlockIndex, int valueColumnIndex) { diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/operator/SeriesAggregateScanOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/operator/SeriesAggregateScanOperatorTest.java index 09d56b7211..bfaced0670 100644 --- a/server/src/test/java/org/apache/iotdb/db/mpp/operator/SeriesAggregateScanOperatorTest.java +++ b/server/src/test/java/org/apache/iotdb/db/mpp/operator/SeriesAggregateScanOperatorTest.java @@ -30,6 +30,7 @@ import org.apache.iotdb.db.mpp.common.QueryId; import org.apache.iotdb.db.mpp.execution.FragmentInstanceContext; import org.apache.iotdb.db.mpp.execution.FragmentInstanceState; import org.apache.iotdb.db.mpp.execution.FragmentInstanceStateMachine; +import org.apache.iotdb.db.mpp.operator.aggregation.Aggregator; import org.apache.iotdb.db.mpp.operator.source.SeriesAggregateScanOperator; import org.apache.iotdb.db.mpp.operator.source.SeriesScanOperator; import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId; @@ -346,7 +347,7 @@ public class SeriesAggregateScanOperatorTest { } public SeriesAggregateScanOperator initSeriesAggregateScanOperator( - List<AggregationType> aggregateFuncList, + List<Aggregator> aggregators, Filter timeFilter, boolean ascending, GroupByTimeParameter groupByTimeParameter) @@ -373,7 +374,7 @@ public class SeriesAggregateScanOperatorTest { measurementPath, allSensors, fragmentInstanceContext.getOperatorContexts().get(0), - aggregateFuncList, + aggregators, timeFilter, ascending, groupByTimeParameter); diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlock.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlock.java index f6d4b68c4d..008db2dde3 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlock.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlock.java @@ -92,12 +92,6 @@ public class TsBlock { } } - public boolean hasNext() { - return false; - } - - public void next() {} - public int getPositionCount() { return positionCount; } @@ -170,6 +164,23 @@ public class TsBlock { return wrapBlocksWithoutCopy(positionCount, timeColumn, newBlocks); } + /** + * This method will create a temporary view of origin tsBlock, which will reuse the arrays of + * columns but with different offset. It can be used where you want to skip some points when + * getting iterator. + */ + public TsBlock subTsBlock(int fromIndex) { + if (fromIndex > positionCount) { + throw new IllegalArgumentException("FromIndex of subTsBlock cannot over positionCount."); + } + TimeColumn subTimeColumn = (TimeColumn) timeColumn.subColumn(fromIndex); + Column[] subValueColumns = new Column[valueColumns.length]; + for (int i = 0; i < subValueColumns.length; i++) { + subValueColumns[i] = valueColumns[i].subColumn(fromIndex); + } + return new TsBlock(subTimeColumn, subValueColumns); + } + public long getTimeByIndex(int index) { return timeColumn.getLong(index); } @@ -186,6 +197,21 @@ public class TsBlock { return valueColumns[columnIndex]; } + public Column[] getTimeAndValueColumn(int columnIndex) { + Column[] columns = new Column[2]; + columns[0] = getTimeColumn(); + columns[1] = getColumn(columnIndex); + return columns; + } + + public Column[] getColumns(int[] columnIndexes) { + Column[] columns = new Column[columnIndexes.length]; + for (int i = 0; i < columnIndexes.length; i++) { + columns[i] = valueColumns[columnIndexes[i]]; + } + return columns; + } + public TsBlockSingleColumnIterator getTsBlockSingleColumnIterator() { return new TsBlockSingleColumnIterator(0); } diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/BinaryColumn.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/BinaryColumn.java index 4d7a888394..f9a5497992 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/BinaryColumn.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/BinaryColumn.java @@ -123,6 +123,15 @@ public class BinaryColumn implements Column { return new BinaryColumn(positionOffset + arrayOffset, length, valueIsNull, values); } + @Override + public Column subColumn(int fromIndex) { + if (fromIndex > positionCount) { + throw new IllegalArgumentException("fromIndex is not valid"); + } + return new BinaryColumn( + arrayOffset + fromIndex, positionCount - fromIndex, valueIsNull, values); + } + private void checkReadablePosition(int position) { if (position < 0 || position >= getPositionCount()) { throw new IllegalArgumentException("position is not valid"); diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/BooleanColumn.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/BooleanColumn.java index 218ce1baf8..1166ccfa4c 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/BooleanColumn.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/BooleanColumn.java @@ -122,6 +122,15 @@ public class BooleanColumn implements Column { return new BooleanColumn(positionOffset + arrayOffset, length, valueIsNull, values); } + @Override + public Column subColumn(int fromIndex) { + if (fromIndex > positionCount) { + throw new IllegalArgumentException("fromIndex is not valid"); + } + return new BooleanColumn( + arrayOffset + fromIndex, positionCount - fromIndex, valueIsNull, values); + } + private void checkReadablePosition(int position) { if (position < 0 || position >= getPositionCount()) { throw new IllegalArgumentException("position is not valid"); diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/Column.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/Column.java index ef9fb7d637..446b6a83fe 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/Column.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/Column.java @@ -102,4 +102,10 @@ public interface Column { * also be released. If the region column is released, this block may also be released. */ Column getRegion(int positionOffset, int length); + + /** + * This method will create a temporary view of origin column, which will reuse the array of column + * but with different array offset. + */ + Column subColumn(int fromIndex); } diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/DoubleColumn.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/DoubleColumn.java index 13faf135fb..1da44fe212 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/DoubleColumn.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/DoubleColumn.java @@ -122,6 +122,15 @@ public class DoubleColumn implements Column { return new DoubleColumn(positionOffset + arrayOffset, length, valueIsNull, values); } + @Override + public Column subColumn(int fromIndex) { + if (fromIndex > positionCount) { + throw new IllegalArgumentException("fromIndex is not valid"); + } + return new DoubleColumn( + arrayOffset + fromIndex, positionCount - fromIndex, valueIsNull, values); + } + private void checkReadablePosition(int position) { if (position < 0 || position >= getPositionCount()) { throw new IllegalArgumentException("position is not valid"); diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/FloatColumn.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/FloatColumn.java index 08762164fb..49d73c3156 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/FloatColumn.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/FloatColumn.java @@ -121,6 +121,14 @@ public class FloatColumn implements Column { return new FloatColumn(positionOffset + arrayOffset, length, valueIsNull, values); } + @Override + public Column subColumn(int fromIndex) { + if (fromIndex > positionCount) { + throw new IllegalArgumentException("fromIndex is not valid"); + } + return new FloatColumn(arrayOffset + fromIndex, positionCount - fromIndex, valueIsNull, values); + } + private void checkReadablePosition(int position) { if (position < 0 || position >= getPositionCount()) { throw new IllegalArgumentException("position is not valid"); diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/IntColumn.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/IntColumn.java index 7e8d67f1b3..cfad52184d 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/IntColumn.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/IntColumn.java @@ -121,6 +121,14 @@ public class IntColumn implements Column { return new IntColumn(positionOffset + arrayOffset, length, valueIsNull, values); } + @Override + public Column subColumn(int fromIndex) { + if (fromIndex > positionCount) { + throw new IllegalArgumentException("fromIndex is not valid"); + } + return new IntColumn(arrayOffset + fromIndex, positionCount - fromIndex, valueIsNull, values); + } + private void checkReadablePosition(int position) { if (position < 0 || position >= getPositionCount()) { throw new IllegalArgumentException("position is not valid"); diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/LongColumn.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/LongColumn.java index a786918af8..9b89a09233 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/LongColumn.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/LongColumn.java @@ -121,6 +121,14 @@ public class LongColumn implements Column { return new LongColumn(positionOffset + arrayOffset, length, valueIsNull, values); } + @Override + public Column subColumn(int fromIndex) { + if (fromIndex > positionCount) { + throw new IllegalArgumentException("fromIndex is not valid"); + } + return new LongColumn(arrayOffset + fromIndex, positionCount - fromIndex, valueIsNull, values); + } + private void checkReadablePosition(int position) { if (position < 0 || position >= getPositionCount()) { throw new IllegalArgumentException("position is not valid"); diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/RunLengthEncodedColumn.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/RunLengthEncodedColumn.java index 283c374a99..c55d2e5686 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/RunLengthEncodedColumn.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/RunLengthEncodedColumn.java @@ -147,6 +147,14 @@ public class RunLengthEncodedColumn implements Column { return new RunLengthEncodedColumn(value, length); } + @Override + public Column subColumn(int fromIndex) { + if (fromIndex > positionCount) { + throw new IllegalArgumentException("fromIndex is not valid"); + } + return new RunLengthEncodedColumn(value, positionCount - fromIndex); + } + private void checkReadablePosition(int position) { if (position < 0 || position >= positionCount) { throw new IllegalArgumentException("position is not valid"); diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/TimeColumn.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/TimeColumn.java index d8b44fd384..87164cf6d7 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/TimeColumn.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/TimeColumn.java @@ -104,6 +104,14 @@ public class TimeColumn implements Column { return new TimeColumn(positionOffset + arrayOffset, length, values); } + @Override + public Column subColumn(int fromIndex) { + if (fromIndex > positionCount) { + throw new IllegalArgumentException("fromIndex is not valid"); + } + return new TimeColumn(arrayOffset + fromIndex, positionCount - fromIndex, values); + } + public long getStartTime() { return values[arrayOffset]; } diff --git a/tsfile/src/test/java/org/apache/iotdb/tsfile/common/block/TsBlockTest.java b/tsfile/src/test/java/org/apache/iotdb/tsfile/common/block/TsBlockTest.java index 75d28596e9..669d3a41ad 100644 --- a/tsfile/src/test/java/org/apache/iotdb/tsfile/common/block/TsBlockTest.java +++ b/tsfile/src/test/java/org/apache/iotdb/tsfile/common/block/TsBlockTest.java @@ -20,6 +20,7 @@ package org.apache.iotdb.tsfile.common.block; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.read.common.block.TsBlock; +import org.apache.iotdb.tsfile.read.common.block.TsBlock.TsBlockSingleColumnIterator; import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder; import org.apache.iotdb.tsfile.read.common.block.column.BinaryColumn; import org.apache.iotdb.tsfile.read.common.block.column.BooleanColumn; @@ -30,6 +31,7 @@ import org.apache.iotdb.tsfile.read.common.block.column.LongColumn; import org.apache.iotdb.tsfile.read.common.block.column.RunLengthEncodedColumn; import org.apache.iotdb.tsfile.utils.Binary; +import org.junit.Assert; import org.junit.Test; import java.util.Arrays; @@ -322,4 +324,51 @@ public class TsBlockTest { } } } + + @Test + public void testSubTsBlock() { + TsBlockBuilder builder = new TsBlockBuilder(Collections.singletonList(TSDataType.INT32)); + for (int i = 0; i < 10; i++) { + builder.getTimeColumnBuilder().writeLong(i); + builder.getColumnBuilder(0).writeInt(i); + builder.declarePosition(); + } + TsBlock tsBlock = builder.build(); + TsBlockSingleColumnIterator iterator = tsBlock.getTsBlockSingleColumnIterator(); + int index = 0; + while (iterator.hasNext()) { + Assert.assertEquals(index, iterator.currentTime()); + Assert.assertEquals(index, iterator.currentValue()); + iterator.next(); + index++; + } + // get subTsBlock from TsBlock, offset = 3 + int offset = 3; + TsBlock subTsBlock = tsBlock.subTsBlock(offset); + iterator = subTsBlock.getTsBlockSingleColumnIterator(); + index = offset; + while (iterator.hasNext()) { + Assert.assertEquals(index, iterator.currentTime()); + Assert.assertEquals(index, iterator.currentValue()); + iterator.next(); + index++; + } + // get subSubTsBlock from subTsBlock, offset = 2 + int nextOffset = 2; + TsBlock subSubTsBlock = subTsBlock.subTsBlock(nextOffset); + iterator = subSubTsBlock.getTsBlockSingleColumnIterator(); + index = offset + nextOffset; + while (iterator.hasNext()) { + Assert.assertEquals(index, iterator.currentTime()); + Assert.assertEquals(index, iterator.currentValue()); + iterator.next(); + index++; + } + try { + subSubTsBlock.subTsBlock(3); + } catch (IllegalArgumentException e) { + Assert.assertTrue( + e.getMessage().contains("FromIndex of subTsBlock cannot over positionCount.")); + } + } } diff --git a/tsfile/src/test/java/org/apache/iotdb/tsfile/read/common/ColumnTest.java b/tsfile/src/test/java/org/apache/iotdb/tsfile/read/common/ColumnTest.java new file mode 100644 index 0000000000..f21df099f7 --- /dev/null +++ b/tsfile/src/test/java/org/apache/iotdb/tsfile/read/common/ColumnTest.java @@ -0,0 +1,322 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.tsfile.read.common; + +import org.apache.iotdb.tsfile.read.common.block.column.BinaryColumn; +import org.apache.iotdb.tsfile.read.common.block.column.BinaryColumnBuilder; +import org.apache.iotdb.tsfile.read.common.block.column.BooleanColumn; +import org.apache.iotdb.tsfile.read.common.block.column.BooleanColumnBuilder; +import org.apache.iotdb.tsfile.read.common.block.column.DoubleColumn; +import org.apache.iotdb.tsfile.read.common.block.column.DoubleColumnBuilder; +import org.apache.iotdb.tsfile.read.common.block.column.FloatColumn; +import org.apache.iotdb.tsfile.read.common.block.column.FloatColumnBuilder; +import org.apache.iotdb.tsfile.read.common.block.column.IntColumn; +import org.apache.iotdb.tsfile.read.common.block.column.IntColumnBuilder; +import org.apache.iotdb.tsfile.read.common.block.column.LongColumn; +import org.apache.iotdb.tsfile.read.common.block.column.LongColumnBuilder; +import org.apache.iotdb.tsfile.read.common.block.column.RunLengthEncodedColumn; +import org.apache.iotdb.tsfile.read.common.block.column.TimeColumn; +import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder; +import org.apache.iotdb.tsfile.utils.Binary; + +import org.junit.Assert; +import org.junit.Test; + +public class ColumnTest { + + @Test + public void timeColumnSubColumnTest() { + TimeColumnBuilder columnBuilder = new TimeColumnBuilder(null, 10); + for (int i = 0; i < 10; i++) { + columnBuilder.writeLong(i); + } + TimeColumn timeColumn = (TimeColumn) columnBuilder.build(); + timeColumn = (TimeColumn) timeColumn.subColumn(5); + Assert.assertEquals(5, timeColumn.getPositionCount()); + Assert.assertEquals(5, timeColumn.getLong(0)); + Assert.assertEquals(9, timeColumn.getLong(4)); + try { + timeColumn.getLong(5); + Assert.fail(); + } catch (IllegalArgumentException e) { + Assert.assertTrue(e.getMessage().contains("position is not valid")); + } + timeColumn = (TimeColumn) timeColumn.subColumn(3); + Assert.assertEquals(2, timeColumn.getPositionCount()); + Assert.assertEquals(8, timeColumn.getLong(0)); + Assert.assertEquals(9, timeColumn.getLong(1)); + try { + timeColumn.getLong(2); + Assert.fail(); + } catch (IllegalArgumentException e) { + Assert.assertTrue(e.getMessage().contains("position is not valid")); + } + try { + timeColumn.subColumn(3); + Assert.fail(); + } catch (IllegalArgumentException e) { + Assert.assertTrue(e.getMessage().contains("fromIndex is not valid")); + } + } + + @Test + public void binaryColumnSubColumnTest() { + BinaryColumnBuilder columnBuilder = new BinaryColumnBuilder(null, 10); + for (int i = 0; i < 10; i++) { + columnBuilder.writeBinary(Binary.valueOf(String.valueOf(i))); + } + BinaryColumn binaryColumn = (BinaryColumn) columnBuilder.build(); + binaryColumn = (BinaryColumn) binaryColumn.subColumn(5); + Assert.assertEquals(5, binaryColumn.getPositionCount()); + Assert.assertEquals("5", binaryColumn.getBinary(0).toString()); + Assert.assertEquals("9", binaryColumn.getBinary(4).toString()); + try { + binaryColumn.getBinary(5); + Assert.fail(); + } catch (IllegalArgumentException e) { + Assert.assertTrue(e.getMessage().contains("position is not valid")); + } + binaryColumn = (BinaryColumn) binaryColumn.subColumn(3); + Assert.assertEquals(2, binaryColumn.getPositionCount()); + Assert.assertEquals("8", binaryColumn.getBinary(0).toString()); + Assert.assertEquals("9", binaryColumn.getBinary(1).toString()); + try { + binaryColumn.getBinary(2); + Assert.fail(); + } catch (IllegalArgumentException e) { + Assert.assertTrue(e.getMessage().contains("position is not valid")); + } + try { + binaryColumn.subColumn(3); + Assert.fail(); + } catch (IllegalArgumentException e) { + Assert.assertTrue(e.getMessage().contains("fromIndex is not valid")); + } + } + + @Test + public void booleanColumnSubColumnTest() { + BooleanColumnBuilder columnBuilder = new BooleanColumnBuilder(null, 10); + // 0: true, 1: false + for (int i = 0; i < 10; i++) { + columnBuilder.writeBoolean(i % 2 == 0); + } + BooleanColumn booleanColumn = (BooleanColumn) columnBuilder.build(); + booleanColumn = (BooleanColumn) booleanColumn.subColumn(5); + Assert.assertEquals(5, booleanColumn.getPositionCount()); + Assert.assertFalse(booleanColumn.getBoolean(0)); + Assert.assertFalse(booleanColumn.getBoolean(4)); + try { + booleanColumn.getBoolean(5); + Assert.fail(); + } catch (IllegalArgumentException e) { + Assert.assertTrue(e.getMessage().contains("position is not valid")); + } + booleanColumn = (BooleanColumn) booleanColumn.subColumn(3); + Assert.assertEquals(2, booleanColumn.getPositionCount()); + Assert.assertTrue(booleanColumn.getBoolean(0)); + Assert.assertFalse(booleanColumn.getBoolean(1)); + try { + booleanColumn.getBoolean(2); + Assert.fail(); + } catch (IllegalArgumentException e) { + Assert.assertTrue(e.getMessage().contains("position is not valid")); + } + try { + booleanColumn.subColumn(3); + Assert.fail(); + } catch (IllegalArgumentException e) { + Assert.assertTrue(e.getMessage().contains("fromIndex is not valid")); + } + } + + @Test + public void doubleColumnSubColumnTest() { + DoubleColumnBuilder columnBuilder = new DoubleColumnBuilder(null, 10); + for (int i = 0; i < 10; i++) { + columnBuilder.writeDouble(i); + } + DoubleColumn doubleColumn = (DoubleColumn) columnBuilder.build(); + doubleColumn = (DoubleColumn) doubleColumn.subColumn(5); + Assert.assertEquals(5, doubleColumn.getPositionCount()); + Assert.assertEquals(5.0, doubleColumn.getDouble(0), 0.001); + Assert.assertEquals(9.0, doubleColumn.getDouble(4), 0.001); + try { + doubleColumn.getDouble(5); + Assert.fail(); + } catch (IllegalArgumentException e) { + Assert.assertTrue(e.getMessage().contains("position is not valid")); + } + doubleColumn = (DoubleColumn) doubleColumn.subColumn(3); + Assert.assertEquals(2, doubleColumn.getPositionCount()); + Assert.assertEquals(8.0, doubleColumn.getDouble(0), 0.001); + Assert.assertEquals(9.0, doubleColumn.getDouble(1), 0.001); + try { + doubleColumn.getDouble(2); + Assert.fail(); + } catch (IllegalArgumentException e) { + Assert.assertTrue(e.getMessage().contains("position is not valid")); + } + try { + doubleColumn.subColumn(3); + Assert.fail(); + } catch (IllegalArgumentException e) { + Assert.assertTrue(e.getMessage().contains("fromIndex is not valid")); + } + } + + @Test + public void floatColumnSubColumnTest() { + FloatColumnBuilder columnBuilder = new FloatColumnBuilder(null, 10); + for (int i = 0; i < 10; i++) { + columnBuilder.writeFloat(i); + } + FloatColumn floatColumn = (FloatColumn) columnBuilder.build(); + floatColumn = (FloatColumn) floatColumn.subColumn(5); + Assert.assertEquals(5, floatColumn.getPositionCount()); + Assert.assertEquals(5.0, floatColumn.getFloat(0), 0.001); + Assert.assertEquals(9.0, floatColumn.getFloat(4), 0.001); + try { + floatColumn.getFloat(5); + Assert.fail(); + } catch (IllegalArgumentException e) { + Assert.assertTrue(e.getMessage().contains("position is not valid")); + } + floatColumn = (FloatColumn) floatColumn.subColumn(3); + Assert.assertEquals(2, floatColumn.getPositionCount()); + Assert.assertEquals(8.0, floatColumn.getFloat(0), 0.001); + Assert.assertEquals(9.0, floatColumn.getFloat(1), 0.001); + try { + floatColumn.getFloat(2); + Assert.fail(); + } catch (IllegalArgumentException e) { + Assert.assertTrue(e.getMessage().contains("position is not valid")); + } + try { + floatColumn.subColumn(3); + Assert.fail(); + } catch (IllegalArgumentException e) { + Assert.assertTrue(e.getMessage().contains("fromIndex is not valid")); + } + } + + @Test + public void intColumnSubColumnTest() { + IntColumnBuilder columnBuilder = new IntColumnBuilder(null, 10); + for (int i = 0; i < 10; i++) { + columnBuilder.writeInt(i); + } + IntColumn intColumn = (IntColumn) columnBuilder.build(); + intColumn = (IntColumn) intColumn.subColumn(5); + Assert.assertEquals(5, intColumn.getPositionCount()); + Assert.assertEquals(5, intColumn.getInt(0)); + Assert.assertEquals(9, intColumn.getInt(4)); + try { + intColumn.getInt(5); + Assert.fail(); + } catch (IllegalArgumentException e) { + Assert.assertTrue(e.getMessage().contains("position is not valid")); + } + intColumn = (IntColumn) intColumn.subColumn(3); + Assert.assertEquals(2, intColumn.getPositionCount()); + Assert.assertEquals(8, intColumn.getInt(0)); + Assert.assertEquals(9, intColumn.getInt(1)); + try { + intColumn.getInt(2); + Assert.fail(); + } catch (IllegalArgumentException e) { + Assert.assertTrue(e.getMessage().contains("position is not valid")); + } + try { + intColumn.subColumn(3); + Assert.fail(); + } catch (IllegalArgumentException e) { + Assert.assertTrue(e.getMessage().contains("fromIndex is not valid")); + } + } + + @Test + public void longColumnSubColumnTest() { + LongColumnBuilder columnBuilder = new LongColumnBuilder(null, 10); + for (int i = 0; i < 10; i++) { + columnBuilder.writeLong(i); + } + LongColumn longColumn = (LongColumn) columnBuilder.build(); + longColumn = (LongColumn) longColumn.subColumn(5); + Assert.assertEquals(5, longColumn.getPositionCount()); + Assert.assertEquals(5, longColumn.getLong(0)); + Assert.assertEquals(9, longColumn.getLong(4)); + try { + longColumn.getLong(5); + Assert.fail(); + } catch (IllegalArgumentException e) { + Assert.assertTrue(e.getMessage().contains("position is not valid")); + } + longColumn = (LongColumn) longColumn.subColumn(3); + Assert.assertEquals(2, longColumn.getPositionCount()); + Assert.assertEquals(8, longColumn.getLong(0)); + Assert.assertEquals(9, longColumn.getLong(1)); + try { + longColumn.getLong(2); + Assert.fail(); + } catch (IllegalArgumentException e) { + Assert.assertTrue(e.getMessage().contains("position is not valid")); + } + try { + longColumn.subColumn(3); + Assert.fail(); + } catch (IllegalArgumentException e) { + Assert.assertTrue(e.getMessage().contains("fromIndex is not valid")); + } + } + + @Test + public void runLengthEncodedColumnSubColumnTest() { + LongColumnBuilder longColumnBuilder = new LongColumnBuilder(null, 1); + longColumnBuilder.writeLong(1); + RunLengthEncodedColumn column = new RunLengthEncodedColumn(longColumnBuilder.build(), 10); + column = (RunLengthEncodedColumn) column.subColumn(5); + Assert.assertEquals(5, column.getPositionCount()); + Assert.assertEquals(1, column.getLong(0)); + Assert.assertEquals(1, column.getLong(4)); + try { + column.getLong(5); + Assert.fail(); + } catch (IllegalArgumentException e) { + Assert.assertTrue(e.getMessage().contains("position is not valid")); + } + column = (RunLengthEncodedColumn) column.subColumn(3); + Assert.assertEquals(2, column.getPositionCount()); + Assert.assertEquals(1, column.getLong(0)); + Assert.assertEquals(1, column.getLong(1)); + try { + column.getLong(2); + Assert.fail(); + } catch (IllegalArgumentException e) { + Assert.assertTrue(e.getMessage().contains("position is not valid")); + } + try { + column.subColumn(3); + Assert.fail(); + } catch (IllegalArgumentException e) { + Assert.assertTrue(e.getMessage().contains("fromIndex is not valid")); + } + } +}
