This is an automated email from the ASF dual-hosted git repository. suyue pushed a commit to branch aggregate in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit 5979aff5f3252af7ef4679a17ab7d8a41c446bf8 Author: suyue <[email protected]> AuthorDate: Sat Mar 9 20:35:38 2019 +0800 realize reader for aggregation --- .../dataset/EngineDataSetWithoutTimeGenerator.java | 12 +- .../executor/EngineExecutorWithTimeGenerator.java | 13 +- .../EngineExecutorWithoutTimeGenerator.java | 33 ++- .../db/query/factory/SeriesReaderFactory.java | 36 ++- .../iotdb/db/query/reader/AllDataReader.java | 125 +++++++++ .../reader/{IReader.java => IAggregateReader.java} | 28 +- .../reader/{IReader.java => IBatchReader.java} | 18 +- .../reader/{IReader.java => IPointReader.java} | 16 +- .../org/apache/iotdb/db/query/reader/IReader.java | 9 +- ...nkReaderWithFilter.java => MemChunkReader.java} | 58 ++++- .../reader/mem/MemChunkReaderByTimestamp.java | 22 +- .../reader/mem/MemChunkReaderWithoutFilter.java | 70 ----- .../db/query/reader/merge/PriorityMergeReader.java | 56 ++-- .../merge/PriorityMergeReaderByTimestamp.java | 1 + .../query/reader/sequence/SealedTsFilesReader.java | 108 +++----- .../query/reader/sequence/SequenceDataReader.java | 48 ++-- .../reader/sequence/UnSealedTsFileReader.java | 44 +--- .../query/reader/unsequence/EngineChunkReader.java | 39 +-- .../db/query/timegenerator/EngineLeafNode.java | 19 +- .../query/timegenerator/EngineNodeConstructor.java | 14 +- iotdb/src/test/java/PerformanceTest.java | 282 +++++++++++++++++++++ .../iotdb/db/query/reader/AllDataReaderTest.java | 77 ++++++ .../iotdb/db/query/reader/FakedIBatchPoint.java | 108 ++++++++ .../iotdb/db/query/reader/FakedIPointReader.java | 75 ++++++ .../query/reader/FakedSeriesReaderByTimestamp.java | 93 +++++++ .../merge/PriorityMergeReaderByTimestampTest.java | 27 +- .../reader/merge/PriorityMergeReaderTest.java | 23 +- .../reader/merge/SeriesMergeSortReaderTest.java | 22 +- .../reader/sequence/SealedTsFilesReaderTest.java} | 32 ++- .../tsfile/read/reader/chunk/ChunkReader.java | 54 ++-- .../read/reader/series/FileSeriesReader.java | 45 ++-- 31 files changed, 1081 insertions(+), 526 deletions(-) diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/dataset/EngineDataSetWithoutTimeGenerator.java b/iotdb/src/main/java/org/apache/iotdb/db/query/dataset/EngineDataSetWithoutTimeGenerator.java index 4c0ccb7..6d78f74 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/query/dataset/EngineDataSetWithoutTimeGenerator.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/query/dataset/EngineDataSetWithoutTimeGenerator.java @@ -16,6 +16,7 @@ * specific language governing permissions and limitations * under the License. */ + package org.apache.iotdb.db.query.dataset; import java.io.IOException; @@ -23,6 +24,7 @@ import java.util.HashSet; import java.util.List; import java.util.PriorityQueue; import java.util.Set; +import org.apache.iotdb.db.query.reader.IPointReader; import org.apache.iotdb.db.query.reader.IReader; import org.apache.iotdb.db.utils.TimeValuePair; import org.apache.iotdb.db.utils.TsPrimitiveType; @@ -34,11 +36,11 @@ import org.apache.iotdb.tsfile.read.common.RowRecord; import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet; /** - * TODO implement this class as TsFile DataSetWithoutTimeGenerator + * TODO implement this class as TsFile DataSetWithoutTimeGenerator. */ public class EngineDataSetWithoutTimeGenerator extends QueryDataSet { - private List<IReader> readers; + private List<IPointReader> readers; private TimeValuePair[] cacheTimeValueList; @@ -55,7 +57,7 @@ public class EngineDataSetWithoutTimeGenerator extends QueryDataSet { * @throws IOException IOException */ public EngineDataSetWithoutTimeGenerator(List<Path> paths, List<TSDataType> dataTypes, - List<IReader> readers) + List<IPointReader> readers) throws IOException { super(paths, dataTypes); this.readers = readers; @@ -68,7 +70,7 @@ public class EngineDataSetWithoutTimeGenerator extends QueryDataSet { cacheTimeValueList = new TimeValuePair[readers.size()]; for (int i = 0; i < readers.size(); i++) { - IReader reader = readers.get(i); + IPointReader reader = readers.get(i); if (reader.hasNext()) { TimeValuePair timeValuePair = reader.next(); cacheTimeValueList[i] = timeValuePair; @@ -89,7 +91,7 @@ public class EngineDataSetWithoutTimeGenerator extends QueryDataSet { RowRecord record = new RowRecord(minTime); for (int i = 0; i < readers.size(); i++) { - IReader reader = readers.get(i); + IPointReader reader = readers.get(i); if (cacheTimeValueList[i] == null) { record.addField(new Field(null)); } else { diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineExecutorWithTimeGenerator.java b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineExecutorWithTimeGenerator.java index 13f0053..d5b1740 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineExecutorWithTimeGenerator.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineExecutorWithTimeGenerator.java @@ -16,6 +16,7 @@ * specific language governing permissions and limitations * under the License. */ + package org.apache.iotdb.db.query.executor; import java.io.IOException; @@ -30,6 +31,7 @@ import org.apache.iotdb.db.query.control.QueryDataSourceManager; import org.apache.iotdb.db.query.control.QueryTokenManager; import org.apache.iotdb.db.query.dataset.EngineDataSetWithTimeGenerator; import org.apache.iotdb.db.query.factory.SeriesReaderFactory; +import org.apache.iotdb.db.query.reader.AllDataReader; import org.apache.iotdb.db.query.reader.merge.EngineReaderByTimeStamp; import org.apache.iotdb.db.query.reader.merge.PriorityMergeReader; import org.apache.iotdb.db.query.reader.merge.PriorityMergeReaderByTimestamp; @@ -108,12 +110,19 @@ public class EngineExecutorWithTimeGenerator { // reader for sequence data SequenceDataReader tsFilesReader = new SequenceDataReader(queryDataSource.getSeqDataSource(), null, context); - mergeReaderByTimestamp.addReaderWithPriority(tsFilesReader, 1); // reader for unSequence data PriorityMergeReader unSeqMergeReader = SeriesReaderFactory.getInstance() .createUnSeqMergeReader(queryDataSource.getOverflowSeriesDataSource(), null); - mergeReaderByTimestamp.addReaderWithPriority(unSeqMergeReader, 2); + + if (tsFilesReader == null || !tsFilesReader.hasNext()) { + mergeReaderByTimestamp + .addReaderWithPriority(unSeqMergeReader, PriorityMergeReader.HIGH_PRIORITY); + } else { + mergeReaderByTimestamp + .addReaderWithPriority(new AllDataReader(tsFilesReader, unSeqMergeReader), + PriorityMergeReader.HIGH_PRIORITY); + } readersOfSelectedSeries.add(mergeReaderByTimestamp); } diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineExecutorWithoutTimeGenerator.java b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineExecutorWithoutTimeGenerator.java index 11c94bc..6eb300e 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineExecutorWithoutTimeGenerator.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineExecutorWithoutTimeGenerator.java @@ -16,6 +16,7 @@ * specific language governing permissions and limitations * under the License. */ + package org.apache.iotdb.db.query.executor; import java.io.IOException; @@ -30,7 +31,8 @@ import org.apache.iotdb.db.query.control.QueryDataSourceManager; import org.apache.iotdb.db.query.control.QueryTokenManager; import org.apache.iotdb.db.query.dataset.EngineDataSetWithoutTimeGenerator; import org.apache.iotdb.db.query.factory.SeriesReaderFactory; -import org.apache.iotdb.db.query.reader.IReader; +import org.apache.iotdb.db.query.reader.AllDataReader; +import org.apache.iotdb.db.query.reader.IPointReader; import org.apache.iotdb.db.query.reader.merge.PriorityMergeReader; import org.apache.iotdb.db.query.reader.sequence.SequenceDataReader; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; @@ -61,7 +63,7 @@ public class EngineExecutorWithoutTimeGenerator { Filter timeFilter = ((GlobalTimeExpression) queryExpression.getExpression()).getFilter(); - List<IReader> readersOfSelectedSeries = new ArrayList<>(); + List<IPointReader> readersOfSelectedSeries = new ArrayList<>(); List<TSDataType> dataTypes = new ArrayList<>(); QueryTokenManager.getInstance() @@ -79,14 +81,11 @@ public class EngineExecutorWithoutTimeGenerator { throw new FileNodeManagerException(e); } - PriorityMergeReader priorityReader = new PriorityMergeReader(); - // sequence reader for one sealed tsfile SequenceDataReader tsFilesReader; try { tsFilesReader = new SequenceDataReader(queryDataSource.getSeqDataSource(), timeFilter, context); - priorityReader.addReaderWithPriority(tsFilesReader, PriorityMergeReader.LOW_PRIORITY); } catch (IOException e) { throw new FileNodeManagerException(e); } @@ -96,12 +95,18 @@ public class EngineExecutorWithoutTimeGenerator { try { unSeqMergeReader = SeriesReaderFactory.getInstance() .createUnSeqMergeReader(queryDataSource.getOverflowSeriesDataSource(), timeFilter); - priorityReader.addReaderWithPriority(unSeqMergeReader, PriorityMergeReader.HIGH_PRIORITY); } catch (IOException e) { throw new FileNodeManagerException(e); } - readersOfSelectedSeries.add(priorityReader); + if (tsFilesReader == null) { + //only have unsequence data. + readersOfSelectedSeries.add(unSeqMergeReader); + } else { + //merge sequence data with unsequence data. + readersOfSelectedSeries.add(new AllDataReader(tsFilesReader, unSeqMergeReader)); + } + } try { @@ -118,7 +123,7 @@ public class EngineExecutorWithoutTimeGenerator { public QueryDataSet executeWithoutFilter(QueryContext context) throws FileNodeManagerException { - List<IReader> readersOfSelectedSeries = new ArrayList<>(); + List<IPointReader> readersOfSelectedSeries = new ArrayList<>(); List<TSDataType> dataTypes = new ArrayList<>(); QueryTokenManager.getInstance() @@ -136,14 +141,11 @@ public class EngineExecutorWithoutTimeGenerator { throw new FileNodeManagerException(e); } - PriorityMergeReader priorityReader = new PriorityMergeReader(); - // sequence insert data SequenceDataReader tsFilesReader; try { tsFilesReader = new SequenceDataReader(queryDataSource.getSeqDataSource(), null, context); - priorityReader.addReaderWithPriority(tsFilesReader, 1); } catch (IOException e) { throw new FileNodeManagerException(e); } @@ -153,12 +155,17 @@ public class EngineExecutorWithoutTimeGenerator { try { unSeqMergeReader = SeriesReaderFactory.getInstance() .createUnSeqMergeReader(queryDataSource.getOverflowSeriesDataSource(), null); - priorityReader.addReaderWithPriority(unSeqMergeReader, 2); } catch (IOException e) { throw new FileNodeManagerException(e); } - readersOfSelectedSeries.add(priorityReader); + if (tsFilesReader == null) { + //only have unsequence data. + readersOfSelectedSeries.add(unSeqMergeReader); + } else { + //merge sequence data with unsequence data. + readersOfSelectedSeries.add(new AllDataReader(tsFilesReader, unSeqMergeReader)); + } } try { diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/factory/SeriesReaderFactory.java b/iotdb/src/main/java/org/apache/iotdb/db/query/factory/SeriesReaderFactory.java index 51ce94b..e20c009 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/query/factory/SeriesReaderFactory.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/query/factory/SeriesReaderFactory.java @@ -27,9 +27,11 @@ import org.apache.iotdb.db.engine.querycontext.OverflowInsertFile; import org.apache.iotdb.db.engine.querycontext.OverflowSeriesDataSource; import org.apache.iotdb.db.query.context.QueryContext; import org.apache.iotdb.db.query.control.FileReaderManager; +import org.apache.iotdb.db.query.reader.AllDataReader; +import org.apache.iotdb.db.query.reader.IBatchReader; +import org.apache.iotdb.db.query.reader.IPointReader; import org.apache.iotdb.db.query.reader.IReader; -import org.apache.iotdb.db.query.reader.mem.MemChunkReaderWithFilter; -import org.apache.iotdb.db.query.reader.mem.MemChunkReaderWithoutFilter; +import org.apache.iotdb.db.query.reader.mem.MemChunkReader; import org.apache.iotdb.db.query.reader.merge.PriorityMergeReader; import org.apache.iotdb.db.query.reader.sequence.SealedTsFilesReader; import org.apache.iotdb.db.query.reader.unsequence.EngineChunkReader; @@ -110,15 +112,9 @@ public class SeriesReaderFactory { // add reader for MemTable if (overflowSeriesDataSource.hasRawChunk()) { - if (filter != null) { - unSeqMergeReader.addReaderWithPriority( - new MemChunkReaderWithFilter(overflowSeriesDataSource.getReadableMemChunk(), filter), - priorityValue); - } else { - unSeqMergeReader.addReaderWithPriority( - new MemChunkReaderWithoutFilter(overflowSeriesDataSource.getReadableMemChunk()), - priorityValue); - } + unSeqMergeReader.addReaderWithPriority( + new MemChunkReader(overflowSeriesDataSource.getReadableMemChunk(), filter), + priorityValue); } // TODO add external sort when needed @@ -141,22 +137,24 @@ public class SeriesReaderFactory { singleSeriesExpression, intervalFileNode.getFilePath()); - PriorityMergeReader priorityMergeReader = new PriorityMergeReader(); - // Sequence reader - IReader seriesInTsFileReader = createSealedTsFileReaderForMerge(intervalFileNode, + IBatchReader seriesInTsFileReader = createSealedTsFileReaderForMerge(intervalFileNode, singleSeriesExpression, context); - priorityMergeReader.addReaderWithPriority(seriesInTsFileReader, 1); // UnSequence merge reader - IReader unSeqMergeReader = createUnSeqMergeReader(overflowSeriesDataSource, + IPointReader unSeqMergeReader = createUnSeqMergeReader(overflowSeriesDataSource, singleSeriesExpression.getFilter()); - priorityMergeReader.addReaderWithPriority(unSeqMergeReader, 2); + if (seriesInTsFileReader == null || !seriesInTsFileReader.hasNext()) { + //only have unsequence data. + return unSeqMergeReader; + } else { + //merge sequence data with unsequence data. + return new AllDataReader(seriesInTsFileReader, unSeqMergeReader); + } - return priorityMergeReader; } - private IReader createSealedTsFileReaderForMerge(IntervalFileNode fileNode, + private IBatchReader createSealedTsFileReaderForMerge(IntervalFileNode fileNode, SingleSeriesExpression singleSeriesExpression, QueryContext context) throws IOException { diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/reader/AllDataReader.java b/iotdb/src/main/java/org/apache/iotdb/db/query/reader/AllDataReader.java new file mode 100644 index 0000000..d277c45 --- /dev/null +++ b/iotdb/src/main/java/org/apache/iotdb/db/query/reader/AllDataReader.java @@ -0,0 +1,125 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.query.reader; + +import java.io.IOException; +import org.apache.iotdb.db.utils.TimeValuePair; +import org.apache.iotdb.db.utils.TimeValuePairUtils; +import org.apache.iotdb.tsfile.read.common.BatchData; + +public class AllDataReader implements IPointReader { + + private IBatchReader batchReader; + private IPointReader pointReader; + + private boolean hasCachedBatchData; + private BatchData batchData; + + /** + * merge sequence reader, unsequence reader. + */ + public AllDataReader(IBatchReader batchReader, IPointReader pointReader) { + this.batchReader = batchReader; + this.pointReader = pointReader; + + this.hasCachedBatchData = false; + } + + @Override + public boolean hasNext() throws IOException { + //has value in batchData + if (hasCachedBatchData && batchData.hasNext()) { + return true; + } else { + hasCachedBatchData = false; + } + + //has value in batchReader + while (batchReader.hasNext()) { + batchData = batchReader.nextBatch(); + if (batchData.hasNext()) { + hasCachedBatchData = true; + return true; + } + } + + //has value in pointData + if (pointReader != null && pointReader.hasNext()) { + return true; + } + + return false; + } + + @Override + public TimeValuePair next() throws IOException { + //construct batchData, and compare with value in pointReader + while (hasCachedBatchData || batchReader.hasNext()) { + //if batchData isn't initialization, then initialize it + if (!hasCachedBatchData) { + if (batchReader.hasNext()) { + batchData = batchReader.nextBatch(); + hasCachedBatchData = true; + } else { + hasCachedBatchData = false; + break; + } + } + + //if batchData is end, then jump to the entry of the while loop. + if (!batchData.hasNext()) { + hasCachedBatchData = false; + continue; + } + + //pointReader has next, compare value in pointReader with value in batchData. + if (pointReader != null && pointReader.hasNext()) { + long timeInPointReader = pointReader.current().getTimestamp(); + long timeInBatchData = batchData.currentTime(); + if (timeInPointReader > timeInBatchData) { + TimeValuePair timeValuePair = TimeValuePairUtils.getCurrentTimeValuePair(batchData); + batchData.next(); + return timeValuePair; + } + if (timeInPointReader == timeInBatchData) { + batchData.next(); + } + return pointReader.next(); + } else { + // pointReader doesn't have next time-value pair, return pair in batchData. + TimeValuePair timeValuePair = TimeValuePairUtils.getCurrentTimeValuePair(batchData); + batchData.next(); + return timeValuePair; + } + } + return null; + } + + @Override + public TimeValuePair current() throws IOException { + throw new IOException("current() in AllDataReader is an empty method."); + } + + @Override + public void close() throws IOException { + batchReader.close(); + pointReader.close(); + } +} diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/reader/IReader.java b/iotdb/src/main/java/org/apache/iotdb/db/query/reader/IAggregateReader.java similarity index 62% copy from iotdb/src/main/java/org/apache/iotdb/db/query/reader/IReader.java copy to iotdb/src/main/java/org/apache/iotdb/db/query/reader/IAggregateReader.java index 6a574d6..edc985f 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/query/reader/IReader.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/query/reader/IAggregateReader.java @@ -16,29 +16,19 @@ * specific language governing permissions and limitations * under the License. */ + package org.apache.iotdb.db.query.reader; import java.io.IOException; -import org.apache.iotdb.db.utils.TimeValuePair; -import org.apache.iotdb.tsfile.read.common.BatchData; - -/** - * Vital read interface. Batch method is used to increase query speed. Getting a batch of data - * a time is faster than getting one point a time. - */ -public interface IReader { - - boolean hasNext() throws IOException; - - TimeValuePair next() throws IOException; - - void skipCurrentTimeValuePair() throws IOException; - - void close() throws IOException; +import org.apache.iotdb.tsfile.file.header.PageHeader; - boolean hasNextBatch(); +public interface IAggregateReader extends IBatchReader { - BatchData nextBatch(); + /** + * Returns meta-information of batch data. If batch data comes from memory, return null. If batch + * data comes from page data, return pageHeader. + */ + PageHeader nextPageHeader() throws IOException; - BatchData currentBatch(); + void skipPageData() throws IOException; } diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/reader/IReader.java b/iotdb/src/main/java/org/apache/iotdb/db/query/reader/IBatchReader.java similarity index 71% copy from iotdb/src/main/java/org/apache/iotdb/db/query/reader/IReader.java copy to iotdb/src/main/java/org/apache/iotdb/db/query/reader/IBatchReader.java index 6a574d6..6ca07c2 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/query/reader/IReader.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/query/reader/IBatchReader.java @@ -16,29 +16,17 @@ * specific language governing permissions and limitations * under the License. */ + package org.apache.iotdb.db.query.reader; import java.io.IOException; -import org.apache.iotdb.db.utils.TimeValuePair; import org.apache.iotdb.tsfile.read.common.BatchData; -/** - * Vital read interface. Batch method is used to increase query speed. Getting a batch of data - * a time is faster than getting one point a time. - */ -public interface IReader { +public interface IBatchReader { boolean hasNext() throws IOException; - TimeValuePair next() throws IOException; - - void skipCurrentTimeValuePair() throws IOException; + BatchData nextBatch() throws IOException; void close() throws IOException; - - boolean hasNextBatch(); - - BatchData nextBatch(); - - BatchData currentBatch(); } diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/reader/IReader.java b/iotdb/src/main/java/org/apache/iotdb/db/query/reader/IPointReader.java similarity index 74% copy from iotdb/src/main/java/org/apache/iotdb/db/query/reader/IReader.java copy to iotdb/src/main/java/org/apache/iotdb/db/query/reader/IPointReader.java index 6a574d6..cbd7e3c 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/query/reader/IReader.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/query/reader/IPointReader.java @@ -16,29 +16,19 @@ * specific language governing permissions and limitations * under the License. */ + package org.apache.iotdb.db.query.reader; import java.io.IOException; import org.apache.iotdb.db.utils.TimeValuePair; -import org.apache.iotdb.tsfile.read.common.BatchData; -/** - * Vital read interface. Batch method is used to increase query speed. Getting a batch of data - * a time is faster than getting one point a time. - */ -public interface IReader { +public interface IPointReader extends IReader { boolean hasNext() throws IOException; TimeValuePair next() throws IOException; - void skipCurrentTimeValuePair() throws IOException; + TimeValuePair current() throws IOException; void close() throws IOException; - - boolean hasNextBatch(); - - BatchData nextBatch(); - - BatchData currentBatch(); } diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/reader/IReader.java b/iotdb/src/main/java/org/apache/iotdb/db/query/reader/IReader.java index 6a574d6..bdb8d32 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/query/reader/IReader.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/query/reader/IReader.java @@ -16,6 +16,7 @@ * specific language governing permissions and limitations * under the License. */ + package org.apache.iotdb.db.query.reader; import java.io.IOException; @@ -32,13 +33,5 @@ public interface IReader { TimeValuePair next() throws IOException; - void skipCurrentTimeValuePair() throws IOException; - void close() throws IOException; - - boolean hasNextBatch(); - - BatchData nextBatch(); - - BatchData currentBatch(); } diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/reader/mem/MemChunkReaderWithFilter.java b/iotdb/src/main/java/org/apache/iotdb/db/query/reader/mem/MemChunkReader.java similarity index 54% rename from iotdb/src/main/java/org/apache/iotdb/db/query/reader/mem/MemChunkReaderWithFilter.java rename to iotdb/src/main/java/org/apache/iotdb/db/query/reader/mem/MemChunkReader.java index 1f21a84..fc2bbe8 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/query/reader/mem/MemChunkReaderWithFilter.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/query/reader/mem/MemChunkReader.java @@ -16,26 +16,37 @@ * specific language governing permissions and limitations * under the License. */ + package org.apache.iotdb.db.query.reader.mem; +import java.io.IOException; import java.util.Iterator; -import org.apache.iotdb.db.engine.memtable.TimeValuePairSorter; import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk; -import org.apache.iotdb.db.query.reader.IReader; +import org.apache.iotdb.db.query.reader.IAggregateReader; +import org.apache.iotdb.db.query.reader.IBatchReader; +import org.apache.iotdb.db.query.reader.IPointReader; import org.apache.iotdb.db.utils.TimeValuePair; +import org.apache.iotdb.tsfile.file.header.PageHeader; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.read.common.BatchData; import org.apache.iotdb.tsfile.read.filter.basic.Filter; -public class MemChunkReaderWithFilter implements IReader { +public class MemChunkReader implements IPointReader, IBatchReader, IAggregateReader { private Iterator<TimeValuePair> timeValuePairIterator; private Filter filter; private boolean hasCachedTimeValuePair; private TimeValuePair cachedTimeValuePair; - public MemChunkReaderWithFilter(ReadOnlyMemChunk readableChunk, Filter filter) { + private TSDataType dataType; + + /** + * memory data reader. + */ + public MemChunkReader(ReadOnlyMemChunk readableChunk, Filter filter) { timeValuePairIterator = readableChunk.getIterator(); this.filter = filter; + this.dataType = readableChunk.getDataType(); } @Override @@ -45,7 +56,8 @@ public class MemChunkReaderWithFilter implements IReader { } while (timeValuePairIterator.hasNext()) { TimeValuePair timeValuePair = timeValuePairIterator.next(); - if (filter.satisfy(timeValuePair.getTimestamp(), timeValuePair.getValue().getValue())) { + if (filter == null || filter + .satisfy(timeValuePair.getTimestamp(), timeValuePair.getValue().getValue())) { hasCachedTimeValuePair = true; cachedTimeValuePair = timeValuePair; break; @@ -65,27 +77,45 @@ public class MemChunkReaderWithFilter implements IReader { } @Override - public void skipCurrentTimeValuePair() { - next(); + public TimeValuePair current() throws IOException { + if (!hasCachedTimeValuePair) { + cachedTimeValuePair = timeValuePairIterator.next(); + hasCachedTimeValuePair = true; + } + return cachedTimeValuePair; } @Override - public void close() { - // Do nothing because mem chunk reader will not open files + public BatchData nextBatch() throws IOException { + BatchData batchData = new BatchData(dataType, true); + if (hasCachedTimeValuePair) { + hasCachedTimeValuePair = false; + batchData.putTime(cachedTimeValuePair.getTimestamp()); + batchData.putAnObject(cachedTimeValuePair.getValue().getValue()); + } + while (timeValuePairIterator.hasNext()) { + TimeValuePair timeValuePair = timeValuePairIterator.next(); + if (filter == null || filter + .satisfy(timeValuePair.getTimestamp(), timeValuePair.getValue().getValue())) { + batchData.putTime(timeValuePair.getTimestamp()); + batchData.putAnObject(timeValuePair.getValue().getValue()); + } + } + return batchData; } @Override - public boolean hasNextBatch() { - return false; + public void close() { + // Do nothing because mem chunk reader will not open files } @Override - public BatchData nextBatch() { + public PageHeader nextPageHeader() throws IOException { return null; } @Override - public BatchData currentBatch() { - return null; + public void skipPageData() throws IOException { + nextBatch(); } } diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/reader/mem/MemChunkReaderByTimestamp.java b/iotdb/src/main/java/org/apache/iotdb/db/query/reader/mem/MemChunkReaderByTimestamp.java index dbd6beb..2db7e0b 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/query/reader/mem/MemChunkReaderByTimestamp.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/query/reader/mem/MemChunkReaderByTimestamp.java @@ -16,6 +16,7 @@ * specific language governing permissions and limitations * under the License. */ + package org.apache.iotdb.db.query.reader.mem; import java.io.IOException; @@ -24,7 +25,6 @@ import org.apache.iotdb.db.engine.memtable.TimeValuePairSorter; import org.apache.iotdb.db.query.reader.merge.EngineReaderByTimeStamp; import org.apache.iotdb.db.utils.TimeValuePair; import org.apache.iotdb.db.utils.TsPrimitiveType; -import org.apache.iotdb.tsfile.read.common.BatchData; public class MemChunkReaderByTimestamp implements EngineReaderByTimeStamp { @@ -55,11 +55,6 @@ public class MemChunkReaderByTimestamp implements EngineReaderByTimeStamp { } @Override - public void skipCurrentTimeValuePair() throws IOException { - next(); - } - - @Override public void close() { // Do nothing because mem chunk reader will not open files } @@ -82,19 +77,4 @@ public class MemChunkReaderByTimestamp implements EngineReaderByTimeStamp { return null; } - @Override - public boolean hasNextBatch() { - return false; - } - - @Override - public BatchData nextBatch() { - return null; - } - - @Override - public BatchData currentBatch() { - return null; - } - } diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/reader/mem/MemChunkReaderWithoutFilter.java b/iotdb/src/main/java/org/apache/iotdb/db/query/reader/mem/MemChunkReaderWithoutFilter.java deleted file mode 100644 index bae7496..0000000 --- a/iotdb/src/main/java/org/apache/iotdb/db/query/reader/mem/MemChunkReaderWithoutFilter.java +++ /dev/null @@ -1,70 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.iotdb.db.query.reader.mem; - -import java.util.Iterator; -import org.apache.iotdb.db.engine.memtable.TimeValuePairSorter; -import org.apache.iotdb.db.query.reader.IReader; -import org.apache.iotdb.db.utils.TimeValuePair; -import org.apache.iotdb.tsfile.read.common.BatchData; - -// TODO merge MemChunkReaderWithoutFilter and MemChunkReaderWithFilter to one class -public class MemChunkReaderWithoutFilter implements IReader { - - private Iterator<TimeValuePair> timeValuePairIterator; - - public MemChunkReaderWithoutFilter(TimeValuePairSorter readableChunk) { - timeValuePairIterator = readableChunk.getIterator(); - } - - @Override - public boolean hasNext() { - return timeValuePairIterator.hasNext(); - } - - @Override - public TimeValuePair next() { - return timeValuePairIterator.next(); - } - - @Override - public void skipCurrentTimeValuePair() { - next(); - } - - @Override - public void close() { - // Do nothing because mem chunk reader will not open files - } - - @Override - public boolean hasNextBatch() { - return false; - } - - @Override - public BatchData nextBatch() { - return null; - } - - @Override - public BatchData currentBatch() { - return null; - } -} diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/reader/merge/PriorityMergeReader.java b/iotdb/src/main/java/org/apache/iotdb/db/query/reader/merge/PriorityMergeReader.java index e100e50..7a3c665 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/query/reader/merge/PriorityMergeReader.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/query/reader/merge/PriorityMergeReader.java @@ -16,36 +16,34 @@ * specific language governing permissions and limitations * under the License. */ + package org.apache.iotdb.db.query.reader.merge; import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.PriorityQueue; -import org.apache.iotdb.db.query.reader.IReader; +import org.apache.iotdb.db.query.reader.IPointReader; import org.apache.iotdb.db.utils.TimeValuePair; -import org.apache.iotdb.tsfile.read.common.BatchData; /** * <p> - * Usage: - * (1) merge multiple chunk group readers in the unsequence file - * (2)merge sequence reader, unsequence reader and mem reader + * Usage: (1) merge multiple chunk group readers in the unsequence file. * </p> */ -public class PriorityMergeReader implements IReader { +public class PriorityMergeReader implements IPointReader { public static final int LOW_PRIORITY = 1; public static final int HIGH_PRIORITY = 2; - private List<IReader> readerList = new ArrayList<>(); + private List<IPointReader> readerList = new ArrayList<>(); private List<Integer> priorityList = new ArrayList<>(); private PriorityQueue<Element> heap = new PriorityQueue<>(); /** - * The bigger the priority value is, the higher the priority of this reader is + * The bigger the priority value is, the higher the priority of this reader is. */ - public void addReaderWithPriority(IReader reader, int priority) throws IOException { + public void addReaderWithPriority(IPointReader reader, int priority) throws IOException { if (reader.hasNext()) { heap.add(new Element(readerList.size(), reader.next(), priority)); } @@ -65,11 +63,16 @@ public class PriorityMergeReader implements IReader { return top.timeValuePair; } + @Override + public TimeValuePair current() throws IOException { + return heap.peek().timeValuePair; + } + private void updateHeap(Element top) throws IOException { while (!heap.isEmpty() && heap.peek().timeValuePair.getTimestamp() == top.timeValuePair .getTimestamp()) { Element e = heap.poll(); - IReader reader = readerList.get(e.index); + IPointReader reader = readerList.get(e.index); if (reader.hasNext()) { heap.add(new Element(e.index, reader.next(), priorityList.get(e.index))); } @@ -77,35 +80,12 @@ public class PriorityMergeReader implements IReader { } @Override - public void skipCurrentTimeValuePair() throws IOException { - if (hasNext()) { - next(); - } - } - - @Override public void close() throws IOException { - for (IReader reader : readerList) { + for (IPointReader reader : readerList) { reader.close(); } } - @Override - public boolean hasNextBatch() { - // TODO - return false; - } - - @Override - public BatchData nextBatch() { - return null; - } - - @Override - public BatchData currentBatch() { - return null; - } - protected class Element implements Comparable<Element> { int index; @@ -133,11 +113,11 @@ public class PriorityMergeReader implements IReader { } @Override - public boolean equals(Object o){ - if (o instanceof Element){ + public boolean equals(Object o) { + if (o instanceof Element) { Element element = (Element) o; if (this.timeValuePair.getTimestamp() == element.timeValuePair.getTimestamp() - && this.priority.equals(element.priority)){ + && this.priority.equals(element.priority)) { return true; } } @@ -145,7 +125,7 @@ public class PriorityMergeReader implements IReader { } @Override - public int hashCode(){ + public int hashCode() { return (int) (timeValuePair.getTimestamp() * 31 + priority.hashCode()); } } diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/reader/merge/PriorityMergeReaderByTimestamp.java b/iotdb/src/main/java/org/apache/iotdb/db/query/reader/merge/PriorityMergeReaderByTimestamp.java index 244291a..b606665 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/query/reader/merge/PriorityMergeReaderByTimestamp.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/query/reader/merge/PriorityMergeReaderByTimestamp.java @@ -16,6 +16,7 @@ * specific language governing permissions and limitations * under the License. */ + package org.apache.iotdb.db.query.reader.merge; import java.io.IOException; diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/reader/sequence/SealedTsFilesReader.java b/iotdb/src/main/java/org/apache/iotdb/db/query/reader/sequence/SealedTsFilesReader.java index a8465bf..b2a0994 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/query/reader/sequence/SealedTsFilesReader.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/query/reader/sequence/SealedTsFilesReader.java @@ -26,10 +26,10 @@ import org.apache.iotdb.db.engine.filenode.IntervalFileNode; import org.apache.iotdb.db.engine.modification.Modification; import org.apache.iotdb.db.query.context.QueryContext; import org.apache.iotdb.db.query.control.FileReaderManager; -import org.apache.iotdb.db.query.reader.IReader; +import org.apache.iotdb.db.query.reader.IAggregateReader; +import org.apache.iotdb.db.query.reader.IBatchReader; import org.apache.iotdb.db.utils.QueryUtils; -import org.apache.iotdb.db.utils.TimeValuePair; -import org.apache.iotdb.db.utils.TimeValuePairUtils; +import org.apache.iotdb.tsfile.file.header.PageHeader; import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData; import org.apache.iotdb.tsfile.read.TsFileSequenceReader; import org.apache.iotdb.tsfile.read.common.BatchData; @@ -42,17 +42,18 @@ import org.apache.iotdb.tsfile.read.reader.series.FileSeriesReader; import org.apache.iotdb.tsfile.read.reader.series.FileSeriesReaderWithFilter; import org.apache.iotdb.tsfile.read.reader.series.FileSeriesReaderWithoutFilter; -public class SealedTsFilesReader implements IReader { +public class SealedTsFilesReader implements IBatchReader, IAggregateReader { private Path seriesPath; private List<IntervalFileNode> sealedTsFiles; private int usedIntervalFileIndex; private FileSeriesReader seriesReader; private Filter filter; - private BatchData data; - private boolean hasCachedData; private QueryContext context; + /** + * init with seriesPath, sealedTsFiles, filter, context. + */ public SealedTsFilesReader(Path seriesPath, List<IntervalFileNode> sealedTsFiles, Filter filter, QueryContext context) { this(seriesPath, sealedTsFiles, context); @@ -69,10 +70,12 @@ public class SealedTsFilesReader implements IReader { this.sealedTsFiles = sealedTsFiles; this.usedIntervalFileIndex = 0; this.seriesReader = null; - this.hasCachedData = false; this.context = context; } + /** + * init with seriesReader and queryContext. + */ public SealedTsFilesReader(FileSeriesReader seriesReader, QueryContext context) { this.seriesReader = seriesReader; sealedTsFiles = new ArrayList<>(); @@ -81,80 +84,30 @@ public class SealedTsFilesReader implements IReader { @Override public boolean hasNext() throws IOException { - if (hasCachedData) { + + // try to get next batch data from current reader + if (seriesReader != null && seriesReader.hasNextBatch()) { return true; } - while (!hasCachedData) { - boolean flag = false; - - // try to get next time value pair from current batch data - if (data != null && data.hasNext()) { - hasCachedData = true; - return true; - } - - // try to get next batch data from current reader - if (seriesReader != null && seriesReader.hasNextBatch()) { - data = seriesReader.nextBatch(); - if (data.hasNext()) { - hasCachedData = true; - return true; - } else { - flag = true; - } - } - + // init until reach a satisfied reader + while (usedIntervalFileIndex < sealedTsFiles.size()) { // try to get next batch data from next reader - while (!flag && usedIntervalFileIndex < sealedTsFiles.size()) { - // init until reach a satisfied reader - if (seriesReader == null || !seriesReader.hasNextBatch()) { - IntervalFileNode fileNode = sealedTsFiles.get(usedIntervalFileIndex++); - if (singleTsFileSatisfied(fileNode)) { - initSingleTsFileReader(fileNode, context); - } else { - flag = true; - } - } - if (!flag && seriesReader.hasNextBatch()) { - data = seriesReader.nextBatch(); - - // notice that, data maybe an empty batch data, so an examination must exist - if (data.hasNext()) { - hasCachedData = true; - return true; - } - } + IntervalFileNode fileNode = sealedTsFiles.get(usedIntervalFileIndex++); + if (singleTsFileSatisfied(fileNode)) { + initSingleTsFileReader(fileNode, context); + } else { + continue; } - if (!flag || data == null || !data.hasNext()) { - break; + if (seriesReader.hasNextBatch()) { + return true; } } return false; } - @Override - public TimeValuePair next() { - TimeValuePair timeValuePair = TimeValuePairUtils.getCurrentTimeValuePair(data); - data.next(); - hasCachedData = false; - return timeValuePair; - } - - @Override - public void skipCurrentTimeValuePair() { - next(); - } - - @Override - public void close() throws IOException { - if (seriesReader != null) { - seriesReader.close(); - } - } - private boolean singleTsFileSatisfied(IntervalFileNode fileNode) { if (filter == null) { @@ -192,17 +145,24 @@ public class SealedTsFilesReader implements IReader { } @Override - public boolean hasNextBatch() { - return false; + public BatchData nextBatch() throws IOException { + return seriesReader.nextBatch(); } @Override - public BatchData nextBatch() { - return null; + public void close() throws IOException { + if (seriesReader != null) { + seriesReader.close(); + } } @Override - public BatchData currentBatch() { + public PageHeader nextPageHeader() throws IOException { return null; } + + @Override + public void skipPageData() throws IOException { + seriesReader.skipPageData(); + } } diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/reader/sequence/SequenceDataReader.java b/iotdb/src/main/java/org/apache/iotdb/db/query/reader/sequence/SequenceDataReader.java index e4ca98f..b664c93 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/query/reader/sequence/SequenceDataReader.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/query/reader/sequence/SequenceDataReader.java @@ -16,6 +16,7 @@ * specific language governing permissions and limitations * under the License. */ + package org.apache.iotdb.db.query.reader.sequence; import java.io.IOException; @@ -23,10 +24,10 @@ import java.util.ArrayList; import java.util.List; import org.apache.iotdb.db.engine.querycontext.GlobalSortedSeriesDataSource; import org.apache.iotdb.db.query.context.QueryContext; -import org.apache.iotdb.db.query.reader.IReader; -import org.apache.iotdb.db.query.reader.mem.MemChunkReaderWithFilter; -import org.apache.iotdb.db.query.reader.mem.MemChunkReaderWithoutFilter; -import org.apache.iotdb.db.utils.TimeValuePair; +import org.apache.iotdb.db.query.reader.IAggregateReader; +import org.apache.iotdb.db.query.reader.IBatchReader; +import org.apache.iotdb.db.query.reader.mem.MemChunkReader; +import org.apache.iotdb.tsfile.file.header.PageHeader; import org.apache.iotdb.tsfile.read.common.BatchData; import org.apache.iotdb.tsfile.read.filter.basic.Filter; @@ -36,12 +37,12 @@ import org.apache.iotdb.tsfile.read.filter.basic.Filter; * in MemTable. * </p> */ -public class SequenceDataReader implements IReader { +public class SequenceDataReader implements IBatchReader, IAggregateReader { - private List<IReader> seriesReaders; + private List<IAggregateReader> seriesReaders; private boolean curReaderInitialized; private int nextSeriesReaderIndex; - private IReader currentSeriesReader; + private IAggregateReader currentSeriesReader; /** * init with globalSortedSeriesDataSource and filter. @@ -68,17 +69,13 @@ public class SequenceDataReader implements IReader { // add data in memTable if (sources.hasRawSeriesChunk()) { - if (filter == null) { - seriesReaders.add(new MemChunkReaderWithoutFilter(sources.getReadableChunk())); - } else { - seriesReaders.add(new MemChunkReaderWithFilter(sources.getReadableChunk(), filter)); - } + seriesReaders.add(new MemChunkReader(sources.getReadableChunk(), filter)); } - } @Override public boolean hasNext() throws IOException { + if (curReaderInitialized && currentSeriesReader.hasNext()) { return true; } else { @@ -96,35 +93,24 @@ public class SequenceDataReader implements IReader { } @Override - public TimeValuePair next() throws IOException { - return currentSeriesReader.next(); - } - - @Override - public void skipCurrentTimeValuePair() throws IOException { - next(); - } - - @Override public void close() throws IOException { - for (IReader seriesReader : seriesReaders) { + for (IBatchReader seriesReader : seriesReaders) { seriesReader.close(); } } @Override - public boolean hasNextBatch() { - return false; + public BatchData nextBatch() throws IOException { + return currentSeriesReader.nextBatch(); } @Override - public BatchData nextBatch() { - return null; + public PageHeader nextPageHeader() throws IOException { + return currentSeriesReader.nextPageHeader(); } @Override - public BatchData currentBatch() { - return null; + public void skipPageData() throws IOException { + currentSeriesReader.skipPageData(); } - } diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/reader/sequence/UnSealedTsFileReader.java b/iotdb/src/main/java/org/apache/iotdb/db/query/reader/sequence/UnSealedTsFileReader.java index 3c02930..c11ddc2 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/query/reader/sequence/UnSealedTsFileReader.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/query/reader/sequence/UnSealedTsFileReader.java @@ -16,14 +16,15 @@ * specific language governing permissions and limitations * under the License. */ + package org.apache.iotdb.db.query.reader.sequence; import java.io.IOException; import org.apache.iotdb.db.engine.querycontext.UnsealedTsFile; import org.apache.iotdb.db.query.control.FileReaderManager; -import org.apache.iotdb.db.query.reader.IReader; -import org.apache.iotdb.db.utils.TimeValuePair; -import org.apache.iotdb.db.utils.TimeValuePairUtils; +import org.apache.iotdb.db.query.reader.IAggregateReader; +import org.apache.iotdb.db.query.reader.IBatchReader; +import org.apache.iotdb.tsfile.file.header.PageHeader; import org.apache.iotdb.tsfile.read.TsFileSequenceReader; import org.apache.iotdb.tsfile.read.common.BatchData; import org.apache.iotdb.tsfile.read.common.Path; @@ -34,11 +35,10 @@ import org.apache.iotdb.tsfile.read.reader.series.FileSeriesReader; import org.apache.iotdb.tsfile.read.reader.series.FileSeriesReaderWithFilter; import org.apache.iotdb.tsfile.read.reader.series.FileSeriesReaderWithoutFilter; -public class UnSealedTsFileReader implements IReader { +public class UnSealedTsFileReader implements IBatchReader, IAggregateReader { protected Path seriesPath; private FileSeriesReader unSealedReader; - private BatchData data; /** * Construct funtion for UnSealedTsFileReader. @@ -61,31 +61,11 @@ public class UnSealedTsFileReader implements IReader { unsealedTsFile.getChunkMetaDataList(), filter); } - } @Override public boolean hasNext() throws IOException { - if (data == null || !data.hasNext()) { - if (!unSealedReader.hasNextBatch()) { - return false; - } - data = unSealedReader.nextBatch(); - } - - return data.hasNext(); - } - - @Override - public TimeValuePair next() { - TimeValuePair timeValuePair = TimeValuePairUtils.getCurrentTimeValuePair(data); - data.next(); - return timeValuePair; - } - - @Override - public void skipCurrentTimeValuePair() { - data.next(); + return unSealedReader.hasNextBatch(); } @Override @@ -96,17 +76,17 @@ public class UnSealedTsFileReader implements IReader { } @Override - public boolean hasNextBatch() { - return false; + public BatchData nextBatch() throws IOException { + return unSealedReader.nextBatch(); } @Override - public BatchData nextBatch() { - return null; + public PageHeader nextPageHeader() throws IOException { + return unSealedReader.nextPageHeader(); } @Override - public BatchData currentBatch() { - return null; + public void skipPageData() throws IOException { + unSealedReader.skipPageData(); } } diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/reader/unsequence/EngineChunkReader.java b/iotdb/src/main/java/org/apache/iotdb/db/query/reader/unsequence/EngineChunkReader.java index e49e968..66f03d2 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/query/reader/unsequence/EngineChunkReader.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/query/reader/unsequence/EngineChunkReader.java @@ -16,17 +16,18 @@ * specific language governing permissions and limitations * under the License. */ + package org.apache.iotdb.db.query.reader.unsequence; import java.io.IOException; -import org.apache.iotdb.db.query.reader.IReader; +import org.apache.iotdb.db.query.reader.IPointReader; import org.apache.iotdb.db.utils.TimeValuePair; import org.apache.iotdb.db.utils.TimeValuePairUtils; import org.apache.iotdb.tsfile.read.TsFileSequenceReader; import org.apache.iotdb.tsfile.read.common.BatchData; import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReader; -public class EngineChunkReader implements IReader { +public class EngineChunkReader implements IPointReader { private ChunkReader chunkReader; private BatchData data; @@ -44,15 +45,16 @@ public class EngineChunkReader implements IReader { @Override public boolean hasNext() throws IOException { - if (data == null || !data.hasNext()) { - if (chunkReader.hasNextBatch()) { - data = chunkReader.nextBatch(); - } else { - return false; + if (data != null && data.hasNext()) { + return true; + } + while (chunkReader.hasNextBatch()) { + data = chunkReader.nextBatch(); + if (data.hasNext()) { + return true; } } - - return data.hasNext(); + return false; } @Override @@ -63,8 +65,8 @@ public class EngineChunkReader implements IReader { } @Override - public void skipCurrentTimeValuePair() { - next(); + public TimeValuePair current() throws IOException { + return TimeValuePairUtils.getCurrentTimeValuePair(data); } @Override @@ -72,19 +74,4 @@ public class EngineChunkReader implements IReader { this.chunkReader.close(); this.unClosedTsFileReader.close(); } - - @Override - public boolean hasNextBatch() { - return false; - } - - @Override - public BatchData nextBatch() { - return null; - } - - @Override - public BatchData currentBatch() { - return null; - } } diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/timegenerator/EngineLeafNode.java b/iotdb/src/main/java/org/apache/iotdb/db/query/timegenerator/EngineLeafNode.java index 292ca0e..688e296 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/query/timegenerator/EngineLeafNode.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/query/timegenerator/EngineLeafNode.java @@ -16,6 +16,7 @@ * specific language governing permissions and limitations * under the License. */ + package org.apache.iotdb.db.query.timegenerator; import java.io.IOException; @@ -44,15 +45,15 @@ public class EngineLeafNode implements Node { return reader.next().getTimestamp(); } - /** - * check if current time of current batch is equals to input time. - */ - public boolean currentTimeIs(long time) { - if (!reader.currentBatch().hasNext()) { - return false; - } - return reader.currentBatch().currentTime() == time; - } +// /** +// * check if current time of current batch is equals to input time. +// */ +// public boolean currentTimeIs(long time) { +// if (!reader.currentBatch().hasNext()) { +// return false; +// } +// return reader.currentBatch().currentTime() == time; +// } /** * check if current value is equals to input value. diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/timegenerator/EngineNodeConstructor.java b/iotdb/src/main/java/org/apache/iotdb/db/query/timegenerator/EngineNodeConstructor.java index 0cb02c8..3d5b3e4 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/query/timegenerator/EngineNodeConstructor.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/query/timegenerator/EngineNodeConstructor.java @@ -16,6 +16,7 @@ * specific language governing permissions and limitations * under the License. */ + package org.apache.iotdb.db.query.timegenerator; import static org.apache.iotdb.tsfile.read.expression.ExpressionType.AND; @@ -28,6 +29,7 @@ import org.apache.iotdb.db.exception.FileNodeManagerException; import org.apache.iotdb.db.query.context.QueryContext; import org.apache.iotdb.db.query.control.QueryDataSourceManager; import org.apache.iotdb.db.query.factory.SeriesReaderFactory; +import org.apache.iotdb.db.query.reader.AllDataReader; import org.apache.iotdb.db.query.reader.IReader; import org.apache.iotdb.db.query.reader.merge.PriorityMergeReader; import org.apache.iotdb.db.query.reader.sequence.SequenceDataReader; @@ -92,19 +94,21 @@ public class EngineNodeConstructor { Filter filter = singleSeriesExpression.getFilter(); - PriorityMergeReader priorityReader = new PriorityMergeReader(); - // reader for all sequence data SequenceDataReader tsFilesReader = new SequenceDataReader(queryDataSource.getSeqDataSource(), filter, context); - priorityReader.addReaderWithPriority(tsFilesReader, 1); // reader for all unSequence data PriorityMergeReader unSeqMergeReader = SeriesReaderFactory.getInstance() .createUnSeqMergeReader(queryDataSource.getOverflowSeriesDataSource(), filter); - priorityReader.addReaderWithPriority(unSeqMergeReader, 2); - return priorityReader; + if (tsFilesReader == null || !tsFilesReader.hasNext()) { + //only have unsequence data. + return unSeqMergeReader; + } else { + //merge sequence data with unsequence data. + return new AllDataReader(tsFilesReader, unSeqMergeReader); + } } } diff --git a/iotdb/src/test/java/PerformanceTest.java b/iotdb/src/test/java/PerformanceTest.java new file mode 100644 index 0000000..8bcb228 --- /dev/null +++ b/iotdb/src/test/java/PerformanceTest.java @@ -0,0 +1,282 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +import java.io.IOException; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.ArrayList; +import java.util.List; +import org.apache.iotdb.db.exception.FileNodeManagerException; +import org.apache.iotdb.db.query.executor.EngineQueryRouter; +import org.apache.iotdb.jdbc.Config; +import org.apache.iotdb.tsfile.read.common.Path; +import org.apache.iotdb.tsfile.read.common.RowRecord; +import org.apache.iotdb.tsfile.read.expression.IExpression; +import org.apache.iotdb.tsfile.read.expression.QueryExpression; +import org.apache.iotdb.tsfile.read.expression.impl.SingleSeriesExpression; +import org.apache.iotdb.tsfile.read.filter.TimeFilter; +import org.apache.iotdb.tsfile.read.filter.ValueFilter; +import org.apache.iotdb.tsfile.read.filter.basic.Filter; +import org.apache.iotdb.tsfile.read.filter.factory.FilterFactory; +import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet; + +/** + * Delete this class when submitting pr. + */ +public class PerformanceTest { + + // private static int deviceStart = 5, deviceEnd = 9; + private static int deviceStart = 9, deviceEnd = 9; + private static int sensorStart = 8, sensorEnd = 8; + private static String insertTemplate = "INSERT INTO root.perform.group_0.d_%s(timestamp,s_%s" + + ") VALUES(%d,%d)"; + + public static void main(String[] args) throws IOException, FileNodeManagerException, SQLException { + try { + Class.forName("org.apache.iotdb.jdbc.IoTDBDriver"); + } catch (ClassNotFoundException e) { + e.printStackTrace(); + } + //insert(); + + //singleWithoutFilterTest(); + + // queryMultiSeriesWithoutFilterTest(); + + queryMultiSeriesWithFilterTest(); + } + + private static void singleWithoutFilterTest() throws IOException, FileNodeManagerException { + + List<Path> selectedPathList = new ArrayList<>(); + selectedPathList.add(getPath(1, 1)); + + QueryExpression queryExpression = QueryExpression.create(selectedPathList, null); + + EngineQueryRouter queryRouter = new EngineQueryRouter(); + + long startTime = System.currentTimeMillis(); + + QueryDataSet queryDataSet = queryRouter.query(queryExpression); + + int count = 0; + while (queryDataSet.hasNext()) { + RowRecord rowRecord = queryDataSet.next(); + count++; + // output(count, rowRecord, true); + } + + long endTime = System.currentTimeMillis(); + System.out + .println(String.format("Time consume : %s, count number : %s", endTime - startTime, count)); + + } + + public static void queryMultiSeriesWithoutFilterTest() + throws IOException, FileNodeManagerException { + + List<Path> selectedPathList = new ArrayList<>(); + for (int i = deviceStart; i <= deviceEnd; i++) { + for (int j = sensorStart; j <= sensorEnd; j++) { + selectedPathList.add(getPath(i, j)); + } + } + + QueryExpression queryExpression = QueryExpression.create(selectedPathList, null); + + EngineQueryRouter queryRouter = new EngineQueryRouter(); + + long startTime = System.currentTimeMillis(); + + QueryDataSet queryDataSet = queryRouter.query(queryExpression); + + int count = 0; + while (queryDataSet.hasNext()) { + RowRecord rowRecord = queryDataSet.next(); + count++; + } + + long endTime = System.currentTimeMillis(); + System.out + .println(String.format("Time consume : %s, count number : %s", endTime - startTime, count)); + + } + + public static void queryMultiSeriesWithFilterTest() throws IOException, FileNodeManagerException { + + List<Path> selectedPathList = new ArrayList<>(); + for (int i = deviceStart; i <= deviceEnd; i++) { + for (int j = sensorStart; j <= sensorEnd; j++) { + selectedPathList.add(getPath(i, j)); + } + } + + Filter valueFilter = ValueFilter.gtEq(34300); + Filter timeFilter = FilterFactory + .and(TimeFilter.gtEq(50000L), TimeFilter.ltEq(100000L)); + Filter filter = FilterFactory.and(timeFilter, valueFilter); + + IExpression expression = new SingleSeriesExpression(getPath(9, 9), filter); + EngineQueryRouter queryRouter = new EngineQueryRouter(); + + QueryExpression queryExpression = QueryExpression.create(selectedPathList, expression); + long startTime = System.currentTimeMillis(); + + QueryDataSet queryDataSet = queryRouter.query(queryExpression); + + int count = 0; + while (queryDataSet.hasNext()) { + RowRecord rowRecord = queryDataSet.next(); + count++; + // if (count % 10000 == 0) + // System.out.println(rowRecord); + } + + long endTime = System.currentTimeMillis(); + System.out + .println(String.format("Time consume : %s ms, count number : %s", endTime - startTime, count)); + + } + + public static void output(int cnt, RowRecord rowRecord, boolean flag) { + if (!flag) { + return; + } + + if (cnt % 10000 == 0) { + System.out.println(cnt + " : " + rowRecord); + } + + if (cnt > 97600) { + System.out.println("----" + cnt + " : " + rowRecord); + } + } + + public static Path getPath(int d, int s) { + return new Path(String.format("root.perform.group_0.d_%s.s_%s", d, s)); + } + + private static void insert() throws SQLException { + int d_start = 0, d_end = 10; + int s_start = 0, s_end = 10; + int num = 100000; + prepareSeries(d_start,d_end,s_start,s_end); + prepareData(d_start,d_end,s_start,s_end, num); + } + + private static void prepareData(int d_start, int d_end, int s_start, int s_end, int num) throws SQLException { + System.out.println("prepareData start!"); + Connection connection = null; + try { + connection = DriverManager + .getConnection(Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", + "root"); + Statement statement = connection.createStatement(); + // prepare BufferWrite file + for (int i = 20000; i <= 30000; i++) { + insertInTimestamp(statement, d_start,d_end,s_start,s_end, i); + } + statement.execute("merge"); + System.out.println("prepareData 20000-30000 end "); + + // prepare Unseq-File + for (int i = 1; i <= 10000; i++) { + insertInTimestamp(statement, d_start,d_end,s_start,s_end, i); + } + statement.execute("merge"); + System.out.println("prepareData 1-10000 end "); + + // prepare muilty BufferWrite file + for (int i = 40000; i <= 65000; i++) { + insertInTimestamp(statement, d_start,d_end,s_start,s_end, i); + } + statement.execute("merge"); + System.out.println("prepareData 40000-65000 end "); + + // prepare muilty BufferWrite file + for (int i = 80000; i <= 87000; i++) { + insertInTimestamp(statement, d_start,d_end,s_start,s_end, i); + } + statement.execute("merge"); + System.out.println("prepareData 80000-87000 end "); + + // prepare BufferWrite cache + for (int i = 90001; i <= 100000; i++) { + insertInTimestamp(statement, d_start,d_end,s_start,s_end, i); + } + System.out.println("prepareData 90001-100000 end "); + + // prepare Overflow cache + for (int i = 10001; i <= 20000; i++) { + insertInTimestamp(statement, d_start,d_end,s_start,s_end, i); + } + System.out.println("prepareData 10001-20000 end "); + statement.close(); + } finally { + if (connection != null) { + connection.close(); + } + } + } + + private static void insertInTimestamp(Statement statement,int d_start, int d_end, int s_start, int s_end, int time) + throws SQLException { + for(int m = d_start; m <= d_end; m++ ){ + for(int n = s_start; n<= s_end; n++){ + statement.execute(String.format(insertTemplate, m, n, time, time)); + } + } + } + + private static void prepareSeries(int d_start, int d_end, int s_start, int s_end) throws SQLException { + System.out.println("prepareSeries start!"); + Connection connection = null; + try { + connection = DriverManager + .getConnection(Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", + "root"); + Statement statement = connection.createStatement(); + for(int i = d_start; i <= d_end; i++ ){ + statement.execute(createStorageGroupSql(i)); + } + statement.close(); + + statement = connection.createStatement(); + for(int i = d_start; i <= d_end; i++ ){ + for(int j = s_start; j<= s_end; j++){ + statement.execute(String.format("CREATE TIMESERIES root.perform.group_0.d_%s.s_%s WITH DATATYPE=INT32, ENCODING=RLE", i, j));// + } + } + statement.close(); + + } catch (Exception e) { + e.printStackTrace(); + } finally { + if (connection != null) { + connection.close(); + } + } + } + + public static String createStorageGroupSql(int d){ + return String.format("SET STORAGE GROUP TO root.perform.group_0.d_%s", d); + } + +} diff --git a/iotdb/src/test/java/org/apache/iotdb/db/query/reader/AllDataReaderTest.java b/iotdb/src/test/java/org/apache/iotdb/db/query/reader/AllDataReaderTest.java new file mode 100644 index 0000000..4b16fb4 --- /dev/null +++ b/iotdb/src/test/java/org/apache/iotdb/db/query/reader/AllDataReaderTest.java @@ -0,0 +1,77 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.query.reader; + +import static org.junit.Assert.*; + +import java.io.IOException; +import org.apache.iotdb.db.utils.TimeValuePair; +import org.junit.Assert; +import org.junit.Test; + +public class AllDataReaderTest { + + private AllDataReader reader1; + private AllDataReader reader2; + + private void init(){ + IBatchReader batchReader1 = new FakedIBatchPoint(100, 1000, 7, 11); + IBatchReader batchReader2 = new FakedIBatchPoint(100, 1000, 7, 11); + IPointReader pointReader = new FakedIPointReader(20, 500, 11, 19); + + reader1 = new AllDataReader(batchReader1, pointReader); + reader2 = new AllDataReader(batchReader2, null); + } + + @Test + public void test() throws IOException { + init(); + testWithOutNullReader(); + testWithNullPointReader(); + } + + private void testWithOutNullReader() throws IOException { + int cnt = 0; + while (reader1.hasNext()){ + TimeValuePair timeValuePair = reader1.next(); + cnt++; + if((timeValuePair.getTimestamp()-20) % 11 == 0 && timeValuePair.getTimestamp() < 20+500*11){ + Assert.assertEquals(timeValuePair.getTimestamp() % 19, timeValuePair.getValue().getLong()); + continue; + } + if((timeValuePair.getTimestamp()-100) % 7 == 0){ + Assert.assertEquals(timeValuePair.getTimestamp() % 11, timeValuePair.getValue().getLong()); + } + } + Assert.assertEquals(1430, cnt); + System.out.println("testWithOutNullReader-cnt:"+cnt); + } + + private void testWithNullPointReader() throws IOException { + int cnt = 0; + while (reader2.hasNext()){ + TimeValuePair timeValuePair = reader2.next(); + Assert.assertEquals(timeValuePair.getTimestamp() % 11, timeValuePair.getValue().getLong()); + cnt++; + } + Assert.assertEquals(1000, cnt); + System.out.println("testWithNullPointReader-cnt:"+cnt); + } +} \ No newline at end of file diff --git a/iotdb/src/test/java/org/apache/iotdb/db/query/reader/FakedIBatchPoint.java b/iotdb/src/test/java/org/apache/iotdb/db/query/reader/FakedIBatchPoint.java new file mode 100644 index 0000000..7bf8590 --- /dev/null +++ b/iotdb/src/test/java/org/apache/iotdb/db/query/reader/FakedIBatchPoint.java @@ -0,0 +1,108 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.query.reader; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Random; +import org.apache.iotdb.db.utils.TimeValuePair; +import org.apache.iotdb.db.utils.TsPrimitiveType; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import org.apache.iotdb.tsfile.read.common.BatchData; +import org.junit.Assert; + +public class FakedIBatchPoint implements IBatchReader { + + private Iterator<TimeValuePair> iterator; + private BatchData batchData; + private boolean hasCachedBatchData; + private boolean hasEmptyBatch; + private Random random; + + private TimeValuePair timeValuePair; + + public FakedIBatchPoint(long startTime, int size, int interval, int modValue, boolean hasEmptyBatch) { + long time = startTime; + List<TimeValuePair> list = new ArrayList<>(); + for (int i = 0; i < size; i++) { + list.add( + new TimeValuePair(time, TsPrimitiveType.getByType(TSDataType.INT64, time % modValue))); + time += interval; + } + iterator = list.iterator(); + this.hasEmptyBatch = hasEmptyBatch; + this.random = new Random(); + this.hasCachedBatchData = false; + } + + public FakedIBatchPoint(long startTime, int size, int interval, int modValue) { + this(startTime, size, interval, modValue, false); + } + + @Override + public boolean hasNext() throws IOException { + if(hasCachedBatchData){ + return true; + } + if(iterator.hasNext()){ + constructBatchData(); + hasCachedBatchData = true; + return true; + } + return false; + } + + @Override + public BatchData nextBatch() throws IOException { + if(hasCachedBatchData){ + hasCachedBatchData = false; + return batchData; + } + else { + constructBatchData(); + return batchData; + } + + } + + @Override + public void close() throws IOException { + + } + + private void constructBatchData(){ + int num = random.nextInt(10); + if(!hasEmptyBatch){ + num+=1; + } + batchData = new BatchData(TSDataType.INT64,true); + while (num > 0 && iterator.hasNext()){ + timeValuePair = iterator.next(); + batchData.putTime(timeValuePair.getTimestamp()); + batchData.putLong(timeValuePair.getValue().getLong()); + num--; + } + if(!hasEmptyBatch){ + Assert.assertTrue(batchData.hasNext()); + } + } +} diff --git a/iotdb/src/test/java/org/apache/iotdb/db/query/reader/FakedIPointReader.java b/iotdb/src/test/java/org/apache/iotdb/db/query/reader/FakedIPointReader.java new file mode 100644 index 0000000..d9dd4ce --- /dev/null +++ b/iotdb/src/test/java/org/apache/iotdb/db/query/reader/FakedIPointReader.java @@ -0,0 +1,75 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.query.reader; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import org.apache.iotdb.db.utils.TimeValuePair; +import org.apache.iotdb.db.utils.TsPrimitiveType; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; + +public class FakedIPointReader implements IPointReader{ + + private Iterator<TimeValuePair> iterator; + private boolean hasCachedTimeValuePair = false; + private TimeValuePair cachedTimeValuePair; + + public FakedIPointReader(long startTime, int size, int interval, int modValue) { + long time = startTime; + List<TimeValuePair> list = new ArrayList<>(); + for (int i = 0; i < size; i++) { + list.add( + new TimeValuePair(time, TsPrimitiveType.getByType(TSDataType.INT64, time % modValue))); + time += interval; + } + iterator = list.iterator(); + } + + @Override + public boolean hasNext() throws IOException { + return hasCachedTimeValuePair || iterator.hasNext(); + } + + @Override + public TimeValuePair next() throws IOException { + if (hasCachedTimeValuePair){ + hasCachedTimeValuePair = false; + return cachedTimeValuePair; + } + return iterator.next(); + } + + @Override + public TimeValuePair current() throws IOException { + if(hasCachedTimeValuePair){ + return cachedTimeValuePair; + } + cachedTimeValuePair = iterator.next(); + hasCachedTimeValuePair = true; + return cachedTimeValuePair; + } + + @Override + public void close() throws IOException { + + } +} diff --git a/iotdb/src/test/java/org/apache/iotdb/db/query/reader/FakedSeriesReaderByTimestamp.java b/iotdb/src/test/java/org/apache/iotdb/db/query/reader/FakedSeriesReaderByTimestamp.java new file mode 100644 index 0000000..821e14e --- /dev/null +++ b/iotdb/src/test/java/org/apache/iotdb/db/query/reader/FakedSeriesReaderByTimestamp.java @@ -0,0 +1,93 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.query.reader; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import org.apache.iotdb.db.query.reader.merge.EngineReaderByTimeStamp; +import org.apache.iotdb.db.utils.TimeValuePair; +import org.apache.iotdb.db.utils.TsPrimitiveType; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; + +public class FakedSeriesReaderByTimestamp implements EngineReaderByTimeStamp { + private Iterator<TimeValuePair> iterator; + private boolean hasCachedTimeValuePair = false; + private TimeValuePair cachedTimeValuePair; + + public FakedSeriesReaderByTimestamp(long startTime, int size, int interval, int modValue) { + long time = startTime; + List<TimeValuePair> list = new ArrayList<>(); + for (int i = 0; i < size; i++) { + list.add( + new TimeValuePair(time, TsPrimitiveType.getByType(TSDataType.INT64, time % modValue))); + time += interval; + } + iterator = list.iterator(); + } + + @Override + public TsPrimitiveType getValueInTimestamp(long timestamp) throws IOException { + if(hasCachedTimeValuePair){ + if(timestamp == cachedTimeValuePair.getTimestamp()){ + hasCachedTimeValuePair = false; + return cachedTimeValuePair.getValue(); + } + else if(timestamp > cachedTimeValuePair.getTimestamp()){ + hasCachedTimeValuePair = false; + } + else { + return null; + } + } + while(iterator.hasNext()){ + cachedTimeValuePair = iterator.next(); + if(timestamp == cachedTimeValuePair.getTimestamp()){ + return cachedTimeValuePair.getValue(); + } + else if(timestamp < cachedTimeValuePair.getTimestamp()){ + hasCachedTimeValuePair = true; + break; + } + } + return null; + } + + @Override + public boolean hasNext() throws IOException { + return hasCachedTimeValuePair || iterator.hasNext(); + } + + @Override + public TimeValuePair next() throws IOException { + if (hasCachedTimeValuePair){ + hasCachedTimeValuePair = false; + return cachedTimeValuePair; + } + return iterator.next(); + } + + @Override + public void close() throws IOException { + + } +} + diff --git a/iotdb/src/test/java/org/apache/iotdb/db/query/reader/merge/PriorityMergeReaderByTimestampTest.java b/iotdb/src/test/java/org/apache/iotdb/db/query/reader/merge/PriorityMergeReaderByTimestampTest.java index 03771e8..595399d 100644 --- a/iotdb/src/test/java/org/apache/iotdb/db/query/reader/merge/PriorityMergeReaderByTimestampTest.java +++ b/iotdb/src/test/java/org/apache/iotdb/db/query/reader/merge/PriorityMergeReaderByTimestampTest.java @@ -23,6 +23,7 @@ import java.util.ArrayList; import java.util.Iterator; import java.util.List; import java.util.Random; +import org.apache.iotdb.db.query.reader.IPointReader; import org.apache.iotdb.db.utils.TimeValuePair; import org.apache.iotdb.db.utils.TsPrimitiveType; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; @@ -104,7 +105,8 @@ public class PriorityMergeReaderByTimestampTest { } - public static class FakedPrioritySeriesReaderByTimestamp implements EngineReaderByTimeStamp { + public static class FakedPrioritySeriesReaderByTimestamp implements EngineReaderByTimeStamp, + IPointReader { private Iterator<TimeValuePair> iterator; private long currentTimeStamp = Long.MIN_VALUE; @@ -150,8 +152,12 @@ public class PriorityMergeReaderByTimestampTest { } @Override - public void skipCurrentTimeValuePair() throws IOException { - next(); + public TimeValuePair current() throws IOException { + if (hasCachedTimeValuePair) { + return cachedTimeValuePair; + } else { + throw new IOException(" to end! " + iterator.next()); + } } @Override @@ -176,20 +182,5 @@ public class PriorityMergeReaderByTimestampTest { } return null; } - - @Override - public boolean hasNextBatch() { - return false; - } - - @Override - public BatchData nextBatch() { - return null; - } - - @Override - public BatchData currentBatch() { - return null; - } } } diff --git a/iotdb/src/test/java/org/apache/iotdb/db/query/reader/merge/PriorityMergeReaderTest.java b/iotdb/src/test/java/org/apache/iotdb/db/query/reader/merge/PriorityMergeReaderTest.java index 66ecf06..4f3b071 100644 --- a/iotdb/src/test/java/org/apache/iotdb/db/query/reader/merge/PriorityMergeReaderTest.java +++ b/iotdb/src/test/java/org/apache/iotdb/db/query/reader/merge/PriorityMergeReaderTest.java @@ -22,6 +22,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Iterator; import java.util.List; +import org.apache.iotdb.db.query.reader.IPointReader; import org.apache.iotdb.db.query.reader.IReader; import org.apache.iotdb.db.utils.TimeValuePair; import org.apache.iotdb.db.utils.TsPrimitiveType; @@ -65,7 +66,7 @@ public class PriorityMergeReaderTest { Assert.assertEquals(162, cnt); } - public static class FakedPrioritySeriesReader implements IReader { + public static class FakedPrioritySeriesReader implements IPointReader { private Iterator<TimeValuePair> iterator; @@ -92,27 +93,13 @@ public class PriorityMergeReaderTest { } @Override - public void skipCurrentTimeValuePair() { - iterator.next(); + public TimeValuePair current() throws IOException { + throw new IOException("current() in FakedPrioritySeriesReader is an empty method."); } - @Override - public void close() { - } - - @Override - public boolean hasNextBatch() { - return false; - } @Override - public BatchData nextBatch() { - return null; - } - - @Override - public BatchData currentBatch() { - return null; + public void close() { } } } diff --git a/iotdb/src/test/java/org/apache/iotdb/db/query/reader/merge/SeriesMergeSortReaderTest.java b/iotdb/src/test/java/org/apache/iotdb/db/query/reader/merge/SeriesMergeSortReaderTest.java index f27948e..fb4d163 100644 --- a/iotdb/src/test/java/org/apache/iotdb/db/query/reader/merge/SeriesMergeSortReaderTest.java +++ b/iotdb/src/test/java/org/apache/iotdb/db/query/reader/merge/SeriesMergeSortReaderTest.java @@ -19,6 +19,7 @@ package org.apache.iotdb.db.query.reader.merge; import java.io.IOException; +import org.apache.iotdb.db.query.reader.IPointReader; import org.apache.iotdb.db.query.reader.IReader; import org.apache.iotdb.db.utils.TimeValuePair; import org.apache.iotdb.db.utils.TsPrimitiveType; @@ -66,7 +67,7 @@ public class SeriesMergeSortReaderTest { } } - public static class FakedSeriesReader implements IReader { + public static class FakedSeriesReader implements IPointReader { private long[] timestamps; private int index; @@ -89,27 +90,12 @@ public class SeriesMergeSortReaderTest { } @Override - public void skipCurrentTimeValuePair() { - next(); + public TimeValuePair current() throws IOException { + return new TimeValuePair(timestamps[index], new TsPrimitiveType.TsLong(value)); } @Override public void close() { } - - @Override - public boolean hasNextBatch() { - return false; - } - - @Override - public BatchData nextBatch() { - return null; - } - - @Override - public BatchData currentBatch() { - return null; - } } } diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/reader/IReader.java b/iotdb/src/test/java/org/apache/iotdb/db/query/reader/sequence/SealedTsFilesReaderTest.java similarity index 57% copy from iotdb/src/main/java/org/apache/iotdb/db/query/reader/IReader.java copy to iotdb/src/test/java/org/apache/iotdb/db/query/reader/sequence/SealedTsFilesReaderTest.java index 6a574d6..2d6e5eb 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/query/reader/IReader.java +++ b/iotdb/src/test/java/org/apache/iotdb/db/query/reader/sequence/SealedTsFilesReaderTest.java @@ -16,29 +16,27 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.iotdb.db.query.reader; -import java.io.IOException; -import org.apache.iotdb.db.utils.TimeValuePair; -import org.apache.iotdb.tsfile.read.common.BatchData; +package org.apache.iotdb.db.query.reader.sequence; -/** - * Vital read interface. Batch method is used to increase query speed. Getting a batch of data - * a time is faster than getting one point a time. - */ -public interface IReader { +import static org.junit.Assert.*; - boolean hasNext() throws IOException; +import org.junit.Test; - TimeValuePair next() throws IOException; +public class SealedTsFilesReaderTest { - void skipCurrentTimeValuePair() throws IOException; + private SealedTsFilesReader sealedTsFilesReader; - void close() throws IOException; + @Test + public void hasNext() { - boolean hasNextBatch(); + } - BatchData nextBatch(); + @Test + public void nextBatch() { + } - BatchData currentBatch(); -} + @Test + public void currentBatch() { + } +} \ No newline at end of file diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/chunk/ChunkReader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/chunk/ChunkReader.java index 125e131..f16c047 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/chunk/ChunkReader.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/chunk/ChunkReader.java @@ -16,6 +16,7 @@ * specific language governing permissions and limitations * under the License. */ + package org.apache.iotdb.tsfile.read.reader.chunk; import java.io.IOException; @@ -48,6 +49,9 @@ public abstract class ChunkReader { private BatchData data; + private PageHeader pageHeader; + private boolean hasCachedPageHeader; + /** * Data whose timestamp <= deletedAt should be considered deleted(not be returned). */ @@ -72,38 +76,45 @@ public abstract class ChunkReader { valueDecoder = Decoder .getDecoderByType(chunkHeader.getEncodingType(), chunkHeader.getDataType()); data = new BatchData(chunkHeader.getDataType()); - } - - public boolean hasNextBatch() { - return chunkDataBuffer.remaining() > 0; + hasCachedPageHeader = false; } /** - * get next data batch. - * - * @return next data batch - * @throws IOException IOException + * judge if has nextBatch. */ - public BatchData nextBatch() throws IOException { - + public boolean hasNextBatch() throws IOException { + if (hasCachedPageHeader) { + return true; + } // construct next satisfied page header while (chunkDataBuffer.remaining() > 0) { // deserialize a PageHeader from chunkDataBuffer - PageHeader pageHeader = PageHeader - .deserializeFrom(chunkDataBuffer, chunkHeader.getDataType()); + pageHeader = PageHeader.deserializeFrom(chunkDataBuffer, chunkHeader.getDataType()); // if the current page satisfies if (pageSatisfied(pageHeader)) { - PageReader pageReader = constructPageReaderForNextPage(pageHeader.getCompressedSize()); - if (pageReader.hasNextBatch()) { - data = pageReader.nextBatch(); - return data; - } + hasCachedPageHeader = true; + return true; } else { skipBytesInStreamByLength(pageHeader.getCompressedSize()); } } + return false; + } + /** + * get next data batch. + * + * @return next data batch + * @throws IOException IOException + */ + public BatchData nextBatch() throws IOException { + PageReader pageReader = constructPageReaderForNextPage(pageHeader.getCompressedSize()); + hasCachedPageHeader = false; + if (pageReader.hasNextBatch()) { + data = pageReader.nextBatch(); + return data; + } return data; } @@ -111,6 +122,15 @@ public abstract class ChunkReader { return data; } + public PageHeader nextPageHeader() throws IOException { + return pageHeader; + } + + public void skipPageData() { + skipBytesInStreamByLength(pageHeader.getCompressedSize()); + hasCachedPageHeader = false; + } + private void skipBytesInStreamByLength(long length) { chunkDataBuffer.position(chunkDataBuffer.position() + (int) length); } diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/series/FileSeriesReader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/series/FileSeriesReader.java index f42c8ae..110fec3 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/series/FileSeriesReader.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/series/FileSeriesReader.java @@ -16,10 +16,12 @@ * specific language governing permissions and limitations * under the License. */ + package org.apache.iotdb.tsfile.read.reader.series; import java.io.IOException; import java.util.List; +import org.apache.iotdb.tsfile.file.header.PageHeader; import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData; import org.apache.iotdb.tsfile.read.common.BatchData; import org.apache.iotdb.tsfile.read.controller.ChunkLoader; @@ -48,30 +50,14 @@ public abstract class FileSeriesReader { /** * check if current chunk has next batch data. + * * @return True if current chunk has next batch data */ - public boolean hasNextBatch() { - - // current chunk has data - if (chunkReader != null && chunkReader.hasNextBatch()) { - return true; - - // has additional chunk to read - } else { - return chunkToRead < chunkMetaDataList.size(); - } - - } - - /** - * get next batch data. - */ - public BatchData nextBatch() throws IOException { + public boolean hasNextBatch() throws IOException { // current chunk has additional batch if (chunkReader != null && chunkReader.hasNextBatch()) { - data = chunkReader.nextBatch(); - return data; + return true; } // current chunk does not have additional batch, init new chunk reader @@ -83,15 +69,18 @@ public abstract class FileSeriesReader { initChunkReader(chunkMetaData); if (chunkReader.hasNextBatch()) { - data = chunkReader.nextBatch(); - return data; + return true; } } } + return false; + } - if (data == null) { - return new BatchData(); - } + /** + * get next batch data. + */ + public BatchData nextBatch() throws IOException { + data = chunkReader.nextBatch(); return data; } @@ -99,6 +88,14 @@ public abstract class FileSeriesReader { return data; } + public PageHeader nextPageHeader() throws IOException { + return chunkReader.nextPageHeader(); + } + + public void skipPageData() { + chunkReader.skipPageData(); + } + protected abstract void initChunkReader(ChunkMetaData chunkMetaData) throws IOException; protected abstract boolean chunkSatisfied(ChunkMetaData chunkMetaData);
