This is an automated email from the ASF dual-hosted git repository. suyue pushed a commit to branch aggregate in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit cfa2b65f7cc9b6946d95aca21655437e14d20ddf Author: suyue <[email protected]> AuthorDate: Sun Mar 10 16:27:56 2019 +0800 realize aggregate function interface --- .../db/query/aggregation/AggreFuncFactory.java | 76 ++++++++++ .../db/query/aggregation/AggregateFunction.java | 130 +++++++++++++++++ .../db/query/aggregation/AggregationConstant.java | 40 ++++++ .../db/query/aggregation/impl/CountAggrFunc.java | 112 +++++++++++++++ .../db/query/aggregation/impl/FirstAggrFunc.java | 114 +++++++++++++++ .../db/query/aggregation/impl/LastAggrFunc.java | 135 ++++++++++++++++++ .../db/query/aggregation/impl/MaxTimeAggrFunc.java | 127 +++++++++++++++++ .../query/aggregation/impl/MaxValueAggrFunc.java | 137 ++++++++++++++++++ .../db/query/aggregation/impl/MeanAggrFunc.java | 133 +++++++++++++++++ .../db/query/aggregation/impl/MinTimeAggrFunc.java | 108 ++++++++++++++ .../query/aggregation/impl/MinValueAggrFunc.java | 138 ++++++++++++++++++ .../db/query/aggregation/impl/SumAggrFunc.java | 130 +++++++++++++++++ .../iotdb/db/query/dataset/AggregateDataSet.java | 138 ++++++++++++++++++ .../db/query/executor/AggregateEngineExecutor.java | 158 +++++++++++++++++++++ .../iotdb/db/query/executor/EngineQueryRouter.java | 35 +++++ .../apache/iotdb/tsfile/read/common/BatchData.java | 58 ++++++++ 16 files changed, 1769 insertions(+) diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/AggreFuncFactory.java b/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/AggreFuncFactory.java new file mode 100644 index 0000000..882993d --- /dev/null +++ b/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/AggreFuncFactory.java @@ -0,0 +1,76 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.query.aggregation; + +import org.apache.iotdb.db.exception.ProcessorException; +import org.apache.iotdb.db.query.aggregation.impl.CountAggrFunc; +import org.apache.iotdb.db.query.aggregation.impl.FirstAggrFunc; +import org.apache.iotdb.db.query.aggregation.impl.LastAggrFunc; +import org.apache.iotdb.db.query.aggregation.impl.MaxTimeAggrFunc; +import org.apache.iotdb.db.query.aggregation.impl.MaxValueAggrFunc; +import org.apache.iotdb.db.query.aggregation.impl.MeanAggrFunc; +import org.apache.iotdb.db.query.aggregation.impl.MinTimeAggrFunc; +import org.apache.iotdb.db.query.aggregation.impl.MinValueAggrFunc; +import org.apache.iotdb.db.query.aggregation.impl.SumAggrFunc; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; + +/** + * Easy factory pattern to build AggregateFunction. + */ +public class AggreFuncFactory { + + /** + * construct AggregateFunction using factory pattern. + * @param aggrFuncName function name. + * @param dataType data type. 
+ * @return + * @throws ProcessorException + */ + public static AggregateFunction getAggrFuncByName(String aggrFuncName, TSDataType dataType) + throws ProcessorException { + if (aggrFuncName == null) { + throw new ProcessorException("AggregateFunction Name must not be null"); + } + + switch (aggrFuncName.toLowerCase()) { + case AggregationConstant.MIN_TIME: + return new MinTimeAggrFunc(); + case AggregationConstant.MAX_TIME: + return new MaxTimeAggrFunc(); + case AggregationConstant.MIN_VALUE: + return new MinValueAggrFunc(dataType); + case AggregationConstant.MAX_VALUE: + return new MaxValueAggrFunc(dataType); + case AggregationConstant.COUNT: + return new CountAggrFunc(); + case AggregationConstant.MEAN: + return new MeanAggrFunc(); + case AggregationConstant.FIRST: + return new FirstAggrFunc(dataType); + case AggregationConstant.SUM: + return new SumAggrFunc(); + case AggregationConstant.LAST: + return new LastAggrFunc(dataType); + default: + throw new ProcessorException( + "aggregate does not support " + aggrFuncName + " function."); + } + } +} diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/AggregateFunction.java b/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/AggregateFunction.java new file mode 100644 index 0000000..34d2d75 --- /dev/null +++ b/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/AggregateFunction.java @@ -0,0 +1,130 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.query.aggregation; + +import java.io.IOException; +import org.apache.iotdb.db.exception.ProcessorException; +import org.apache.iotdb.db.query.reader.IPointReader; +import org.apache.iotdb.db.query.reader.merge.EngineReaderByTimeStamp; +import org.apache.iotdb.db.query.timegenerator.EngineTimeGenerator; +import org.apache.iotdb.tsfile.file.header.PageHeader; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import org.apache.iotdb.tsfile.read.common.BatchData; +import org.apache.iotdb.tsfile.utils.Binary; + +public abstract class AggregateFunction { + + public String name; + public BatchData resultData; + public TSDataType dataType; + public boolean hasSetValue; + + public AggregateFunction(String name, TSDataType dataType) { + this.name = name; + this.dataType = dataType; + this.hasSetValue = false; + resultData = new BatchData(dataType, true, true); + } + + public abstract void init(); + + public abstract BatchData getResult(); + + /** + * <p> + * Calculate the aggregation using <code>PageHeader</code>. + * </p> + * + * @param pageHeader <code>PageHeader</code> + */ + public abstract void calculateValueFromPageHeader(PageHeader pageHeader) + throws ProcessorException; + + /** + * <p> + * Could not calculate using <method>calculateValueFromPageHeader</method> directly. Calculate the + * aggregation according to all decompressed data in this page. 
+ * </p> + * + * @param dataInThisPage the data in the DataPage + * @param unsequenceReader unsequence data reader + * @throws IOException TsFile data read exception + * @throws ProcessorException wrong aggregation method parameter + */ + public abstract void calculateValueFromPageData(BatchData dataInThisPage, + IPointReader unsequenceReader) throws IOException, ProcessorException; + + public abstract void calculateValueFromUnsequenceReader(IPointReader unsequenceReader) + throws IOException, ProcessorException; + + + /** + * <p> + * This method is calculate the aggregation using the common timestamps of cross series filter. + * </p> + * + * @throws IOException TsFile data read error + * @throws ProcessorException wrong aggregation method parameter + */ + public abstract boolean calcAggregationUsingTimestamps(EngineTimeGenerator timeGenerator, + EngineReaderByTimeStamp sequenceReader, EngineReaderByTimeStamp unsequenceReader) + throws IOException, ProcessorException; + + /** + * <p> + * This method is calculate the group by function. + * </p> + */ + public abstract void calcGroupByAggregation(long partitionStart, long partitionEnd, + long intervalStart, long intervalEnd, + BatchData data) throws ProcessorException; + + /** + * Convert a value from string to its real data type and put into return data. 
+ */ + public void putValueFromStr(String valueStr) throws ProcessorException { + try { + switch (dataType) { + case INT32: + resultData.putInt(Integer.parseInt(valueStr)); + break; + case INT64: + resultData.putLong(Long.parseLong(valueStr)); + break; + case BOOLEAN: + resultData.putBoolean(Boolean.parseBoolean(valueStr)); + break; + case TEXT: + resultData.putBinary(new Binary(valueStr)); + break; + case DOUBLE: + resultData.putDouble(Double.parseDouble(valueStr)); + break; + case FLOAT: + resultData.putFloat(Float.parseFloat(valueStr)); + break; + default: + throw new ProcessorException("Unsupported type " + dataType); + } + } catch (Exception e) { + throw new ProcessorException(e.getMessage()); + } + } +} diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/AggregationConstant.java b/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/AggregationConstant.java new file mode 100644 index 0000000..848d9c7 --- /dev/null +++ b/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/AggregationConstant.java @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iotdb.db.query.aggregation; + +import org.apache.iotdb.tsfile.common.constant.StatisticConstant; + +public class AggregationConstant { + + public static final String MIN_TIME = StatisticConstant.MIN_TIME; + public static final String MAX_TIME = StatisticConstant.MAX_TIME; + + public static final String MAX_VALUE = StatisticConstant.MAX_VALUE; + public static final String MIN_VALUE = StatisticConstant.MIN_VALUE; + + public static final String COUNT = StatisticConstant.COUNT; + + public static final String FIRST = StatisticConstant.FIRST; + public static final String LAST = StatisticConstant.LAST; + + public static final String MEAN = StatisticConstant.MEAN; + public static final String SUM = StatisticConstant.SUM; + +} diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/CountAggrFunc.java b/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/CountAggrFunc.java new file mode 100644 index 0000000..b5cbc27 --- /dev/null +++ b/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/CountAggrFunc.java @@ -0,0 +1,112 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iotdb.db.query.aggregation.impl; + +import java.io.IOException; +import org.apache.iotdb.db.exception.ProcessorException; +import org.apache.iotdb.db.query.aggregation.AggregateFunction; +import org.apache.iotdb.db.query.aggregation.AggregationConstant; +import org.apache.iotdb.db.query.reader.IPointReader; +import org.apache.iotdb.db.query.reader.merge.EngineReaderByTimeStamp; +import org.apache.iotdb.db.query.timegenerator.EngineTimeGenerator; +import org.apache.iotdb.tsfile.file.header.PageHeader; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import org.apache.iotdb.tsfile.read.common.BatchData; + +public class CountAggrFunc extends AggregateFunction { + + public CountAggrFunc() { + super(AggregationConstant.COUNT, TSDataType.INT64); + } + + @Override + public void init() { + if (resultData.length() == 0) { + resultData.putTime(0); + resultData.putLong(0); + } + } + + @Override + public BatchData getResult() { + return resultData; + } + + @Override + public void calculateValueFromPageHeader(PageHeader pageHeader) { + System.out.println("PageHeader>>>>>>>>>>>>" + pageHeader.getNumOfValues() + " " + pageHeader + .getMinTimestamp() + + "," + pageHeader.getMaxTimestamp()); + long preValue = resultData.getLong(); + preValue += pageHeader.getNumOfValues(); + resultData.setLong(0, preValue); + + } + + @Override + public void calculateValueFromPageData(BatchData dataInThisPage, IPointReader unsequenceReader) + throws IOException, ProcessorException { + while (dataInThisPage.hasNext() && unsequenceReader.hasNext()) { + if (dataInThisPage.currentTime() == unsequenceReader.current().getTimestamp()) { + dataInThisPage.next(); + unsequenceReader.next(); + } else if (dataInThisPage.currentTime() < unsequenceReader.current().getTimestamp()) { + dataInThisPage.next(); + } else { + unsequenceReader.next(); + } + long preValue = resultData.getLong(); + preValue += 1; + resultData.setLong(0, preValue); + } + + if 
(dataInThisPage.hasNext()) { + long preValue = resultData.getLong(); + preValue += (resultData.length() - resultData.getCurIdx()); + resultData.setLong(0, preValue); + } + } + + @Override + public void calculateValueFromUnsequenceReader(IPointReader unsequenceReader) + throws IOException, ProcessorException { + int cnt = 0; + while (unsequenceReader.hasNext()) { + unsequenceReader.next(); + cnt++; + } + long preValue = resultData.getLong(); + preValue += cnt; + resultData.setLong(0, preValue); + } + + @Override + public boolean calcAggregationUsingTimestamps(EngineTimeGenerator timeGenerator, + EngineReaderByTimeStamp sequenceReader, EngineReaderByTimeStamp unsequenceReader) + throws IOException, ProcessorException { + return false; + } + + @Override + public void calcGroupByAggregation(long partitionStart, long partitionEnd, long intervalStart, + long intervalEnd, BatchData data) throws ProcessorException { + + } +} diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/FirstAggrFunc.java b/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/FirstAggrFunc.java new file mode 100644 index 0000000..96823ff --- /dev/null +++ b/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/FirstAggrFunc.java @@ -0,0 +1,114 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.query.aggregation.impl; + +import java.io.IOException; +import org.apache.iotdb.db.exception.ProcessorException; +import org.apache.iotdb.db.query.aggregation.AggregateFunction; +import org.apache.iotdb.db.query.aggregation.AggregationConstant; +import org.apache.iotdb.db.query.reader.IPointReader; +import org.apache.iotdb.db.query.reader.merge.EngineReaderByTimeStamp; +import org.apache.iotdb.db.query.timegenerator.EngineTimeGenerator; +import org.apache.iotdb.tsfile.file.header.PageHeader; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import org.apache.iotdb.tsfile.read.common.BatchData; + +public class FirstAggrFunc extends AggregateFunction { + + public FirstAggrFunc(TSDataType dataType) { + super(AggregationConstant.FIRST, dataType); + } + + @Override + public void init() { + + } + + @Override + public BatchData getResult() { + return resultData; + } + + @Override + public void calculateValueFromPageHeader(PageHeader pageHeader) throws ProcessorException { + if (resultData.length() != 0) { + return; + } + + Object firstVal = pageHeader.getStatistics().getFirst(); + if (firstVal == null) { + throw new ProcessorException("PageHeader contains no FIRST value"); + } + resultData.setTime(0, 0); + resultData.putAnObject(firstVal); + } + + @Override + public void calculateValueFromPageData(BatchData dataInThisPage, IPointReader unsequenceReader) + throws IOException, ProcessorException { + if (resultData.length() != 0) { + return; + } + if (dataInThisPage.hasNext() && unsequenceReader.hasNext()) { + if (dataInThisPage.currentTime() >= unsequenceReader.current().getTimestamp()) { + resultData.setTime(0, 0); + resultData.putAnObject(unsequenceReader.current().getValue().getValue()); + unsequenceReader.next(); + return; + } else { + resultData.setTime(0, 0); + 
resultData.putAnObject(dataInThisPage.currentValue()); + return; + } + } + + if (dataInThisPage.hasNext()) { + resultData.setTime(0, 0); + resultData.putAnObject(dataInThisPage.currentValue()); + return; + } + } + + @Override + public void calculateValueFromUnsequenceReader(IPointReader unsequenceReader) + throws IOException, ProcessorException { + if (resultData.length() != 0) { + return; + } + if (unsequenceReader.hasNext()) { + resultData.setTime(0, 0); + resultData.putAnObject(unsequenceReader.current().getValue().getValue()); + return; + } + } + + @Override + public boolean calcAggregationUsingTimestamps(EngineTimeGenerator timeGenerator, + EngineReaderByTimeStamp sequenceReader, EngineReaderByTimeStamp unsequenceReader) + throws IOException, ProcessorException { + return false; + } + + @Override + public void calcGroupByAggregation(long partitionStart, long partitionEnd, long intervalStart, + long intervalEnd, BatchData data) throws ProcessorException { + + } +} diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/LastAggrFunc.java b/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/LastAggrFunc.java new file mode 100644 index 0000000..def4c6a --- /dev/null +++ b/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/LastAggrFunc.java @@ -0,0 +1,135 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.query.aggregation.impl; + +import java.io.IOException; +import org.apache.iotdb.db.exception.ProcessorException; +import org.apache.iotdb.db.query.aggregation.AggregateFunction; +import org.apache.iotdb.db.query.aggregation.AggregationConstant; +import org.apache.iotdb.db.query.reader.IPointReader; +import org.apache.iotdb.db.query.reader.merge.EngineReaderByTimeStamp; +import org.apache.iotdb.db.query.timegenerator.EngineTimeGenerator; +import org.apache.iotdb.db.utils.TimeValuePair; +import org.apache.iotdb.tsfile.file.header.PageHeader; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import org.apache.iotdb.tsfile.read.common.BatchData; + +public class LastAggrFunc extends AggregateFunction { + + public LastAggrFunc(TSDataType dataType) { + super(AggregationConstant.LAST, dataType); + } + + @Override + public void init() { + + } + + @Override + public BatchData getResult() { + if (resultData.length() != 0) { + resultData.setTime(0, 0); + } + return resultData; + } + + @Override + public void calculateValueFromPageHeader(PageHeader pageHeader) throws ProcessorException { + Object lastVal = pageHeader.getStatistics().getLast(); + if (resultData.length() == 0) { + resultData.putTime(pageHeader.getMaxTimestamp()); + resultData.putAnObject(lastVal); + } else { + if (resultData.currentTime() < pageHeader.getMaxTimestamp()) { + resultData.setAnObject(0, (Comparable<?>) lastVal); + } + } + } + + @Override + public void calculateValueFromPageData(BatchData dataInThisPage, IPointReader unsequenceReader) 
+ throws IOException, ProcessorException { + long time = -1; + Object lastVal = null; + int maxIndex = dataInThisPage.length() - 1; + if (maxIndex < 0) { + return; + } + time = dataInThisPage.getTimeByIndex(maxIndex); + lastVal = dataInThisPage.getValueByIndex(maxIndex); + while (unsequenceReader.hasNext()) { + if (unsequenceReader.current().getTimestamp() < time) { + unsequenceReader.next(); + } else if (unsequenceReader.current().getTimestamp() == time) { + lastVal = unsequenceReader.current().getValue().getValue(); + unsequenceReader.next(); + } else { + break; + } + } + + if (resultData.length() == 0) { + if (time != -1) { + resultData.setTime(0, time); + resultData.setAnObject(0, (Comparable<?>) lastVal); + } + } else { + //has set value + if (time != -1 && time > resultData.currentTime()) { + resultData.setTime(0, time); + resultData.setAnObject(0, (Comparable<?>) lastVal); + } + } + } + + @Override + public void calculateValueFromUnsequenceReader(IPointReader unsequenceReader) + throws IOException, ProcessorException { + TimeValuePair pair = null; + while (unsequenceReader.hasNext()) { + pair = unsequenceReader.next(); + } + if (resultData.length() == 0) { + if (pair != null) { + resultData.setAnObject(0, (Comparable<?>) pair.getValue().getValue()); + resultData.setTime(0, pair.getTimestamp()); + } + } else { + if (pair != null && pair.getTimestamp() >= resultData.currentTime()) { + resultData.setAnObject(0, (Comparable<?>) pair.getValue().getValue()); + resultData.setTime(0, pair.getTimestamp()); + } + } + + } + + @Override + public boolean calcAggregationUsingTimestamps(EngineTimeGenerator timeGenerator, + EngineReaderByTimeStamp sequenceReader, EngineReaderByTimeStamp unsequenceReader) + throws IOException, ProcessorException { + return false; + } + + @Override + public void calcGroupByAggregation(long partitionStart, long partitionEnd, long intervalStart, + long intervalEnd, BatchData data) throws ProcessorException { + + } +} diff --git 
a/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxTimeAggrFunc.java b/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxTimeAggrFunc.java new file mode 100644 index 0000000..8b4adaf --- /dev/null +++ b/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxTimeAggrFunc.java @@ -0,0 +1,127 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iotdb.db.query.aggregation.impl; + +import java.io.IOException; +import org.apache.iotdb.db.exception.ProcessorException; +import org.apache.iotdb.db.query.aggregation.AggregateFunction; +import org.apache.iotdb.db.query.aggregation.AggregationConstant; +import org.apache.iotdb.db.query.reader.IPointReader; +import org.apache.iotdb.db.query.reader.merge.EngineReaderByTimeStamp; +import org.apache.iotdb.db.query.timegenerator.EngineTimeGenerator; +import org.apache.iotdb.db.utils.TimeValuePair; +import org.apache.iotdb.tsfile.file.header.PageHeader; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import org.apache.iotdb.tsfile.read.common.BatchData; + +public class MaxTimeAggrFunc extends AggregateFunction { + + public MaxTimeAggrFunc() { + super(AggregationConstant.MAX_TIME, TSDataType.INT64); + } + + @Override + public void init() { + + } + + @Override + public BatchData getResult() { + return resultData; + } + + @Override + public void calculateValueFromPageHeader(PageHeader pageHeader) throws ProcessorException { + long maxTimestamp = pageHeader.getMaxTimestamp(); + + //has not set value + if (resultData.length() == 0) { + resultData.putTime(0); + resultData.putLong(maxTimestamp); + return; + } + + if (resultData.getLong() < maxTimestamp) { + resultData.setLong(0, maxTimestamp); + } + } + + @Override + public void calculateValueFromPageData(BatchData dataInThisPage, IPointReader unsequenceReader) + throws IOException, ProcessorException { + long time = -1; + int maxIndex = dataInThisPage.length() - 1; + if (maxIndex < 0) { + return; + } + time = dataInThisPage.getTimeByIndex(maxIndex); + while (unsequenceReader.hasNext()) { + if (unsequenceReader.current().getTimestamp() <= time) { + unsequenceReader.next(); + } else { + break; + } + } + if (resultData.length() == 0) { + if (time != -1) { + resultData.setTime(0, 0); + resultData.setAnObject(0, time); + } + } else { + //has set value + if (time != -1 && time > 
resultData.currentTime()) { + resultData.setAnObject(0, time); + } + } + } + + @Override + public void calculateValueFromUnsequenceReader(IPointReader unsequenceReader) + throws IOException, ProcessorException { + TimeValuePair pair = null; + while (unsequenceReader.hasNext()) { + pair = unsequenceReader.next(); + } + if (resultData.length() == 0) { + if (pair != null) { + resultData.setTime(0, 0); + resultData.setAnObject(0, pair.getTimestamp()); + } + } else { + //has set value + if (pair != null && pair.getTimestamp() > resultData.currentTime()) { + resultData.setAnObject(0, pair.getTimestamp()); + } + } + } + + @Override + public boolean calcAggregationUsingTimestamps(EngineTimeGenerator timeGenerator, + EngineReaderByTimeStamp sequenceReader, EngineReaderByTimeStamp unsequenceReader) + throws IOException, ProcessorException { + return false; + } + + @Override + public void calcGroupByAggregation(long partitionStart, long partitionEnd, long intervalStart, + long intervalEnd, BatchData data) throws ProcessorException { + + } +} diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxValueAggrFunc.java b/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxValueAggrFunc.java new file mode 100644 index 0000000..ac3d901 --- /dev/null +++ b/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxValueAggrFunc.java @@ -0,0 +1,137 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.query.aggregation.impl; + +import java.io.IOException; +import org.apache.iotdb.db.exception.ProcessorException; +import org.apache.iotdb.db.query.aggregation.AggregateFunction; +import org.apache.iotdb.db.query.aggregation.AggregationConstant; +import org.apache.iotdb.db.query.reader.IPointReader; +import org.apache.iotdb.db.query.reader.merge.EngineReaderByTimeStamp; +import org.apache.iotdb.db.query.timegenerator.EngineTimeGenerator; +import org.apache.iotdb.tsfile.file.header.PageHeader; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import org.apache.iotdb.tsfile.read.common.BatchData; + +public class MaxValueAggrFunc extends AggregateFunction { + + public MaxValueAggrFunc(TSDataType dataType) { + super(AggregationConstant.MAX_VALUE, dataType); + } + + @Override + public void init() { + + } + + @Override + public BatchData getResult() { + return resultData; + } + + @Override + public void calculateValueFromPageHeader(PageHeader pageHeader) throws ProcessorException { + Comparable<Object> maxVal = (Comparable<Object>) pageHeader.getStatistics().getMax(); + if (resultData.length() == 0) { + resultData.putTime(0); + resultData.setAnObject(0, maxVal); + } else { + if (maxVal.compareTo(resultData.currentValue()) > 0) { + resultData.setAnObject(0, maxVal); + } + } + } + + @Override + public void calculateValueFromPageData(BatchData dataInThisPage, IPointReader unsequenceReader) + throws IOException, ProcessorException { + Comparable<Object> maxVal = null; + while (dataInThisPage.hasNext() && 
unsequenceReader.hasNext()) { + if (dataInThisPage.currentTime() < unsequenceReader.current().getTimestamp()) { + if (maxVal == null || maxVal.compareTo(dataInThisPage.currentValue()) < 0) { + maxVal = (Comparable<Object>) dataInThisPage.currentValue(); + } + dataInThisPage.next(); + } else if (dataInThisPage.currentTime() == unsequenceReader.current().getTimestamp()) { + if (maxVal == null + || maxVal.compareTo(unsequenceReader.current().getValue().getValue()) < 0) { + maxVal = (Comparable<Object>) unsequenceReader.current().getValue().getValue(); + } + dataInThisPage.next(); + unsequenceReader.next(); + } else { + if (maxVal == null + || maxVal.compareTo(unsequenceReader.current().getValue().getValue()) < 0) { + maxVal = (Comparable<Object>) unsequenceReader.current().getValue().getValue(); + } + unsequenceReader.next(); + } + } + + while (dataInThisPage.hasNext()) { + if (maxVal == null + || maxVal.compareTo(unsequenceReader.current().getValue().getValue()) < 0) { + maxVal = (Comparable<Object>) unsequenceReader.current().getValue().getValue(); + } + dataInThisPage.next(); + } + updateResult(maxVal); + } + + @Override + public void calculateValueFromUnsequenceReader(IPointReader unsequenceReader) + throws IOException, ProcessorException { + Comparable<Object> maxVal = null; + while (unsequenceReader.hasNext()) { + if (maxVal == null + || maxVal.compareTo(unsequenceReader.current().getValue().getValue()) < 0) { + maxVal = (Comparable<Object>) unsequenceReader.current().getValue().getValue(); + } + unsequenceReader.next(); + } + updateResult(maxVal); + } + + private void updateResult(Comparable<Object> maxVal) { + if (resultData.length() == 0) { + if (maxVal != null) { + resultData.putTime(0); + resultData.setAnObject(0, maxVal); + } + } else { + if (maxVal != null && maxVal.compareTo(resultData.currentValue()) > 0) { + resultData.setAnObject(0, maxVal); + } + } + } + + @Override + public boolean calcAggregationUsingTimestamps(EngineTimeGenerator timeGenerator, + 
EngineReaderByTimeStamp sequenceReader, EngineReaderByTimeStamp unsequenceReader) + throws IOException, ProcessorException { + return false; + } + + @Override + public void calcGroupByAggregation(long partitionStart, long partitionEnd, long intervalStart, + long intervalEnd, BatchData data) throws ProcessorException { + + } +} diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MeanAggrFunc.java b/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MeanAggrFunc.java new file mode 100644 index 0000000..9e56fdc --- /dev/null +++ b/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MeanAggrFunc.java @@ -0,0 +1,133 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
package org.apache.iotdb.db.query.aggregation.impl;

import java.io.IOException;
import org.apache.iotdb.db.exception.ProcessorException;
import org.apache.iotdb.db.query.aggregation.AggregateFunction;
import org.apache.iotdb.db.query.aggregation.AggregationConstant;
import org.apache.iotdb.db.query.reader.IPointReader;
import org.apache.iotdb.db.query.reader.merge.EngineReaderByTimeStamp;
import org.apache.iotdb.db.query.timegenerator.EngineTimeGenerator;
import org.apache.iotdb.db.utils.TimeValuePair;
import org.apache.iotdb.tsfile.file.header.PageHeader;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.BatchData;

/**
 * MEAN aggregation: keeps a running total and a running count of every value it is fed and
 * reports {@code total / count} as a DOUBLE.
 */
public class MeanAggrFunc extends AggregateFunction {

  /** Running total of all values folded in so far. */
  private double sum = 0.0;

  /** How many values have been folded into {@link #sum}. */
  private int cnt = 0;

  public MeanAggrFunc() {
    super(AggregationConstant.MEAN, TSDataType.DOUBLE);
  }

  @Override
  public void init() {
    // no per-query state to prepare
  }

  /**
   * Writes the mean into slot 0 of the result batch when at least one value was seen.
   *
   * @return the result batch; left untouched when nothing was aggregated
   */
  @Override
  public BatchData getResult() {
    if (cnt <= 0) {
      return resultData;
    }
    resultData.putTime(0);
    resultData.setDouble(0, sum / cnt);
    return resultData;
  }

  /**
   * Folds a whole page in using only its header: the statistics' sum and the number of values.
   */
  @Override
  public void calculateValueFromPageHeader(PageHeader pageHeader) throws ProcessorException {
    sum += pageHeader.getStatistics().getSum();
    cnt += pageHeader.getNumOfValues();
  }

  /**
   * Merges one page with the unsequence reader in timestamp order. When both sides carry the
   * same timestamp the unsequence value is used and both cursors advance. Page values left over
   * after the unsequence reader is exhausted are folded in afterwards; unsequence points left
   * over after the page is exhausted are not consumed here.
   */
  @Override
  public void calculateValueFromPageData(BatchData dataInThisPage, IPointReader unsequenceReader)
      throws IOException, ProcessorException {
    final TSDataType pageType = dataInThisPage.getDataType();

    while (dataInThisPage.hasNext() && unsequenceReader.hasNext()) {
      final long pageTime = dataInThisPage.currentTime();
      final long unseqTime = unsequenceReader.current().getTimestamp();
      final Object picked;
      if (pageTime < unseqTime) {
        picked = dataInThisPage.currentValue();
        dataInThisPage.next();
      } else {
        picked = unsequenceReader.current().getValue().getValue();
        if (pageTime == unseqTime) {
          // same timestamp on both sides: the page point is superseded, skip it
          dataInThisPage.next();
        }
        unsequenceReader.next();
      }
      updateMean(pageType, picked);
    }

    for (; dataInThisPage.hasNext(); dataInThisPage.next()) {
      updateMean(pageType, dataInThisPage.currentValue());
    }
  }

  /**
   * Adds one boxed value to the running total and bumps the count.
   *
   * @throws ProcessorException when {@code type} is not one of the four numeric types
   */
  private void updateMean(TSDataType type, Object sumVal) throws ProcessorException {
    final double delta;
    switch (type) {
      case INT32:
        delta = (int) sumVal;
        break;
      case INT64:
        delta = (long) sumVal;
        break;
      case FLOAT:
        delta = (float) sumVal;
        break;
      case DOUBLE:
        delta = (double) sumVal;
        break;
      case TEXT:
      case BOOLEAN:
      default:
        throw new ProcessorException("Unsupported data type in aggregation MEAN : " + type);
    }
    sum += delta;
    cnt++;
  }

  /**
   * Drains the unsequence reader, folding every remaining point in. The data type is taken from
   * the first pending point.
   */
  @Override
  public void calculateValueFromUnsequenceReader(IPointReader unsequenceReader)
      throws IOException, ProcessorException {
    if (!unsequenceReader.hasNext()) {
      return;
    }
    final TSDataType valueType = unsequenceReader.current().getValue().getDataType();
    do {
      TimeValuePair point = unsequenceReader.next();
      updateMean(valueType, point.getValue().getValue());
    } while (unsequenceReader.hasNext());
  }

  /** Not implemented yet; always reports {@code false}. */
  @Override
  public boolean calcAggregationUsingTimestamps(EngineTimeGenerator timeGenerator,
      EngineReaderByTimeStamp sequenceReader, EngineReaderByTimeStamp unsequenceReader)
      throws IOException, ProcessorException {
    return false;
  }

  /** Not implemented yet; does nothing. */
  @Override
  public void calcGroupByAggregation(long partitionStart, long partitionEnd, long intervalStart,
      long intervalEnd, BatchData data) throws ProcessorException {
    // intentionally empty
  }
}
package org.apache.iotdb.db.query.aggregation.impl;

import java.io.IOException;
import org.apache.iotdb.db.exception.ProcessorException;
import org.apache.iotdb.db.query.aggregation.AggregateFunction;
import org.apache.iotdb.db.query.aggregation.AggregationConstant;
import org.apache.iotdb.db.query.reader.IPointReader;
import org.apache.iotdb.db.query.reader.merge.EngineReaderByTimeStamp;
import org.apache.iotdb.db.query.timegenerator.EngineTimeGenerator;
import org.apache.iotdb.tsfile.file.header.PageHeader;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.BatchData;

/**
 * MIN_TIME aggregation: records a single INT64 timestamp the first time it is offered data and
 * ignores every later call once the result batch is non-empty.
 */
public class MinTimeAggrFunc extends AggregateFunction {

  public MinTimeAggrFunc() {
    super(AggregationConstant.MIN_TIME, TSDataType.INT64);
  }

  @Override
  public void init() {
    // no per-query state to prepare
  }

  @Override
  public BatchData getResult() {
    return resultData;
  }

  /** Takes the page's minimum timestamp from its header, unless a result already exists. */
  @Override
  public void calculateValueFromPageHeader(PageHeader pageHeader) throws ProcessorException {
    if (resultData.length() == 0) {
      store(pageHeader.getMinTimestamp());
    }
  }

  /**
   * Takes the earlier of the page's current timestamp and the unsequence reader's current
   * timestamp (or just the page's when the unsequence reader is exhausted). Nothing is recorded
   * when the page itself has no pending point or when a result already exists.
   */
  @Override
  public void calculateValueFromPageData(BatchData dataInThisPage, IPointReader unsequenceReader)
      throws IOException, ProcessorException {
    if (resultData.length() > 0 || !dataInThisPage.hasNext()) {
      return;
    }
    long earliest = dataInThisPage.currentTime();
    if (unsequenceReader.hasNext()) {
      earliest = Math.min(earliest, unsequenceReader.current().getTimestamp());
    }
    store(earliest);
  }

  /** Takes the unsequence reader's current timestamp, unless a result already exists. */
  @Override
  public void calculateValueFromUnsequenceReader(IPointReader unsequenceReader)
      throws IOException, ProcessorException {
    if (resultData.length() == 0 && unsequenceReader.hasNext()) {
      store(unsequenceReader.current().getTimestamp());
    }
  }

  /** Appends the single result row: time 0 paired with the given minimum timestamp. */
  private void store(long minTimestamp) {
    resultData.putTime(0);
    resultData.putLong(minTimestamp);
  }

  /** Not implemented yet; always reports {@code false}. */
  @Override
  public boolean calcAggregationUsingTimestamps(EngineTimeGenerator timeGenerator,
      EngineReaderByTimeStamp sequenceReader, EngineReaderByTimeStamp unsequenceReader)
      throws IOException, ProcessorException {
    return false;
  }

  /** Not implemented yet; does nothing. */
  @Override
  public void calcGroupByAggregation(long partitionStart, long partitionEnd, long intervalStart,
      long intervalEnd, BatchData data) throws ProcessorException {
    // intentionally empty
  }
}
package org.apache.iotdb.db.query.aggregation.impl;

import java.io.IOException;
import org.apache.iotdb.db.exception.ProcessorException;
import org.apache.iotdb.db.query.aggregation.AggregateFunction;
import org.apache.iotdb.db.query.aggregation.AggregationConstant;
import org.apache.iotdb.db.query.reader.IPointReader;
import org.apache.iotdb.db.query.reader.merge.EngineReaderByTimeStamp;
import org.apache.iotdb.db.query.timegenerator.EngineTimeGenerator;
import org.apache.iotdb.tsfile.file.header.PageHeader;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.BatchData;

/**
 * MIN_VALUE aggregation: keeps the smallest value seen so far in slot 0 of the result batch.
 * Values are compared through {@link Comparable}, so the result has the series' own data type.
 */
public class MinValueAggrFunc extends AggregateFunction {

  /**
   * @param dataType data type of the aggregated series, also used for the result
   */
  public MinValueAggrFunc(TSDataType dataType) {
    super(AggregationConstant.MIN_VALUE, dataType);
  }

  @Override
  public void init() {
    // no per-query state to prepare
  }

  @Override
  public BatchData getResult() {
    return resultData;
  }

  /**
   * Folds a whole page in using only the minimum recorded in its header statistics.
   *
   * @param pageHeader header of a page that does not need to be merged point by point
   */
  @Override
  @SuppressWarnings("unchecked") // statistics expose the minimum as a raw Comparable value
  public void calculateValueFromPageHeader(PageHeader pageHeader) throws ProcessorException {
    updateResult((Comparable<Object>) pageHeader.getStatistics().getMin());
  }

  /**
   * Merges one page with the unsequence reader in timestamp order and folds the smallest value
   * met into the result. When both sides carry the same timestamp the unsequence value is used
   * and both cursors advance. Unsequence points left over after the page is exhausted are not
   * consumed here.
   *
   * @param dataInThisPage page data, consumed completely
   * @param unsequenceReader unsequence points, consumed only while they interleave with the page
   */
  @Override
  public void calculateValueFromPageData(BatchData dataInThisPage, IPointReader unsequenceReader)
      throws IOException, ProcessorException {
    Comparable<Object> minVal = null;

    while (dataInThisPage.hasNext() && unsequenceReader.hasNext()) {
      long pageTime = dataInThisPage.currentTime();
      long unseqTime = unsequenceReader.current().getTimestamp();
      if (pageTime < unseqTime) {
        minVal = smaller(minVal, dataInThisPage.currentValue());
        dataInThisPage.next();
      } else {
        minVal = smaller(minVal, unsequenceReader.current().getValue().getValue());
        if (pageTime == unseqTime) {
          // the page point is superseded by the unsequence point at the same timestamp
          dataInThisPage.next();
        }
        unsequenceReader.next();
      }
    }

    // Drain what is left of the page. The candidate must come from the page itself: the
    // unsequence reader may already be exhausted at this point.
    while (dataInThisPage.hasNext()) {
      minVal = smaller(minVal, dataInThisPage.currentValue());
      dataInThisPage.next();
    }
    updateResult(minVal);
  }

  /**
   * Drains the unsequence reader and folds the smallest remaining value into the result.
   */
  @Override
  public void calculateValueFromUnsequenceReader(IPointReader unsequenceReader)
      throws IOException, ProcessorException {
    Comparable<Object> minVal = null;
    while (unsequenceReader.hasNext()) {
      minVal = smaller(minVal, unsequenceReader.current().getValue().getValue());
      unsequenceReader.next();
    }
    updateResult(minVal);
  }

  /**
   * Returns {@code candidate} when there is no minimum yet or when it is strictly smaller than
   * the current one; otherwise returns the current minimum unchanged.
   */
  @SuppressWarnings("unchecked") // every value of one series shares a single Comparable type
  private static Comparable<Object> smaller(Comparable<Object> currentMin, Object candidate) {
    if (currentMin == null || currentMin.compareTo(candidate) > 0) {
      return (Comparable<Object>) candidate;
    }
    return currentMin;
  }

  /**
   * Stores {@code minVal} when the result is still empty, or replaces the stored value when
   * {@code minVal} is strictly smaller. A {@code null} argument (nothing was seen) is ignored.
   */
  private void updateResult(Comparable<Object> minVal) {
    if (minVal == null) {
      return;
    }
    if (resultData.length() == 0) {
      resultData.putTime(0);
      resultData.setAnObject(0, minVal);
    } else if (minVal.compareTo(resultData.currentValue()) < 0) {
      resultData.setAnObject(0, minVal);
    }
  }

  /** Not implemented yet; always reports {@code false}. */
  @Override
  public boolean calcAggregationUsingTimestamps(EngineTimeGenerator timeGenerator,
      EngineReaderByTimeStamp sequenceReader, EngineReaderByTimeStamp unsequenceReader)
      throws IOException, ProcessorException {
    return false;
  }

  /** Not implemented yet; does nothing. */
  @Override
  public void calcGroupByAggregation(long partitionStart, long partitionEnd, long intervalStart,
      long intervalEnd, BatchData data) throws ProcessorException {
    // intentionally empty
  }
}
package org.apache.iotdb.db.query.aggregation.impl;

import java.io.IOException;
import org.apache.iotdb.db.exception.ProcessorException;
import org.apache.iotdb.db.query.aggregation.AggregateFunction;
import org.apache.iotdb.db.query.aggregation.AggregationConstant;
import org.apache.iotdb.db.query.reader.IPointReader;
import org.apache.iotdb.db.query.reader.merge.EngineReaderByTimeStamp;
import org.apache.iotdb.db.query.timegenerator.EngineTimeGenerator;
import org.apache.iotdb.db.utils.TimeValuePair;
import org.apache.iotdb.tsfile.file.header.PageHeader;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.BatchData;

/**
 * SUM aggregation: accumulates every value it is fed into a double and reports it as a DOUBLE.
 */
public class SumAggrFunc extends AggregateFunction {

  /** Running total of all values folded in so far. */
  private double sum = 0.0;

  public SumAggrFunc() {
    super(AggregationConstant.SUM, TSDataType.DOUBLE);
  }

  @Override
  public void init() {
    // no per-query state to prepare
  }

  /**
   * Writes the running total into slot 0 of the result batch.
   *
   * <p>The row's timestamp is appended first (once only), matching what the other aggregation
   * functions do before they write slot 0. Without it the batch keeps a length of zero, and a
   * consumer that checks {@code hasNext()} or {@code currentTime()} before reading the value
   * would never see the sum. NOTE(review): confirm against {@code BatchData} that
   * {@code putTime} is what makes the row visible to readers.
   *
   * @return the result batch holding the sum
   */
  @Override
  public BatchData getResult() {
    if (resultData.length() == 0) {
      resultData.putTime(0);
    }
    resultData.setDouble(0, sum);
    return resultData;
  }

  /** Folds a whole page in using only the sum recorded in its header statistics. */
  @Override
  public void calculateValueFromPageHeader(PageHeader pageHeader) throws ProcessorException {
    sum += pageHeader.getStatistics().getSum();
  }

  /**
   * Merges one page with the unsequence reader in timestamp order. When both sides carry the
   * same timestamp the unsequence value is used and both cursors advance. Page values left over
   * after the unsequence reader is exhausted are added afterwards; unsequence points left over
   * after the page is exhausted are not consumed here.
   */
  @Override
  public void calculateValueFromPageData(BatchData dataInThisPage, IPointReader unsequenceReader)
      throws IOException, ProcessorException {
    TSDataType type = dataInThisPage.getDataType();
    while (dataInThisPage.hasNext() && unsequenceReader.hasNext()) {
      long pageTime = dataInThisPage.currentTime();
      long unseqTime = unsequenceReader.current().getTimestamp();
      Object value;
      if (pageTime < unseqTime) {
        value = dataInThisPage.currentValue();
        dataInThisPage.next();
      } else {
        value = unsequenceReader.current().getValue().getValue();
        if (pageTime == unseqTime) {
          // the page point is superseded by the unsequence point at the same timestamp
          dataInThisPage.next();
        }
        unsequenceReader.next();
      }
      updateSum(type, value);
    }

    while (dataInThisPage.hasNext()) {
      updateSum(type, dataInThisPage.currentValue());
      dataInThisPage.next();
    }
  }

  /**
   * Adds one boxed value to the running total.
   *
   * @throws ProcessorException when {@code type} is not one of the four numeric types
   */
  private void updateSum(TSDataType type, Object sumVal) throws ProcessorException {
    switch (type) {
      case INT32:
        sum += (int) sumVal;
        break;
      case INT64:
        sum += (long) sumVal;
        break;
      case FLOAT:
        sum += (float) sumVal;
        break;
      case DOUBLE:
        sum += (double) sumVal;
        break;
      case TEXT:
      case BOOLEAN:
      default:
        // the message used to name MEAN, a leftover from the class this one was copied from
        throw new ProcessorException("Unsupported data type in aggregation SUM : " + type);
    }
  }

  /**
   * Drains the unsequence reader, adding every remaining point. The data type is taken from the
   * first pending point.
   */
  @Override
  public void calculateValueFromUnsequenceReader(IPointReader unsequenceReader)
      throws IOException, ProcessorException {
    if (!unsequenceReader.hasNext()) {
      return;
    }
    TSDataType type = unsequenceReader.current().getValue().getDataType();
    while (unsequenceReader.hasNext()) {
      TimeValuePair pair = unsequenceReader.next();
      updateSum(type, pair.getValue().getValue());
    }
  }

  /** Not implemented yet; always reports {@code false}. */
  @Override
  public boolean calcAggregationUsingTimestamps(EngineTimeGenerator timeGenerator,
      EngineReaderByTimeStamp sequenceReader, EngineReaderByTimeStamp unsequenceReader)
      throws IOException, ProcessorException {
    return false;
  }

  /** Not implemented yet; does nothing. */
  @Override
  public void calcGroupByAggregation(long partitionStart, long partitionEnd, long intervalStart,
      long intervalEnd, BatchData data) throws ProcessorException {
    // intentionally empty
  }
}
package org.apache.iotdb.db.query.dataset;

import java.io.IOException;
import java.util.List;
import java.util.PriorityQueue;
import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.BatchData;
import org.apache.iotdb.tsfile.read.common.Field;
import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.read.common.RowRecord;
import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;

/**
 * Query data set over aggregation results. Each column is backed by one {@link BatchData};
 * rows are produced in ascending timestamp order by merging the current timestamps of all
 * columns through a min-heap.
 */
public class AggregateDataSet extends QueryDataSet {

  /** One result batch per selected column, in column order. */
  private List<BatchData> readers;

  /** Pending timestamps of all columns; may hold the same timestamp more than once. */
  private PriorityQueue<Long> timeHeap;

  /** Aggregation function name of each column, in column order. */
  private List<String> aggres;

  /**
   * constructor of AggregateDataSet.
   *
   * @param paths paths in List structure
   * @param aggres aggregate function name
   * @param dataTypes time series data type
   * @param readers aggregation results, one BatchData per path
   * @throws IOException IOException
   */
  public AggregateDataSet(List<Path> paths, List<String> aggres, List<TSDataType> dataTypes,
      List<BatchData> readers)
      throws IOException {
    super(paths, dataTypes);
    this.aggres = aggres;
    this.readers = readers;
    initHeap();
  }

  /** Seeds the heap with the current timestamp of every column that has data. */
  private void initHeap() {
    timeHeap = new PriorityQueue<>();
    for (BatchData reader : readers) {
      if (reader.hasNext()) {
        timeHeap.add(reader.currentTime());
      }
    }
  }

  @Override
  public boolean hasNext() {
    return !timeHeap.isEmpty();
  }

  /**
   * Builds the row for the smallest pending timestamp. Columns positioned at that timestamp
   * contribute their value and advance; all other columns contribute a {@code Field(null)}.
   *
   * @return the next row in timestamp order
   */
  @Override
  public RowRecord next() throws IOException {
    long minTime = timeHeapGet();

    RowRecord record = new RowRecord(minTime);

    for (int i = 0; i < readers.size(); i++) {
      BatchData reader = readers.get(i);
      if (reader.hasNext() && reader.currentTime() == minTime) {
        record.addField(getField(reader, dataTypes.get(i)));
        reader.next();
        if (reader.hasNext()) {
          // schedule this column's following timestamp
          timeHeap.add(reader.currentTime());
        }
      } else {
        record.addField(new Field(null));
      }
    }

    return record;
  }

  /**
   * Wraps the value at the batch's current position in a {@link Field} of the given type.
   *
   * @throws UnSupportedDataTypeException for a data type outside the six handled ones
   */
  private Field getField(BatchData batchData, TSDataType dataType) {
    Field field = new Field(dataType);
    switch (dataType) {
      case INT32:
        field.setIntV(batchData.getInt());
        break;
      case INT64:
        field.setLongV(batchData.getLong());
        break;
      case FLOAT:
        field.setFloatV(batchData.getFloat());
        break;
      case DOUBLE:
        field.setDoubleV(batchData.getDouble());
        break;
      case BOOLEAN:
        field.setBoolV(batchData.getBoolean());
        break;
      case TEXT:
        field.setBinaryV(batchData.getBinary());
        break;
      default:
        throw new UnSupportedDataTypeException("UnSupported: " + dataType);
    }
    return field;
  }

  /**
   * Removes and returns the smallest pending timestamp together with every duplicate of it, so
   * that one timestamp yields exactly one row.
   *
   * @return the smallest timestamp, or {@code null} when the heap is empty
   */
  private Long timeHeapGet() {
    Long t = timeHeap.poll();
    // equals(), not ==: the heap holds boxed Longs, and two boxes carrying the same timestamp
    // are in general different objects.
    while (!timeHeap.isEmpty() && timeHeap.peek().equals(t)) {
      timeHeap.poll();
    }
    return t;
  }
}
package org.apache.iotdb.db.query.executor;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
import org.apache.iotdb.db.exception.FileNodeManagerException;
import org.apache.iotdb.db.exception.PathErrorException;
import org.apache.iotdb.db.exception.ProcessorException;
import org.apache.iotdb.db.metadata.MManager;
import org.apache.iotdb.db.query.aggregation.AggreFuncFactory;
import org.apache.iotdb.db.query.aggregation.AggregateFunction;
import org.apache.iotdb.db.query.aggregation.impl.LastAggrFunc;
import org.apache.iotdb.db.query.aggregation.impl.MaxTimeAggrFunc;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.control.QueryDataSourceManager;
import org.apache.iotdb.db.query.control.QueryTokenManager;
import org.apache.iotdb.db.query.dataset.AggregateDataSet;
import org.apache.iotdb.db.query.factory.SeriesReaderFactory;
import org.apache.iotdb.db.query.reader.IAggregateReader;
import org.apache.iotdb.db.query.reader.IPointReader;
import org.apache.iotdb.db.query.reader.merge.PriorityMergeReader;
import org.apache.iotdb.db.query.reader.sequence.SequenceDataReader;
import org.apache.iotdb.tsfile.file.header.PageHeader;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.BatchData;
import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.read.expression.IExpression;
import org.apache.iotdb.tsfile.read.expression.impl.GlobalTimeExpression;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;

/**
 * Executes an aggregation query: one aggregation function per selected series, evaluated over
 * that series' sequence data (page by page) merged with its unsequence data.
 */
public class AggregateEngineExecutor {

  private long jobId;
  private List<Path> selectedSeries;
  private List<String> aggres;
  private IExpression expression;

  /**
   * @param jobId id of the query job
   * @param selectedSeries series to aggregate, one per output column
   * @param aggres aggregation function names, aligned with {@code selectedSeries}
   * @param expression query condition; may be {@code null} for "no condition"
   */
  public AggregateEngineExecutor(long jobId, List<Path> selectedSeries, List<String> aggres,
      IExpression expression) {
    this.jobId = jobId;
    this.selectedSeries = selectedSeries;
    this.aggres = aggres;
    this.expression = expression;
  }

  /**
   * Runs the aggregation when the condition is absent or is a pure global time condition.
   *
   * @param context query context handed to the readers
   * @return one result column per selected series
   */
  public AggregateDataSet executeWithOutTimeGenerator(QueryContext context)
      throws FileNodeManagerException, IOException, PathErrorException, ProcessorException {
    // The filter comes from the expression. The previous check tested the freshly nulled local
    // variable, so the time condition of the query was never applied.
    Filter timeFilter = null;
    if (expression instanceof GlobalTimeExpression) {
      timeFilter = ((GlobalTimeExpression) expression).getFilter();
    }
    QueryTokenManager.getInstance().beginQueryOfGivenQueryPaths(jobId, selectedSeries);

    List<IAggregateReader> readersOfSequenceData = new ArrayList<>();
    List<IPointReader> readersOfUnSequenceData = new ArrayList<>();
    List<AggregateFunction> aggregateFunctions = new ArrayList<>();
    for (int i = 0; i < selectedSeries.size(); i++) {
      // construct AggregateFunction
      TSDataType tsDataType = MManager.getInstance()
          .getSeriesType(selectedSeries.get(i).getFullPath());
      AggregateFunction function = AggreFuncFactory.getAggrFuncByName(aggres.get(i), tsDataType);
      function.init();
      aggregateFunctions.add(function);

      QueryDataSource queryDataSource = QueryDataSourceManager
          .getQueryDataSource(jobId, selectedSeries.get(i), context);
      // sequence reader for sealed tsfile, unsealed tsfile, memory
      SequenceDataReader sequenceReader = new SequenceDataReader(
          queryDataSource.getSeqDataSource(), timeFilter, context);
      // unseq reader for all chunk groups in unSeqFile, memory
      PriorityMergeReader unSeqMergeReader = SeriesReaderFactory.getInstance()
          .createUnSeqMergeReader(queryDataSource.getOverflowSeriesDataSource(), timeFilter);
      readersOfSequenceData.add(sequenceReader);
      readersOfUnSequenceData.add(unSeqMergeReader);
    }

    List<BatchData> batchDatas = new ArrayList<>();
    //TODO use multi-thread
    for (int i = 0; i < selectedSeries.size(); i++) {
      BatchData batchData = aggregateWithOutTimeGenerator(aggregateFunctions.get(i),
          readersOfSequenceData.get(i), readersOfUnSequenceData.get(i), timeFilter);
      batchDatas.add(batchData);
    }
    return constructDataSet(batchDatas);
  }

  /**
   * Evaluates one aggregation function over one series.
   *
   * @param function the function to feed
   * @param sequenceReader page-wise reader over the sequence data
   * @param unSequenceReader point-wise reader over the unsequence data
   * @param timeFilter time condition of the query, or {@code null}
   * @return the function's result batch
   */
  private BatchData aggregateWithOutTimeGenerator(AggregateFunction function,
      IAggregateReader sequenceReader, IPointReader unSequenceReader, Filter timeFilter)
      throws IOException, ProcessorException {
    if (function instanceof MaxTimeAggrFunc || function instanceof LastAggrFunc) {
      //TODO Optimization: these two are not computed yet, the untouched result is returned
      return function.getResult();
    }

    while (sequenceReader.hasNext()) {
      PageHeader pageHeader = sequenceReader.nextPageHeader();
      //judge if overlap with unsequence data
      if (canUseHeader(pageHeader, unSequenceReader, timeFilter)) {
        //cal by pageHeader
        function.calculateValueFromPageHeader(pageHeader);
        sequenceReader.skipPageData();
      } else {
        //cal by pageData
        function.calculateValueFromPageData(sequenceReader.nextBatch(), unSequenceReader);
      }
    }

    //cal with unsequence data
    if (unSequenceReader.hasNext()) {
      function.calculateValueFromUnsequenceReader(unSequenceReader);
    }
    return function.getResult();
  }

  /**
   * Decides whether a page can be aggregated from its header statistics alone.
   *
   * <p>The header is usable only when all of the following hold:
   * <ul>
   *   <li>the page has a header at all (a {@code null} header marks memory data);</li>
   *   <li>no time filter is active - the header statistics describe the whole page and say
   *       nothing about which of its points pass the filter, so the points must be read;</li>
   *   <li>the next unsequence point lies strictly after the page.</li>
   * </ul>
   *
   * <p>An unsequence point at or before the page's last timestamp forces the point-wise path,
   * where it is merged with the page. This method never advances the unsequence reader:
   * skipping points here would drop them from the aggregate.
   *
   * @return {@code true} when the page header may be used instead of the page data
   */
  private boolean canUseHeader(PageHeader pageHeader, IPointReader unSequenceReader,
      Filter timeFilter) throws IOException {
    //if page data is memory data.
    if (pageHeader == null) {
      return false;
    }
    if (timeFilter != null) {
      return false;
    }

    long maxTime = pageHeader.getMaxTimestamp();
    boolean unseqPointPending = unSequenceReader.hasNext()
        && unSequenceReader.current().getTimestamp() <= maxTime;
    return !unseqPointPending;
  }

  /** Aggregation with a value filter is not implemented yet; always returns {@code null}. */
  public AggregateDataSet executeWithTimeGenerator(QueryContext context) {
    return null;
  }

  /** Wraps the per-series result batches into a data set, taking each column type from its batch. */
  private AggregateDataSet constructDataSet(List<BatchData> batchDataList) throws IOException {
    List<TSDataType> dataTypes = new ArrayList<>();
    for (BatchData batchData : batchDataList) {
      dataTypes.add(batchData.getDataType());
    }
    return new AggregateDataSet(selectedSeries, aggres, dataTypes, batchDataList);
  }
}
a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineQueryRouter.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineQueryRouter.java @@ -16,18 +16,23 @@ * specific language governing permissions and limitations * under the License. */ + package org.apache.iotdb.db.query.executor; import static org.apache.iotdb.tsfile.read.expression.ExpressionType.GLOBAL_TIME; import java.io.IOException; +import java.util.List; import java.util.concurrent.atomic.AtomicLong; import org.apache.iotdb.db.exception.FileNodeManagerException; import org.apache.iotdb.db.exception.PathErrorException; +import org.apache.iotdb.db.exception.ProcessorException; import org.apache.iotdb.db.query.context.QueryContext; import org.apache.iotdb.db.query.control.OpenedFilePathsManager; import org.apache.iotdb.db.query.control.QueryTokenManager; +import org.apache.iotdb.db.query.dataset.AggregateDataSet; import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException; +import org.apache.iotdb.tsfile.read.common.Path; import org.apache.iotdb.tsfile.read.expression.IExpression; import org.apache.iotdb.tsfile.read.expression.QueryExpression; import org.apache.iotdb.tsfile.read.expression.util.ExpressionOptimizer; @@ -88,6 +93,36 @@ public class EngineQueryRouter { } } + /** + * execute aggregation query. 
+ */ + public AggregateDataSet aggregate(List<Path> selectedSeries, List<String> aggres, + IExpression expression) + throws QueryFilterOptimizationException, FileNodeManagerException, IOException, PathErrorException, ProcessorException { + + long nextJobId = getNextJobId(); + QueryTokenManager.getInstance().setJobIdForCurrentRequestThread(nextJobId); + OpenedFilePathsManager.getInstance().setJobIdForCurrentRequestThread(nextJobId); + + QueryContext context = new QueryContext(); + + if (expression != null) { + IExpression optimizedExpression = ExpressionOptimizer.getInstance() + .optimize(expression, selectedSeries); + AggregateEngineExecutor engineExecutor = new AggregateEngineExecutor(nextJobId, + selectedSeries, aggres, optimizedExpression); + if (optimizedExpression.getType() == GLOBAL_TIME) { + return engineExecutor.executeWithOutTimeGenerator(context); + } else { + return engineExecutor.executeWithTimeGenerator(context); + } + } else { + AggregateEngineExecutor engineExecutor = new AggregateEngineExecutor(nextJobId, + selectedSeries, aggres, expression); + return engineExecutor.executeWithOutTimeGenerator(context); + } + } + private synchronized long getNextJobId() { return jobIdGenerator.incrementAndGet(); } diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/BatchData.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/BatchData.java index ffb37e2..e853440 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/BatchData.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/BatchData.java @@ -527,4 +527,62 @@ public class BatchData { public int length() { return this.timeLength; } + + public int getCurIdx() { + return curIdx; + } + + public long getTimeByIndex(int idx){ + rangeCheckForTime(idx); + return this.timeRet.get(idx / timeCapacity)[idx % timeCapacity]; + } + + public long getLongByIndex(int idx){ + rangeCheck(idx); + return this.longRet.get(idx / timeCapacity)[idx % timeCapacity]; + } + + 
public double getDoubleByIndex(int idx) { + rangeCheck(idx); + return this.doubleRet.get(idx / timeCapacity)[idx % timeCapacity]; + } + + public int getIntByIndex(int idx) { + rangeCheck(idx); + return this.intRet.get(idx / timeCapacity)[idx % timeCapacity]; + } + + public float getFloatByIndex(int idx) { + rangeCheck(idx); + return this.floatRet.get(idx / timeCapacity)[idx % timeCapacity]; + } + + public Binary getBinaryByIndex(int idx) { + rangeCheck(idx); + return binaryRet.get(idx / timeCapacity)[idx % timeCapacity]; + } + + public boolean getBooleanByIndex(int idx) { + rangeCheck(idx); + return booleanRet.get(idx / timeCapacity)[idx % timeCapacity]; + } + + public Object getValueByIndex(int idx) { + switch (dataType) { + case INT32: + return getIntByIndex(idx); + case INT64: + return getLongByIndex(idx); + case FLOAT: + return getFloatByIndex(idx); + case DOUBLE: + return getDoubleByIndex(idx); + case BOOLEAN: + return getBooleanByIndex(idx); + case TEXT: + return getBinaryByIndex(idx); + default: + return null; + } + } }
