This is an automated email from the ASF dual-hosted git repository. hui pushed a commit to branch lmh/scanOpBatchProcess1.0 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit cf713db5ccd500175578650a1d1a46fd923fc1ab Author: Minghui Liu <[email protected]> AuthorDate: Wed Nov 30 23:13:20 2022 +0800 try to unify --- .../source/AbstractSeriesScanOperator.java | 2 +- .../execution/operator/source/SeriesScanUtil.java | 336 ++------------------- 2 files changed, 33 insertions(+), 305 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AbstractSeriesScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AbstractSeriesScanOperator.java index 98c3dc7d3f..fabb66ebd6 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AbstractSeriesScanOperator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AbstractSeriesScanOperator.java @@ -137,7 +137,7 @@ public abstract class AbstractSeriesScanOperator implements DataSourceOperator { } private boolean readPageData() throws IOException { - return seriesScanUtil.tryToFetchDataFromPage(); + return seriesScanUtil.tryToFetchDataFromPage(true); } @Override diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanUtil.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanUtil.java index 7d15555852..a238c24442 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanUtil.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanUtil.java @@ -55,7 +55,6 @@ import java.util.Objects; import java.util.PriorityQueue; import java.util.Set; import java.util.function.ToLongFunction; -import java.util.stream.Collectors; import static com.google.common.base.Preconditions.checkArgument; @@ -118,9 +117,7 @@ public class SeriesScanUtil { /* * result cache */ - protected boolean hasCachedNextOverlappedPage; - protected TsBlock cachedTsBlock; - + protected boolean lastPageOverlapped; protected TsBlockBuilder cachedTsBlockBuilder; public SeriesScanUtil( @@ -400,64 +397,13 @@ public class SeriesScanUtil { * This method should be called after hasNextChunk() until no next page, make sure that all * overlapped pages are consumed */ - @SuppressWarnings("squid:S3776") - // Suppress high Cognitive Complexity warning public boolean hasNextPage() throws IOException { - - /* - * has overlapped data before - */ - if (hasCachedNextOverlappedPage) { - return true; - } else if (mergeReader.hasNextTimeValuePair() || firstPageOverlapped()) { - if (hasNextOverlappedPage()) { - cachedTsBlock = nextOverlappedPage(); - if (cachedTsBlock != null && !cachedTsBlock.isEmpty()) { - hasCachedNextOverlappedPage = true; - return true; - } - } - } - - if (firstPageReader != null) { - return true; - } - - /* - * construct first page reader - */ - if (firstChunkMetadata != null) { - /* - * try to unpack all overlapped ChunkMetadata to cachedPageReaders - */ - unpackAllOverlappedChunkMetadataToPageReaders( - orderUtils.getOverlapCheckTime(firstChunkMetadata.getStatistics()), true); - } else { - /* - * first chunk metadata is already unpacked, consume cached pages - */ - initFirstPageReader(); - } - - if (isExistOverlappedPage()) { - return true; - } - - // make sure firstPageReader won't be null while the unSeqPageReaders has more cached page - // readers - while (firstPageReader == null && (!seqPageReaders.isEmpty() || !unSeqPageReaders.isEmpty())) { - - initFirstPageReader(); - - if (isExistOverlappedPage()) { - return true; - } - } - return firstPageReader != null; + return tryToFetchDataFromPage(false); } - /** Return true if there is new data written to the builder */ - public boolean tryToFetchDataFromPage() throws IOException { + /** @return true if there is new data written to the builder */ + public boolean tryToFetchDataFromPage(boolean consumePageReader) throws IOException { + lastPageOverlapped = false; // has overlapped data if (mergeReader.hasNextTimeValuePair() || firstPageOverlapped()) { @@ -467,7 +413,9 @@ public class SeriesScanUtil { } if (firstPageReader != null) { - appendDataFromPageReader(); + if (consumePageReader) { + appendDataFromPageReader(); + } return true; } @@ -497,13 +445,15 @@ public class SeriesScanUtil { } if (firstPageReader != null) { - appendDataFromPageReader(); + if (consumePageReader) { + appendDataFromPageReader(); + } return true; } return false; } - /** Return true if there is new data written to the builder */ + /** @return true if there is new data written to the builder */ private boolean tryToFetchDataFromOverlappedPage() throws IOException { if (firstPageOverlapped()) { // next page is overlapped, read overlapped data and cache it @@ -523,7 +473,12 @@ public class SeriesScanUtil { firstPageReader = null; } - /** Return true if there is new data written to the builder */ + /** + * read overlapped data till currentLargestEndTime in mergeReader, if current batch does not + * contain data, read till next currentLargestEndTime again. + * + * @return true if there is new data written to the builder + */ private boolean appendDataFromMergeReader() throws IOException { int initialPositionCount = cachedTsBlockBuilder.getPositionCount(); @@ -582,7 +537,8 @@ public class SeriesScanUtil { || (!orderUtils.getAscending() && timeValuePair.getTimestamp() < firstPageReader.getStatistics().getStartTime())) { - return cachedTsBlockBuilder.getPositionCount() > initialPositionCount; + lastPageOverlapped = cachedTsBlockBuilder.getPositionCount() > initialPositionCount; + return lastPageOverlapped; } else if (orderUtils.isOverlapped( timeValuePair.getTimestamp(), firstPageReader.getStatistics())) { // current timeValuePair is overlapped with firstPageReader, add it to merged reader @@ -607,7 +563,8 @@ public class SeriesScanUtil { || (!orderUtils.getAscending() && timeValuePair.getTimestamp() < seqPageReaders.get(0).getStatistics().getStartTime())) { - return cachedTsBlockBuilder.getPositionCount() > initialPositionCount; + lastPageOverlapped = cachedTsBlockBuilder.getPositionCount() > initialPositionCount; + return lastPageOverlapped; } else if (orderUtils.isOverlapped( timeValuePair.getTimestamp(), seqPageReaders.get(0).getStatistics())) { VersionPageReader pageReader = seqPageReaders.remove(0); @@ -692,7 +649,8 @@ public class SeriesScanUtil { /* * if current overlapped page has valid data, return, otherwise read next overlapped page */ - if (cachedTsBlockBuilder.getPositionCount() > initialPositionCount) { + lastPageOverlapped = cachedTsBlockBuilder.getPositionCount() > initialPositionCount; + if (lastPageOverlapped) { return true; } else if (mergeReader.hasNextTimeValuePair()) { // condition: seqPage.endTime < mergeReader.currentTime @@ -704,22 +662,6 @@ public class SeriesScanUtil { } } - private boolean isExistOverlappedPage() throws IOException { - if (firstPageOverlapped()) { - /* - * next page is overlapped, read overlapped data and cache it - */ - if (hasNextOverlappedPage()) { - cachedTsBlock = nextOverlappedPage(); - if (cachedTsBlock != null && !cachedTsBlock.isEmpty()) { - hasCachedNextOverlappedPage = true; - return true; - } - } - } - return false; - } - private boolean firstPageOverlapped() throws IOException { if (firstPageReader == null) { return false; @@ -813,17 +755,12 @@ public class SeriesScanUtil { * first page is overlapped */ boolean isPageOverlapped() throws IOException { - - /* - * has an overlapped page - */ - if (hasCachedNextOverlappedPage) { + // has an overlapped page + if (lastPageOverlapped) { return true; } - /* - * has a non-overlapped page in firstPageReader - */ + // has a non-overlapped page in firstPageReader if (mergeReader.hasNextTimeValuePair() && ((orderUtils.getAscending() && mergeReader.currentTimeValuePair().getTimestamp() @@ -835,7 +772,6 @@ public class SeriesScanUtil { } Statistics firstPageStatistics = firstPageReader.getStatistics(); - return !unSeqPageReaders.isEmpty() && orderUtils.isOverlapped(firstPageStatistics, unSeqPageReaders.peek().getStatistics()); } @@ -867,207 +803,13 @@ public class SeriesScanUtil { firstPageReader = null; } - /** This method should only be used when the method isPageOverlapped() return true. */ public TsBlock nextPage() throws IOException { - - if (hasCachedNextOverlappedPage) { - hasCachedNextOverlappedPage = false; - TsBlock res = cachedTsBlock; - cachedTsBlock = null; - return res; - } else { - - /* - * next page is not overlapped, push down value filter if it exists - */ - if (valueFilter != null) { - firstPageReader.setFilter(valueFilter); - } - TsBlock tsBlock = firstPageReader.getAllSatisfiedPageData(orderUtils.getAscending()); - firstPageReader = null; - - return tsBlock; - } - } - - /** - * read overlapped data till currentLargestEndTime in mergeReader, if current batch does not - * contain data, read till next currentLargestEndTime again - */ - @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning - private boolean hasNextOverlappedPage() throws IOException { - - if (hasCachedNextOverlappedPage) { - return true; - } - - tryToPutAllDirectlyOverlappedUnseqPageReadersIntoMergeReader(); - - while (true) { - - // may has overlapped data - if (mergeReader.hasNextTimeValuePair()) { - - // TODO we still need to consider data type, ascending and descending here - TsBlockBuilder builder = new TsBlockBuilder(getTsDataTypeList()); - TimeColumnBuilder timeBuilder = builder.getTimeColumnBuilder(); - long currentPageEndPointTime = mergeReader.getCurrentReadStopTime(); - while (mergeReader.hasNextTimeValuePair()) { - - /* - * get current first point in mergeReader, this maybe overlapped later - */ - TimeValuePair timeValuePair = mergeReader.currentTimeValuePair(); - - if (orderUtils.isExcessEndpoint(timeValuePair.getTimestamp(), currentPageEndPointTime)) { - /* - * when the merged point excesses the currentPageEndPointTime, we have read all overlapped data before currentPageEndPointTime - * 1. has cached batch data, we don't need to read more data, just use the cached data later - * 2. has first page reader, which means first page reader last endTime < currentTimeValuePair.getTimestamp(), - * we could just use the first page reader later - * 3. sequence page reader is not empty, which means first page reader last endTime < currentTimeValuePair.getTimestamp(), - * we could use the first sequence page reader later - */ - if (!builder.isEmpty() || firstPageReader != null || !seqPageReaders.isEmpty()) { - break; - } - // so, we don't have other data except mergeReader - currentPageEndPointTime = mergeReader.getCurrentReadStopTime(); - } - - // unpack all overlapped data for the first timeValuePair - unpackAllOverlappedTsFilesToTimeSeriesMetadata(timeValuePair.getTimestamp()); - unpackAllOverlappedTimeSeriesMetadataToCachedChunkMetadata( - timeValuePair.getTimestamp(), false); - unpackAllOverlappedChunkMetadataToPageReaders(timeValuePair.getTimestamp(), false); - unpackAllOverlappedUnseqPageReadersToMergeReader(timeValuePair.getTimestamp()); - - // update if there are unpacked unSeqPageReaders - timeValuePair = mergeReader.currentTimeValuePair(); - - // from now, the unsequence reader is all unpacked, so we don't need to consider it - // we has first page reader now - if (firstPageReader != null) { - // if current timeValuePair excesses the first page reader's end time, we just use the - // cached data - if ((orderUtils.getAscending() - && timeValuePair.getTimestamp() > firstPageReader.getStatistics().getEndTime()) - || (!orderUtils.getAscending() - && timeValuePair.getTimestamp() - < firstPageReader.getStatistics().getStartTime())) { - hasCachedNextOverlappedPage = !builder.isEmpty(); - cachedTsBlock = builder.build(); - return hasCachedNextOverlappedPage; - } else if (orderUtils.isOverlapped( - timeValuePair.getTimestamp(), firstPageReader.getStatistics())) { - // current timeValuePair is overlapped with firstPageReader, add it to merged reader - // and update endTime to the max end time - mergeReader.addReader( - getPointReader( - firstPageReader.getAllSatisfiedPageData(orderUtils.getAscending())), - firstPageReader.version, - orderUtils.getOverlapCheckTime(firstPageReader.getStatistics()), - context); - currentPageEndPointTime = - updateEndPointTime(currentPageEndPointTime, firstPageReader); - firstPageReader = null; - } - } - - // the seq page readers is not empty, just like first page reader - if (!seqPageReaders.isEmpty()) { - if ((orderUtils.getAscending() - && timeValuePair.getTimestamp() - > seqPageReaders.get(0).getStatistics().getEndTime()) - || (!orderUtils.getAscending() - && timeValuePair.getTimestamp() - < seqPageReaders.get(0).getStatistics().getStartTime())) { - hasCachedNextOverlappedPage = !builder.isEmpty(); - cachedTsBlock = builder.build(); - return hasCachedNextOverlappedPage; - } else if (orderUtils.isOverlapped( - timeValuePair.getTimestamp(), seqPageReaders.get(0).getStatistics())) { - VersionPageReader pageReader = seqPageReaders.remove(0); - mergeReader.addReader( - getPointReader(pageReader.getAllSatisfiedPageData(orderUtils.getAscending())), - pageReader.version, - orderUtils.getOverlapCheckTime(pageReader.getStatistics()), - context); - currentPageEndPointTime = updateEndPointTime(currentPageEndPointTime, pageReader); - } - } - - /* - * get the latest first point in mergeReader - */ - timeValuePair = mergeReader.nextTimeValuePair(); - - Object valueForFilter = timeValuePair.getValue().getValue(); - - // TODO fix value filter firstNotNullObject, currently, if it's a value filter, it will - // only accept AlignedPath with only one sub sensor - if (timeValuePair.getValue().getDataType() == TSDataType.VECTOR) { - for (TsPrimitiveType tsPrimitiveType : timeValuePair.getValue().getVector()) { - if (tsPrimitiveType != null) { - valueForFilter = tsPrimitiveType.getValue(); - break; - } - } - } - - if (valueFilter == null - || valueFilter.satisfy(timeValuePair.getTimestamp(), valueForFilter)) { - timeBuilder.writeLong(timeValuePair.getTimestamp()); - switch (dataType) { - case BOOLEAN: - builder.getColumnBuilder(0).writeBoolean(timeValuePair.getValue().getBoolean()); - break; - case INT32: - builder.getColumnBuilder(0).writeInt(timeValuePair.getValue().getInt()); - break; - case INT64: - builder.getColumnBuilder(0).writeLong(timeValuePair.getValue().getLong()); - break; - case FLOAT: - builder.getColumnBuilder(0).writeFloat(timeValuePair.getValue().getFloat()); - break; - case DOUBLE: - builder.getColumnBuilder(0).writeDouble(timeValuePair.getValue().getDouble()); - break; - case TEXT: - builder.getColumnBuilder(0).writeBinary(timeValuePair.getValue().getBinary()); - break; - case VECTOR: - TsPrimitiveType[] values = timeValuePair.getValue().getVector(); - for (int i = 0; i < values.length; i++) { - if (values[i] == null) { - builder.getColumnBuilder(i).appendNull(); - } else { - builder.getColumnBuilder(i).writeTsPrimitiveType(values[i]); - } - } - break; - default: - throw new UnSupportedDataTypeException(String.valueOf(dataType)); - } - builder.declarePosition(); - } - } - hasCachedNextOverlappedPage = !builder.isEmpty(); - cachedTsBlock = builder.build(); - /* - * if current overlapped page has valid data, return, otherwise read next overlapped page - */ - if (hasCachedNextOverlappedPage) { - return true; - } else if (mergeReader.hasNextTimeValuePair()) { - // condition: seqPage.endTime < mergeReader.currentTime - return false; - } - } else { - return false; - } + if (!lastPageOverlapped) { + appendDataFromPageReader(); } + TsBlock cachedTsBlock = cachedTsBlockBuilder.build(); + cachedTsBlockBuilder.reset(); + return cachedTsBlock; } private long updateEndPointTime(long currentPageEndPointTime, VersionPageReader pageReader) { @@ -1176,20 +918,6 @@ public class SeriesScanUtil { context); } - private TsBlock nextOverlappedPage() throws IOException { - if (hasCachedNextOverlappedPage || hasNextOverlappedPage()) { - hasCachedNextOverlappedPage = false; - return cachedTsBlock; - } - throw new IOException("No more batch data"); - } - - private LinkedList<TsFileResource> sortUnSeqFileResources(List<TsFileResource> tsFileResources) { - return tsFileResources.stream() - .sorted(orderUtils.comparingLong(tsFileResource -> orderUtils.getOrderTime(tsFileResource))) - .collect(Collectors.toCollection(LinkedList::new)); - } - /** * unpack all overlapped seq/unseq files and find the first TimeSeriesMetadata *
