This is an automated email from the ASF dual-hosted git repository. hui pushed a commit to branch lmh/scanOpBatchProcess1.0 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit dda6cec072bbb5e5cd495a2ab21777a8f06de94f Author: Minghui Liu <[email protected]> AuthorDate: Wed Nov 30 22:48:25 2022 +0800 rename --- .../execution/operator/source/SeriesScanUtil.java | 55 ++++++++++------------ .../query/reader/chunk/MemAlignedPageReader.java | 4 +- .../iotdb/db/query/reader/chunk/MemPageReader.java | 4 +- .../iotdb/tsfile/read/reader/IPageReader.java | 7 +-- .../tsfile/read/reader/page/AlignedPageReader.java | 4 +- .../iotdb/tsfile/read/reader/page/PageReader.java | 4 +- 6 files changed, 37 insertions(+), 41 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanUtil.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanUtil.java index b667d08858..7d15555852 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanUtil.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanUtil.java @@ -456,39 +456,32 @@ public class SeriesScanUtil { return firstPageReader != null; } + /** Return true if there is new data written to the builder */ public boolean tryToFetchDataFromPage() throws IOException { - /* - * has overlapped data - */ + // has overlapped data if (mergeReader.hasNextTimeValuePair() || firstPageOverlapped()) { - if (tryToBuildFromMergeReader()) { + if (appendDataFromMergeReader()) { return true; } } if (firstPageReader != null) { - buildFromPageReader(); + appendDataFromPageReader(); return true; } - /* - * construct first page reader - */ + // construct first page reader if (firstChunkMetadata != null) { - /* - * try to unpack all overlapped ChunkMetadata to cachedPageReaders - */ + // try to unpack all overlapped ChunkMetadata to cachedPageReaders unpackAllOverlappedChunkMetadataToPageReaders( orderUtils.getOverlapCheckTime(firstChunkMetadata.getStatistics()), true); } else { - /* - * first chunk metadata is already unpacked, consume cached pages - */ + // first chunk metadata is already unpacked, consume cached pages initFirstPageReader(); } - if (tryToBuildFromOverlappedPage()) { + if (tryToFetchDataFromOverlappedPage()) { return true; } @@ -498,39 +491,41 @@ public class SeriesScanUtil { initFirstPageReader(); - if (tryToBuildFromOverlappedPage()) { + if (tryToFetchDataFromOverlappedPage()) { return true; } } if (firstPageReader != null) { - buildFromPageReader(); + appendDataFromPageReader(); return true; } return false; } - private boolean tryToBuildFromOverlappedPage() throws IOException { + /** Return true if there is new data written to the builder */ + private boolean tryToFetchDataFromOverlappedPage() throws IOException { if (firstPageOverlapped()) { // next page is overlapped, read overlapped data and cache it - return tryToBuildFromMergeReader(); + return appendDataFromMergeReader(); } return false; } - public void buildFromPageReader() throws IOException { + public void appendDataFromPageReader() throws IOException { /* * next page is not overlapped, push down value filter if it exists */ if (valueFilter != null) { firstPageReader.setFilter(valueFilter); } - firstPageReader.getAllSatisfiedPageData(orderUtils.getAscending(), cachedTsBlockBuilder); + firstPageReader.appendPageDataToBuilder(orderUtils.getAscending(), cachedTsBlockBuilder); firstPageReader = null; } - private boolean tryToBuildFromMergeReader() throws IOException { - int rawSize = cachedTsBlockBuilder.getPositionCount(); + /** Return true if there is new data written to the builder */ + private boolean appendDataFromMergeReader() throws IOException { + int initialPositionCount = cachedTsBlockBuilder.getPositionCount(); tryToPutAllDirectlyOverlappedUnseqPageReadersIntoMergeReader(); @@ -558,7 +553,7 @@ public class SeriesScanUtil { * 3. sequence page reader is not empty, which means first page reader last endTime < currentTimeValuePair.getTimestamp(), * we could use the first sequence page reader later */ - if (cachedTsBlockBuilder.getPositionCount() > rawSize + if (cachedTsBlockBuilder.getPositionCount() > initialPositionCount || firstPageReader != null || !seqPageReaders.isEmpty()) { break; @@ -587,7 +582,7 @@ public class SeriesScanUtil { || (!orderUtils.getAscending() && timeValuePair.getTimestamp() < firstPageReader.getStatistics().getStartTime())) { - return cachedTsBlockBuilder.getPositionCount() > rawSize; + return cachedTsBlockBuilder.getPositionCount() > initialPositionCount; } else if (orderUtils.isOverlapped( timeValuePair.getTimestamp(), firstPageReader.getStatistics())) { // current timeValuePair is overlapped with firstPageReader, add it to merged reader @@ -612,7 +607,7 @@ public class SeriesScanUtil { || (!orderUtils.getAscending() && timeValuePair.getTimestamp() < seqPageReaders.get(0).getStatistics().getStartTime())) { - return cachedTsBlockBuilder.getPositionCount() > rawSize; + return cachedTsBlockBuilder.getPositionCount() > initialPositionCount; } else if (orderUtils.isOverlapped( timeValuePair.getTimestamp(), seqPageReaders.get(0).getStatistics())) { VersionPageReader pageReader = seqPageReaders.remove(0); @@ -697,7 +692,7 @@ public class SeriesScanUtil { /* * if current overlapped page has valid data, return, otherwise read next overlapped page */ - if (cachedTsBlockBuilder.getPositionCount() > rawSize) { + if (cachedTsBlockBuilder.getPositionCount() > initialPositionCount) { return true; } else if (mergeReader.hasNextTimeValuePair()) { // condition: seqPage.endTime < mergeReader.currentTime @@ -1351,7 +1346,7 @@ public class SeriesScanUtil { return cachedTsBlockBuilder; } - protected class VersionPageReader { + protected static class VersionPageReader { protected PriorityMergeReader.MergeReaderPriority version; protected IPageReader data; @@ -1394,8 +1389,8 @@ public class SeriesScanUtil { return tsBlock; } - void getAllSatisfiedPageData(boolean ascending, TsBlockBuilder builder) throws IOException { - data.getAllSatisfiedData(ascending, builder); + void appendPageDataToBuilder(boolean ascending, TsBlockBuilder builder) throws IOException { + data.appendAllSatisfiedDataToBuilder(ascending, builder); } void setFilter(Filter filter) { diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/MemAlignedPageReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/MemAlignedPageReader.java index ae13babf9f..b7bfecc3fb 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/MemAlignedPageReader.java +++ b/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/MemAlignedPageReader.java @@ -88,12 +88,12 @@ public class MemAlignedPageReader implements IPageReader, IAlignedPageReader { @Override public TsBlock getAllSatisfiedData() { builder.reset(); - writeDataToBuilder(builder); + appendDataToBuilder(builder); return builder.build(); } @Override - public void writeDataToBuilder(TsBlockBuilder builder) { + public void appendDataToBuilder(TsBlockBuilder builder) { boolean[] satisfyInfo = new boolean[tsBlock.getPositionCount()]; for (int row = 0; row < tsBlock.getPositionCount(); row++) { diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/MemPageReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/MemPageReader.java index 85f08de1c8..1638d06dfb 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/MemPageReader.java +++ b/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/MemPageReader.java @@ -92,12 +92,12 @@ public class MemPageReader implements IPageReader { public TsBlock getAllSatisfiedData() { TSDataType dataType = chunkMetadata.getDataType(); TsBlockBuilder builder = new TsBlockBuilder(Collections.singletonList(dataType)); - writeDataToBuilder(builder); + appendDataToBuilder(builder); return builder.build(); } @Override - public void writeDataToBuilder(TsBlockBuilder builder) { + public void appendDataToBuilder(TsBlockBuilder builder) { TSDataType dataType = chunkMetadata.getDataType(); TimeColumnBuilder timeBuilder = builder.getTimeColumnBuilder(); ColumnBuilder valueBuilder = builder.getColumnBuilder(0); diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/IPageReader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/IPageReader.java index 1ba2f30218..761e7044a4 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/IPageReader.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/IPageReader.java @@ -40,11 +40,12 @@ public interface IPageReader { TsBlock getAllSatisfiedData() throws IOException; - void writeDataToBuilder(TsBlockBuilder builder) throws IOException; + void appendDataToBuilder(TsBlockBuilder builder) throws IOException; - default void getAllSatisfiedData(boolean ascending, TsBlockBuilder builder) throws IOException { + default void appendAllSatisfiedDataToBuilder(boolean ascending, TsBlockBuilder builder) + throws IOException { if (ascending) { - writeDataToBuilder(builder); + appendDataToBuilder(builder); } else { TsBlock tsBlock = getAllSatisfiedData(); tsBlock.reverse(); diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/AlignedPageReader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/AlignedPageReader.java index faae955198..61097d3d9e 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/AlignedPageReader.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/AlignedPageReader.java @@ -116,12 +116,12 @@ public class AlignedPageReader implements IPageReader, IAlignedPageReader { @Override public TsBlock getAllSatisfiedData() throws IOException { builder.reset(); - writeDataToBuilder(builder); + appendDataToBuilder(builder); return builder.build(); } @Override - public void writeDataToBuilder(TsBlockBuilder builder) throws IOException { + public void appendDataToBuilder(TsBlockBuilder builder) throws IOException { long[] timeBatch = timePageReader.getNextTimeBatch(); // if all the sub sensors' value are null in current row, just discard it diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/PageReader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/PageReader.java index 190bb11b5c..2fe7f04540 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/PageReader.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/PageReader.java @@ -161,12 +161,12 @@ public class PageReader implements IPageReader { @Override public TsBlock getAllSatisfiedData() throws IOException { TsBlockBuilder builder = new TsBlockBuilder(Collections.singletonList(dataType)); - writeDataToBuilder(builder); + appendDataToBuilder(builder); return builder.build(); } @Override - public void writeDataToBuilder(TsBlockBuilder builder) throws IOException { + public void appendDataToBuilder(TsBlockBuilder builder) throws IOException { TimeColumnBuilder timeBuilder = builder.getTimeColumnBuilder(); ColumnBuilder valueBuilder = builder.getColumnBuilder(0); if (filter == null || filter.satisfy(getStatistics())) {
