This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch iotdb in repository https://gitbox.apache.org/repos/asf/tsfile.git
commit 4467fa07d767cd93d0520b71d02002d9eac42b01 Author: Jackie Tien <[email protected]> AuthorDate: Thu Sep 12 11:51:33 2024 +0800 Add TableChunkReader and TablePageReader to support keep all null rows while scanning --- ...java => AbstractAlignedTimeSeriesMetadata.java} | 43 +-- .../file/metadata/AlignedTimeSeriesMetadata.java | 168 +---------- .../tsfile/file/metadata/TableDeviceMetadata.java | 50 ++++ .../apache/tsfile/read/common/block/TsBlock.java | 12 - ...Reader.java => AbstractAlignedChunkReader.java} | 80 +++--- .../read/reader/chunk/AlignedChunkReader.java | 228 ++------------- .../tsfile/read/reader/chunk/TableChunkReader.java | 93 +++++++ ...eReader.java => AbstractAlignedPageReader.java} | 192 +++---------- .../tsfile/read/reader/page/AlignedPageReader.java | 309 ++------------------- .../tsfile/read/reader/page/TablePageReader.java | 137 +++++++++ .../tsfile/read/reader/page/ValuePageReader.java | 8 + 11 files changed, 438 insertions(+), 882 deletions(-) diff --git a/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/AlignedTimeSeriesMetadata.java b/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/AbstractAlignedTimeSeriesMetadata.java similarity index 84% copy from java/tsfile/src/main/java/org/apache/tsfile/file/metadata/AlignedTimeSeriesMetadata.java copy to java/tsfile/src/main/java/org/apache/tsfile/file/metadata/AbstractAlignedTimeSeriesMetadata.java index 583f08ea..4e812a89 100644 --- a/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/AlignedTimeSeriesMetadata.java +++ b/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/AbstractAlignedTimeSeriesMetadata.java @@ -29,32 +29,21 @@ import java.util.Collections; import java.util.List; import java.util.Optional; -public class AlignedTimeSeriesMetadata implements ITimeSeriesMetadata { +public abstract class AbstractAlignedTimeSeriesMetadata implements ITimeSeriesMetadata { // TimeSeriesMetadata for time column - private final TimeseriesMetadata timeseriesMetadata; + 
protected final TimeseriesMetadata timeseriesMetadata; // TimeSeriesMetadata for all subSensors in the vector - private final List<TimeseriesMetadata> valueTimeseriesMetadataList; + protected final List<TimeseriesMetadata> valueTimeseriesMetadataList; - private IChunkMetadataLoader chunkMetadataLoader; + protected IChunkMetadataLoader chunkMetadataLoader; - public AlignedTimeSeriesMetadata( + AbstractAlignedTimeSeriesMetadata( TimeseriesMetadata timeseriesMetadata, List<TimeseriesMetadata> valueTimeseriesMetadataList) { this.timeseriesMetadata = timeseriesMetadata; this.valueTimeseriesMetadataList = valueTimeseriesMetadataList; } - /** - * If the vector contains only one sub sensor, just return the sub sensor's Statistics Otherwise, - * return the Statistics of the time column. - */ - @Override - public Statistics getStatistics() { - return valueTimeseriesMetadataList.size() == 1 && valueTimeseriesMetadataList.get(0) != null - ? valueTimeseriesMetadataList.get(0).getStatistics() - : timeseriesMetadata.getStatistics(); - } - @Override public Statistics<? extends Serializable> getTimeStatistics() { return timeseriesMetadata.getStatistics(); @@ -80,18 +69,6 @@ public class AlignedTimeSeriesMetadata implements ITimeSeriesMetadata { return valueTimeseriesMetadataList.size(); } - @Override - public boolean timeAllSelected() { - for (int index = 0; index < getMeasurementCount(); index++) { - if (!hasNullValue(index)) { - // When there is any value page point number that is the same as the time page, - // it means that all timestamps in time page will be selected. 
- return true; - } - } - return false; - } - @Override public boolean isModified() { return timeseriesMetadata.isModified(); @@ -176,14 +153,18 @@ public class AlignedTimeSeriesMetadata implements ITimeSeriesMetadata { exits = (exits || v != null); chunkMetadataList.add(v); } - if (exits) { - res.add(new AlignedChunkMetadata(timeChunkMetadata.get(i), chunkMetadataList)); - } + constructAlignedChunkMetadata(res, timeChunkMetadata.get(i), chunkMetadataList, exits); } } return res; } + abstract void constructAlignedChunkMetadata( + List<AlignedChunkMetadata> res, + IChunkMetadata timeChunkMetadata, + List<IChunkMetadata> chunkMetadataList, + boolean exits); + @Override public void setChunkMetadataLoader(IChunkMetadataLoader chunkMetadataLoader) { this.chunkMetadataLoader = chunkMetadataLoader; diff --git a/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/AlignedTimeSeriesMetadata.java b/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/AlignedTimeSeriesMetadata.java index 583f08ea..ba5cbce4 100644 --- a/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/AlignedTimeSeriesMetadata.java +++ b/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/AlignedTimeSeriesMetadata.java @@ -19,29 +19,15 @@ package org.apache.tsfile.file.metadata; -import org.apache.tsfile.enums.TSDataType; import org.apache.tsfile.file.metadata.statistics.Statistics; -import org.apache.tsfile.read.controller.IChunkMetadataLoader; -import java.io.Serializable; -import java.util.ArrayList; -import java.util.Collections; import java.util.List; -import java.util.Optional; -public class AlignedTimeSeriesMetadata implements ITimeSeriesMetadata { - - // TimeSeriesMetadata for time column - private final TimeseriesMetadata timeseriesMetadata; - // TimeSeriesMetadata for all subSensors in the vector - private final List<TimeseriesMetadata> valueTimeseriesMetadataList; - - private IChunkMetadataLoader chunkMetadataLoader; +public class AlignedTimeSeriesMetadata extends 
AbstractAlignedTimeSeriesMetadata { public AlignedTimeSeriesMetadata( TimeseriesMetadata timeseriesMetadata, List<TimeseriesMetadata> valueTimeseriesMetadataList) { - this.timeseriesMetadata = timeseriesMetadata; - this.valueTimeseriesMetadataList = valueTimeseriesMetadataList; + super(timeseriesMetadata, valueTimeseriesMetadataList); } /** @@ -55,31 +41,6 @@ public class AlignedTimeSeriesMetadata implements ITimeSeriesMetadata { : timeseriesMetadata.getStatistics(); } - @Override - public Statistics<? extends Serializable> getTimeStatistics() { - return timeseriesMetadata.getStatistics(); - } - - @Override - public Optional<Statistics<? extends Serializable>> getMeasurementStatistics( - int measurementIndex) { - TimeseriesMetadata metadata = valueTimeseriesMetadataList.get(measurementIndex); - return Optional.ofNullable(metadata == null ? null : metadata.getStatistics()); - } - - @Override - public boolean hasNullValue(int measurementIndex) { - long rowCount = getTimeStatistics().getCount(); - Optional<Statistics<? 
extends Serializable>> statistics = - getMeasurementStatistics(measurementIndex); - return statistics.map(stat -> stat.hasNullValue(rowCount)).orElse(true); - } - - @Override - public int getMeasurementCount() { - return valueTimeseriesMetadataList.size(); - } - @Override public boolean timeAllSelected() { for (int index = 0; index < getMeasurementCount(); index++) { @@ -93,124 +54,13 @@ public class AlignedTimeSeriesMetadata implements ITimeSeriesMetadata { } @Override - public boolean isModified() { - return timeseriesMetadata.isModified(); - } - - @Override - public void setModified(boolean modified) { - timeseriesMetadata.setModified(modified); - for (TimeseriesMetadata subSensor : valueTimeseriesMetadataList) { - if (subSensor != null) { - subSensor.setModified(modified); - } - } - } - - @Override - public boolean isSeq() { - return timeseriesMetadata.isSeq(); - } - - @Override - public void setSeq(boolean seq) { - timeseriesMetadata.setSeq(seq); - for (TimeseriesMetadata subSensor : valueTimeseriesMetadataList) { - if (subSensor != null) { - subSensor.setSeq(seq); - } + void constructAlignedChunkMetadata( + List<AlignedChunkMetadata> res, + IChunkMetadata timeChunkMetadata, + List<IChunkMetadata> chunkMetadataList, + boolean exits) { + if (exits) { + res.add(new AlignedChunkMetadata(timeChunkMetadata, chunkMetadataList)); } } - - /** - * If the chunkMetadataLoader is MemChunkMetadataLoader, the VectorChunkMetadata is already - * assembled while constructing the in-memory TsFileResource, so we just return the assembled - * VectorChunkMetadata list. 
- * - * <p>Otherwise, we need to assemble the ChunkMetadata of time column and the ChunkMetadata of all - * the subSensors to generate the VectorChunkMetadata - */ - @Override - public List<IChunkMetadata> loadChunkMetadataList() { - return chunkMetadataLoader.loadChunkMetadataList(this); - } - - public List<AlignedChunkMetadata> getCopiedChunkMetadataList() { - List<IChunkMetadata> timeChunkMetadata = timeseriesMetadata.getCopiedChunkMetadataList(); - List<List<IChunkMetadata>> valueChunkMetadataList = new ArrayList<>(); - for (TimeseriesMetadata metadata : valueTimeseriesMetadataList) { - valueChunkMetadataList.add(metadata == null ? null : metadata.getCopiedChunkMetadataList()); - } - - return getAlignedChunkMetadata(timeChunkMetadata, valueChunkMetadataList); - } - - public List<AlignedChunkMetadata> getChunkMetadataList() { - List<IChunkMetadata> timeChunkMetadata = timeseriesMetadata.getChunkMetadataList(); - List<List<IChunkMetadata>> valueChunkMetadataList = new ArrayList<>(); - for (TimeseriesMetadata metadata : valueTimeseriesMetadataList) { - valueChunkMetadataList.add(metadata == null ? null : metadata.getChunkMetadataList()); - } - - return getAlignedChunkMetadata(timeChunkMetadata, valueChunkMetadataList); - } - - /** Notice: if all the value chunks is empty chunk, then return empty list. 
*/ - private List<AlignedChunkMetadata> getAlignedChunkMetadata( - List<IChunkMetadata> timeChunkMetadata, List<List<IChunkMetadata>> valueChunkMetadataList) { - List<AlignedChunkMetadata> res = new ArrayList<>(); - for (int i = 0; i < timeChunkMetadata.size(); i++) { - // only need time column - if (valueTimeseriesMetadataList.isEmpty()) { - res.add(new AlignedChunkMetadata(timeChunkMetadata.get(i), Collections.emptyList())); - } else { - List<IChunkMetadata> chunkMetadataList = new ArrayList<>(); - // only at least one sensor exits, we add the AlignedChunkMetadata to the list - boolean exits = false; - for (List<IChunkMetadata> chunkMetadata : valueChunkMetadataList) { - IChunkMetadata v = - chunkMetadata == null - || chunkMetadata.get(i).getStatistics().getCount() == 0 // empty chunk - ? null - : chunkMetadata.get(i); - exits = (exits || v != null); - chunkMetadataList.add(v); - } - if (exits) { - res.add(new AlignedChunkMetadata(timeChunkMetadata.get(i), chunkMetadataList)); - } - } - } - return res; - } - - @Override - public void setChunkMetadataLoader(IChunkMetadataLoader chunkMetadataLoader) { - this.chunkMetadataLoader = chunkMetadataLoader; - } - - @Override - public boolean typeMatch(List<TSDataType> dataTypes) { - if (valueTimeseriesMetadataList != null) { - int notMatchCount = 0; - for (int i = 0, size = dataTypes.size(); i < size; i++) { - TimeseriesMetadata valueTimeSeriesMetadata = valueTimeseriesMetadataList.get(i); - if (valueTimeSeriesMetadata != null - && !valueTimeSeriesMetadata.typeMatch(dataTypes.get(i))) { - valueTimeseriesMetadataList.set(i, null); - notMatchCount++; - } - } - return notMatchCount != dataTypes.size(); - } - return true; - } - - public List<TimeseriesMetadata> getValueTimeseriesMetadataList() { - return valueTimeseriesMetadataList; - } - - public TimeseriesMetadata getTimeseriesMetadata() { - return timeseriesMetadata; - } } diff --git a/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/TableDeviceMetadata.java 
b/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/TableDeviceMetadata.java new file mode 100644 index 00000000..8614a532 --- /dev/null +++ b/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/TableDeviceMetadata.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tsfile.file.metadata; + +import org.apache.tsfile.file.metadata.statistics.Statistics; + +import java.util.List; + +public class TableDeviceMetadata extends AbstractAlignedTimeSeriesMetadata { + + public TableDeviceMetadata( + TimeseriesMetadata timeseriesMetadata, List<TimeseriesMetadata> valueTimeseriesMetadataList) { + super(timeseriesMetadata, valueTimeseriesMetadataList); + } + + /** + * Always return the Statistics of the time column, regardless of how many sub sensors the + * vector contains.
+ */ + @Override + public Statistics getStatistics() { + return timeseriesMetadata.getStatistics(); + } + + @Override + void constructAlignedChunkMetadata( + List<AlignedChunkMetadata> res, + IChunkMetadata timeChunkMetadata, + List<IChunkMetadata> chunkMetadataList, + boolean exits) { + res.add(new AlignedChunkMetadata(timeChunkMetadata, chunkMetadataList)); + } +} diff --git a/java/tsfile/src/main/java/org/apache/tsfile/read/common/block/TsBlock.java b/java/tsfile/src/main/java/org/apache/tsfile/read/common/block/TsBlock.java index c607e906..1cbd10ed 100644 --- a/java/tsfile/src/main/java/org/apache/tsfile/read/common/block/TsBlock.java +++ b/java/tsfile/src/main/java/org/apache/tsfile/read/common/block/TsBlock.java @@ -455,9 +455,6 @@ public class TsBlock { @Override public boolean hasNextTimeValuePair() { - while (hasNext() && isCurrentValueAllNull()) { - next(); - } return hasNext(); } @@ -499,15 +496,6 @@ public class TsBlock { public void setRowIndex(int rowIndex) { this.rowIndex = rowIndex; } - - private boolean isCurrentValueAllNull() { - for (Column valueColumn : valueColumns) { - if (!valueColumn.isNull(rowIndex)) { - return false; - } - } - return true; - } } private long updateRetainedSize() { diff --git a/java/tsfile/src/main/java/org/apache/tsfile/read/reader/chunk/AlignedChunkReader.java b/java/tsfile/src/main/java/org/apache/tsfile/read/reader/chunk/AbstractAlignedChunkReader.java similarity index 84% copy from java/tsfile/src/main/java/org/apache/tsfile/read/reader/chunk/AlignedChunkReader.java copy to java/tsfile/src/main/java/org/apache/tsfile/read/reader/chunk/AbstractAlignedChunkReader.java index 865fdd21..165367e2 100644 --- a/java/tsfile/src/main/java/org/apache/tsfile/read/reader/chunk/AlignedChunkReader.java +++ b/java/tsfile/src/main/java/org/apache/tsfile/read/reader/chunk/AbstractAlignedChunkReader.java @@ -30,7 +30,7 @@ import org.apache.tsfile.file.metadata.statistics.Statistics; import org.apache.tsfile.read.common.Chunk; import 
org.apache.tsfile.read.common.TimeRange; import org.apache.tsfile.read.filter.basic.Filter; -import org.apache.tsfile.read.reader.page.AlignedPageReader; +import org.apache.tsfile.read.reader.page.AbstractAlignedPageReader; import org.apache.tsfile.read.reader.page.LazyLoadPageData; import java.io.IOException; @@ -39,8 +39,7 @@ import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; -public class AlignedChunkReader extends AbstractChunkReader { - +public abstract class AbstractAlignedChunkReader extends AbstractChunkReader { // chunk header of the time column private final ChunkHeader timeChunkHeader; // chunk data of the time column @@ -56,7 +55,7 @@ public class AlignedChunkReader extends AbstractChunkReader { private final IDecryptor decrytor; @SuppressWarnings("unchecked") - public AlignedChunkReader( + AbstractAlignedChunkReader( Chunk timeChunk, List<Chunk> valueChunkList, long readStopTime, Filter queryFilter) throws IOException { super(readStopTime, queryFilter); @@ -76,24 +75,6 @@ public class AlignedChunkReader extends AbstractChunkReader { initAllPageReaders(timeChunk.getChunkStatistic(), valueChunkStatisticsList); } - public AlignedChunkReader(Chunk timeChunk, List<Chunk> valueChunkList) throws IOException { - this(timeChunk, valueChunkList, Long.MIN_VALUE, null); - } - - public AlignedChunkReader(Chunk timeChunk, List<Chunk> valueChunkList, Filter queryFilter) - throws IOException { - this(timeChunk, valueChunkList, Long.MIN_VALUE, queryFilter); - } - - /** - * Constructor of ChunkReader by timestamp. This constructor is used to accelerate queries by - * filtering out pages whose endTime is less than current timestamp. - */ - public AlignedChunkReader(Chunk timeChunk, List<Chunk> valueChunkList, long readStopTime) - throws IOException { - this(timeChunk, valueChunkList, readStopTime, null); - } - /** construct all the page readers in this chunk */ private void initAllPageReaders( Statistics<? 
extends Serializable> timeChunkStatistics, @@ -102,7 +83,7 @@ public class AlignedChunkReader extends AbstractChunkReader { // construct next satisfied page header while (timeChunkDataBuffer.remaining() > 0) { // deserialize PageHeader from chunkDataBuffer - AlignedPageReader alignedPageReader = + AbstractAlignedPageReader alignedPageReader = isSinglePageChunk() ? deserializeFromSinglePageChunk(timeChunkStatistics, valueChunkStatisticsList) : deserializeFromMultiPageChunk(); @@ -116,7 +97,7 @@ public class AlignedChunkReader extends AbstractChunkReader { return (timeChunkHeader.getChunkType() & 0x3F) == MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER; } - private AlignedPageReader deserializeFromSinglePageChunk( + private AbstractAlignedPageReader deserializeFromSinglePageChunk( Statistics<? extends Serializable> timeChunkStatistics, List<Statistics<? extends Serializable>> valueChunkStatisticsList) throws IOException { @@ -136,7 +117,7 @@ public class AlignedChunkReader extends AbstractChunkReader { } } - if (isAllNull || isEarlierThanReadStopTime(timePageHeader)) { + if (needSkipForSinglePageChunk(isAllNull, timePageHeader)) { // when there is only one page in the chunk, the page statistic is the same as the chunk, so // we needn't filter the page again skipCurrentPage(timePageHeader, valuePageHeaderList); @@ -145,7 +126,9 @@ public class AlignedChunkReader extends AbstractChunkReader { return constructAlignedPageReader(timePageHeader, valuePageHeaderList); } - private AlignedPageReader deserializeFromMultiPageChunk() throws IOException { + abstract boolean needSkipForSinglePageChunk(boolean isAllNull, PageHeader timePageHeader); + + private AbstractAlignedPageReader deserializeFromMultiPageChunk() throws IOException { PageHeader timePageHeader = PageHeader.deserializeFrom(timeChunkDataBuffer, timeChunkHeader.getDataType()); List<PageHeader> valuePageHeaderList = new ArrayList<>(); @@ -162,18 +145,20 @@ public class AlignedChunkReader extends AbstractChunkReader { } } - if 
(isAllNull || isEarlierThanReadStopTime(timePageHeader) || pageCanSkip(timePageHeader)) { + if (needSkipForMultiPageChunk(isAllNull, timePageHeader)) { skipCurrentPage(timePageHeader, valuePageHeaderList); return null; } return constructAlignedPageReader(timePageHeader, valuePageHeaderList); } + abstract boolean needSkipForMultiPageChunk(boolean isAllNull, PageHeader timePageHeader); + protected boolean isEarlierThanReadStopTime(final PageHeader timePageHeader) { return timePageHeader.getEndTime() < readStopTime; } - private boolean pageCanSkip(PageHeader pageHeader) { + protected boolean pageCanSkip(PageHeader pageHeader) { return queryFilter != null && !queryFilter.satisfyStartEndTime(pageHeader.getStartTime(), pageHeader.getEndTime()); } @@ -192,7 +177,7 @@ public class AlignedChunkReader extends AbstractChunkReader { } } - private AlignedPageReader constructAlignedPageReader( + private AbstractAlignedPageReader constructAlignedPageReader( PageHeader timePageHeader, List<PageHeader> rawValuePageHeaderList) throws IOException { ByteBuffer timePageData = ChunkReader.deserializePageData( @@ -245,23 +230,34 @@ public class AlignedChunkReader extends AbstractChunkReader { isAllNull = false; } } - if (isAllNull) { + if (canSkip(isAllNull, timePageHeader)) { return null; } - AlignedPageReader alignedPageReader = - new AlignedPageReader( - timePageHeader, - timePageData, - defaultTimeDecoder, - valuePageHeaderList, - lazyLoadPageDataArray, - valueDataTypeList, - valueDecoderList, - queryFilter); - alignedPageReader.setDeleteIntervalList(valueDeleteIntervalsList); - return alignedPageReader; + return constructPageReader( + timePageHeader, + timePageData, + defaultTimeDecoder, + valuePageHeaderList, + lazyLoadPageDataArray, + valueDataTypeList, + valueDecoderList, + queryFilter, + valueDeleteIntervalsList); } + abstract boolean canSkip(boolean isAllNull, PageHeader timePageHeader); + + abstract AbstractAlignedPageReader constructPageReader( + PageHeader timePageHeader, + 
ByteBuffer timePageData, + Decoder timeDecoder, + List<PageHeader> valuePageHeaderList, + LazyLoadPageData[] lazyLoadPageDataArray, + List<TSDataType> valueDataTypeList, + List<Decoder> valueDecoderList, + Filter queryFilter, + List<List<TimeRange>> valueDeleteIntervalsList); + protected boolean pageDeleted(PageHeader pageHeader, List<TimeRange> deleteIntervals) { if (pageHeader.getEndTime() < readStopTime) { return true; diff --git a/java/tsfile/src/main/java/org/apache/tsfile/read/reader/chunk/AlignedChunkReader.java b/java/tsfile/src/main/java/org/apache/tsfile/read/reader/chunk/AlignedChunkReader.java index 865fdd21..e7879734 100644 --- a/java/tsfile/src/main/java/org/apache/tsfile/read/reader/chunk/AlignedChunkReader.java +++ b/java/tsfile/src/main/java/org/apache/tsfile/read/reader/chunk/AlignedChunkReader.java @@ -19,61 +19,26 @@ package org.apache.tsfile.read.reader.chunk; -import org.apache.tsfile.compress.IUnCompressor; import org.apache.tsfile.encoding.decoder.Decoder; -import org.apache.tsfile.encrypt.IDecryptor; import org.apache.tsfile.enums.TSDataType; -import org.apache.tsfile.file.MetaMarker; -import org.apache.tsfile.file.header.ChunkHeader; import org.apache.tsfile.file.header.PageHeader; -import org.apache.tsfile.file.metadata.statistics.Statistics; import org.apache.tsfile.read.common.Chunk; import org.apache.tsfile.read.common.TimeRange; import org.apache.tsfile.read.filter.basic.Filter; +import org.apache.tsfile.read.reader.page.AbstractAlignedPageReader; import org.apache.tsfile.read.reader.page.AlignedPageReader; import org.apache.tsfile.read.reader.page.LazyLoadPageData; import java.io.IOException; -import java.io.Serializable; import java.nio.ByteBuffer; -import java.util.ArrayList; import java.util.List; -public class AlignedChunkReader extends AbstractChunkReader { +public class AlignedChunkReader extends AbstractAlignedChunkReader { - // chunk header of the time column - private final ChunkHeader timeChunkHeader; - // chunk data of the 
time column - private final ByteBuffer timeChunkDataBuffer; - - // chunk headers of all the sub sensors - private final List<ChunkHeader> valueChunkHeaderList = new ArrayList<>(); - // chunk data of all the sub sensors - private final List<ByteBuffer> valueChunkDataBufferList = new ArrayList<>(); - // deleted intervals of all the sub sensors - private final List<List<TimeRange>> valueDeleteIntervalsList = new ArrayList<>(); - - private final IDecryptor decrytor; - - @SuppressWarnings("unchecked") public AlignedChunkReader( Chunk timeChunk, List<Chunk> valueChunkList, long readStopTime, Filter queryFilter) throws IOException { - super(readStopTime, queryFilter); - this.timeChunkHeader = timeChunk.getHeader(); - this.timeChunkDataBuffer = timeChunk.getData(); - - List<Statistics<? extends Serializable>> valueChunkStatisticsList = new ArrayList<>(); - valueChunkList.forEach( - chunk -> { - this.valueChunkHeaderList.add(chunk == null ? null : chunk.getHeader()); - this.valueChunkDataBufferList.add(chunk == null ? null : chunk.getData()); - this.valueDeleteIntervalsList.add(chunk == null ? null : chunk.getDeleteIntervalList()); - - valueChunkStatisticsList.add(chunk == null ? null : chunk.getChunkStatistic()); - }); - this.decrytor = timeChunk.getDecryptor(); - initAllPageReaders(timeChunk.getChunkStatistic(), valueChunkStatisticsList); + super(timeChunk, valueChunkList, readStopTime, queryFilter); } public AlignedChunkReader(Chunk timeChunk, List<Chunk> valueChunkList) throws IOException { @@ -94,165 +59,37 @@ public class AlignedChunkReader extends AbstractChunkReader { this(timeChunk, valueChunkList, readStopTime, null); } - /** construct all the page readers in this chunk */ - private void initAllPageReaders( - Statistics<? extends Serializable> timeChunkStatistics, - List<Statistics<? 
extends Serializable>> valueChunkStatisticsList) - throws IOException { - // construct next satisfied page header - while (timeChunkDataBuffer.remaining() > 0) { - // deserialize PageHeader from chunkDataBuffer - AlignedPageReader alignedPageReader = - isSinglePageChunk() - ? deserializeFromSinglePageChunk(timeChunkStatistics, valueChunkStatisticsList) - : deserializeFromMultiPageChunk(); - if (alignedPageReader != null) { - pageReaderList.add(alignedPageReader); - } - } + @Override + boolean needSkipForSinglePageChunk(boolean isAllNull, PageHeader timePageHeader) { + return isAllNull || isEarlierThanReadStopTime(timePageHeader); } - private boolean isSinglePageChunk() { - return (timeChunkHeader.getChunkType() & 0x3F) == MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER; + @Override + boolean needSkipForMultiPageChunk(boolean isAllNull, PageHeader timePageHeader) { + return isAllNull || isEarlierThanReadStopTime(timePageHeader) || pageCanSkip(timePageHeader); } - private AlignedPageReader deserializeFromSinglePageChunk( - Statistics<? extends Serializable> timeChunkStatistics, - List<Statistics<? 
extends Serializable>> valueChunkStatisticsList) - throws IOException { - PageHeader timePageHeader = - PageHeader.deserializeFrom(timeChunkDataBuffer, timeChunkStatistics); - List<PageHeader> valuePageHeaderList = new ArrayList<>(); - - boolean isAllNull = true; - for (int i = 0; i < valueChunkDataBufferList.size(); i++) { - if (valueChunkDataBufferList.get(i) != null) { - isAllNull = false; - valuePageHeaderList.add( - PageHeader.deserializeFrom( - valueChunkDataBufferList.get(i), valueChunkStatisticsList.get(i))); - } else { - valuePageHeaderList.add(null); - } - } - - if (isAllNull || isEarlierThanReadStopTime(timePageHeader)) { - // when there is only one page in the chunk, the page statistic is the same as the chunk, so - // we needn't filter the page again - skipCurrentPage(timePageHeader, valuePageHeaderList); - return null; - } - return constructAlignedPageReader(timePageHeader, valuePageHeaderList); - } - - private AlignedPageReader deserializeFromMultiPageChunk() throws IOException { - PageHeader timePageHeader = - PageHeader.deserializeFrom(timeChunkDataBuffer, timeChunkHeader.getDataType()); - List<PageHeader> valuePageHeaderList = new ArrayList<>(); - - boolean isAllNull = true; - for (int i = 0; i < valueChunkDataBufferList.size(); i++) { - if (valueChunkDataBufferList.get(i) != null) { - isAllNull = false; - valuePageHeaderList.add( - PageHeader.deserializeFrom( - valueChunkDataBufferList.get(i), valueChunkHeaderList.get(i).getDataType())); - } else { - valuePageHeaderList.add(null); - } - } - - if (isAllNull || isEarlierThanReadStopTime(timePageHeader) || pageCanSkip(timePageHeader)) { - skipCurrentPage(timePageHeader, valuePageHeaderList); - return null; - } - return constructAlignedPageReader(timePageHeader, valuePageHeaderList); + @Override + boolean canSkip(boolean isAllNull, PageHeader timePageHeader) { + return isAllNull; } - protected boolean isEarlierThanReadStopTime(final PageHeader timePageHeader) { - return timePageHeader.getEndTime() < 
readStopTime; - } - - private boolean pageCanSkip(PageHeader pageHeader) { - return queryFilter != null - && !queryFilter.satisfyStartEndTime(pageHeader.getStartTime(), pageHeader.getEndTime()); - } - - private void skipCurrentPage(PageHeader timePageHeader, List<PageHeader> valuePageHeader) { - timeChunkDataBuffer.position( - timeChunkDataBuffer.position() + timePageHeader.getCompressedSize()); - for (int i = 0; i < valuePageHeader.size(); i++) { - if (valuePageHeader.get(i) != null) { - valueChunkDataBufferList - .get(i) - .position( - valueChunkDataBufferList.get(i).position() - + valuePageHeader.get(i).getCompressedSize()); - } - } - } - - private AlignedPageReader constructAlignedPageReader( - PageHeader timePageHeader, List<PageHeader> rawValuePageHeaderList) throws IOException { - ByteBuffer timePageData = - ChunkReader.deserializePageData( - timePageHeader, timeChunkDataBuffer, timeChunkHeader, decrytor); - - List<PageHeader> valuePageHeaderList = new ArrayList<>(); - LazyLoadPageData[] lazyLoadPageDataArray = new LazyLoadPageData[rawValuePageHeaderList.size()]; - List<TSDataType> valueDataTypeList = new ArrayList<>(); - List<Decoder> valueDecoderList = new ArrayList<>(); - - boolean isAllNull = true; - for (int i = 0; i < rawValuePageHeaderList.size(); i++) { - PageHeader valuePageHeader = rawValuePageHeaderList.get(i); - - if (valuePageHeader == null || valuePageHeader.getUncompressedSize() == 0) { - // Empty Page - valuePageHeaderList.add(null); - lazyLoadPageDataArray[i] = null; - valueDataTypeList.add(null); - valueDecoderList.add(null); - } else if (pageDeleted(valuePageHeader, valueDeleteIntervalsList.get(i))) { - valueChunkDataBufferList - .get(i) - .position( - valueChunkDataBufferList.get(i).position() + valuePageHeader.getCompressedSize()); - valuePageHeaderList.add(null); - lazyLoadPageDataArray[i] = null; - valueDataTypeList.add(null); - valueDecoderList.add(null); - } else { - ChunkHeader valueChunkHeader = valueChunkHeaderList.get(i); - int 
currentPagePosition = valueChunkDataBufferList.get(i).position(); - // adjust position as if we have read the page data even if it is just lazy-loaded - valueChunkDataBufferList - .get(i) - .position( - valueChunkDataBufferList.get(i).position() + valuePageHeader.getCompressedSize()); - - valuePageHeaderList.add(valuePageHeader); - lazyLoadPageDataArray[i] = - new LazyLoadPageData( - valueChunkDataBufferList.get(i).array(), - currentPagePosition, - IUnCompressor.getUnCompressor(valueChunkHeader.getCompressionType()), - decrytor); - valueDataTypeList.add(valueChunkHeader.getDataType()); - valueDecoderList.add( - Decoder.getDecoderByType( - valueChunkHeader.getEncodingType(), valueChunkHeader.getDataType())); - isAllNull = false; - } - } - if (isAllNull) { - return null; - } + @Override + AbstractAlignedPageReader constructPageReader( + PageHeader timePageHeader, + ByteBuffer timePageData, + Decoder timeDecoder, + List<PageHeader> valuePageHeaderList, + LazyLoadPageData[] lazyLoadPageDataArray, + List<TSDataType> valueDataTypeList, + List<Decoder> valueDecoderList, + Filter queryFilter, + List<List<TimeRange>> valueDeleteIntervalsList) { AlignedPageReader alignedPageReader = new AlignedPageReader( timePageHeader, timePageData, - defaultTimeDecoder, + timeDecoder, valuePageHeaderList, lazyLoadPageDataArray, valueDataTypeList, @@ -261,21 +98,4 @@ public class AlignedChunkReader extends AbstractChunkReader { alignedPageReader.setDeleteIntervalList(valueDeleteIntervalsList); return alignedPageReader; } - - protected boolean pageDeleted(PageHeader pageHeader, List<TimeRange> deleteIntervals) { - if (pageHeader.getEndTime() < readStopTime) { - return true; - } - if (deleteIntervals != null) { - for (TimeRange range : deleteIntervals) { - if (range.contains(pageHeader.getStartTime(), pageHeader.getEndTime())) { - return true; - } - if (range.overlaps(new TimeRange(pageHeader.getStartTime(), pageHeader.getEndTime()))) { - pageHeader.setModified(true); - } - } - } - return 
false; - } } diff --git a/java/tsfile/src/main/java/org/apache/tsfile/read/reader/chunk/TableChunkReader.java b/java/tsfile/src/main/java/org/apache/tsfile/read/reader/chunk/TableChunkReader.java new file mode 100644 index 00000000..01a39810 --- /dev/null +++ b/java/tsfile/src/main/java/org/apache/tsfile/read/reader/chunk/TableChunkReader.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.tsfile.read.reader.chunk; + +import org.apache.tsfile.encoding.decoder.Decoder; +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.file.header.PageHeader; +import org.apache.tsfile.read.common.Chunk; +import org.apache.tsfile.read.common.TimeRange; +import org.apache.tsfile.read.filter.basic.Filter; +import org.apache.tsfile.read.reader.page.AbstractAlignedPageReader; +import org.apache.tsfile.read.reader.page.LazyLoadPageData; +import org.apache.tsfile.read.reader.page.TablePageReader; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.List; + +// difference with AlignedChunkReader is that TableChunkReader works for TableScan and keep all null +// rows +public class TableChunkReader extends AbstractAlignedChunkReader { + + private final List<TimeRange> timeDeleteIntervalsList; + + public TableChunkReader( + Chunk timeChunk, List<Chunk> valueChunkList, long readStopTime, Filter queryFilter) + throws IOException { + super(timeChunk, valueChunkList, readStopTime, queryFilter); + timeDeleteIntervalsList = timeChunk.getDeleteIntervalList(); + } + + public TableChunkReader(Chunk timeChunk, List<Chunk> valueChunkList, Filter queryFilter) + throws IOException { + this(timeChunk, valueChunkList, Long.MIN_VALUE, queryFilter); + } + + @Override + boolean needSkipForSinglePageChunk(boolean isAllNull, PageHeader timePageHeader) { + return isEarlierThanReadStopTime(timePageHeader); + } + + @Override + boolean needSkipForMultiPageChunk(boolean isAllNull, PageHeader timePageHeader) { + return isEarlierThanReadStopTime(timePageHeader) || pageCanSkip(timePageHeader); + } + + @Override + boolean canSkip(boolean isAllNull, PageHeader timePageHeader) { + return pageDeleted(timePageHeader, timeDeleteIntervalsList); + } + + @Override + AbstractAlignedPageReader constructPageReader( + PageHeader timePageHeader, + ByteBuffer timePageData, + Decoder timeDecoder, + List<PageHeader> valuePageHeaderList, + 
LazyLoadPageData[] lazyLoadPageDataArray, + List<TSDataType> valueDataTypeList, + List<Decoder> valueDecoderList, + Filter queryFilter, + List<List<TimeRange>> valueDeleteIntervalsList) { + TablePageReader alignedPageReader = + new TablePageReader( + timePageHeader, + timePageData, + timeDecoder, + valuePageHeaderList, + lazyLoadPageDataArray, + valueDataTypeList, + valueDecoderList, + queryFilter); + alignedPageReader.setDeleteIntervalList(timeDeleteIntervalsList, valueDeleteIntervalsList); + return alignedPageReader; + } +} diff --git a/java/tsfile/src/main/java/org/apache/tsfile/read/reader/page/AlignedPageReader.java b/java/tsfile/src/main/java/org/apache/tsfile/read/reader/page/AbstractAlignedPageReader.java similarity index 68% copy from java/tsfile/src/main/java/org/apache/tsfile/read/reader/page/AlignedPageReader.java copy to java/tsfile/src/main/java/org/apache/tsfile/read/reader/page/AbstractAlignedPageReader.java index 5307ed35..986c487a 100644 --- a/java/tsfile/src/main/java/org/apache/tsfile/read/reader/page/AlignedPageReader.java +++ b/java/tsfile/src/main/java/org/apache/tsfile/read/reader/page/AbstractAlignedPageReader.java @@ -25,13 +25,11 @@ import org.apache.tsfile.file.header.PageHeader; import org.apache.tsfile.file.metadata.statistics.Statistics; import org.apache.tsfile.read.common.BatchData; import org.apache.tsfile.read.common.BatchDataFactory; -import org.apache.tsfile.read.common.TimeRange; import org.apache.tsfile.read.common.block.TsBlock; import org.apache.tsfile.read.common.block.TsBlockBuilder; import org.apache.tsfile.read.common.block.TsBlockUtil; import org.apache.tsfile.read.filter.basic.Filter; import org.apache.tsfile.read.reader.IPageReader; -import org.apache.tsfile.read.reader.IPointReader; import org.apache.tsfile.read.reader.series.PaginationController; import org.apache.tsfile.utils.TsPrimitiveType; @@ -41,28 +39,27 @@ import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; import java.util.List; 
-import java.util.Objects; import java.util.Optional; import static org.apache.tsfile.read.reader.series.PaginationController.UNLIMITED_PAGINATION_CONTROLLER; -public class AlignedPageReader implements IPageReader { +public abstract class AbstractAlignedPageReader implements IPageReader { - private final TimePageReader timePageReader; - private final List<ValuePageReader> valuePageReaderList; - private final int valueCount; + protected final TimePageReader timePageReader; + protected final List<ValuePageReader> valuePageReaderList; + protected final int valueCount; - private final Filter globalTimeFilter; - private Filter pushDownFilter; - private PaginationController paginationController = UNLIMITED_PAGINATION_CONTROLLER; + protected final Filter globalTimeFilter; + protected Filter pushDownFilter; + protected PaginationController paginationController = UNLIMITED_PAGINATION_CONTROLLER; - private boolean isModified; - private TsBlockBuilder builder; + protected boolean isModified; + protected TsBlockBuilder builder; - private static final int MASK = 0x80; + protected static final int MASK = 0x80; @SuppressWarnings("squid:S107") - public AlignedPageReader( + AbstractAlignedPageReader( PageHeader timePageHeader, ByteBuffer timePageData, Decoder timeDecoder, @@ -93,7 +90,7 @@ public class AlignedPageReader implements IPageReader { } @SuppressWarnings("squid:S107") - public AlignedPageReader( + AbstractAlignedPageReader( PageHeader timePageHeader, ByteBuffer timePageData, Decoder timeDecoder, @@ -152,51 +149,32 @@ public class AlignedPageReader implements IPageReader { } } - if (hasNotNullValues && satisfyRecordFilter(timestamp, rowValues)) { + if (keepCurrentRow(hasNotNullValues, timestamp, rowValues)) { pageData.putVector(timestamp, v); } } return pageData.flip(); } - private boolean satisfyRecordFilter(long timestamp, Object[] rowValues) { + abstract boolean keepCurrentRow(boolean hasNotNullValues, long timestamp, Object[] rowValues); + + protected boolean 
satisfyRecordFilter(long timestamp, Object[] rowValues) { return (globalTimeFilter == null || globalTimeFilter.satisfyRow(timestamp, rowValues)) && (pushDownFilter == null || pushDownFilter.satisfyRow(timestamp, rowValues)); } - @Override - public boolean timeAllSelected() { - for (int index = 0; index < getMeasurementCount(); index++) { - if (!hasNullValue(index)) { - // When there is any value page point number that is the same as the time page, - // it means that all timestamps in time page will be selected. - return true; - } - } - return false; - } - @Override public int getMeasurementCount() { return valueCount; } - public IPointReader getLazyPointReader() throws IOException { - return new LazyLoadAlignedPagePointReader(timePageReader, valuePageReaderList); - } - - private boolean allPageDataSatisfy() { - return !isModified - && timeAllSelected() - && globalTimeFilterAllSatisfy() - && pushDownFilterAllSatisfy(); - } + abstract boolean allPageDataSatisfy(); - private boolean globalTimeFilterAllSatisfy() { + boolean globalTimeFilterAllSatisfy() { return globalTimeFilter == null || globalTimeFilter.allSatisfy(this); } - private boolean pushDownFilterAllSatisfy() { + boolean pushDownFilterAllSatisfy() { return pushDownFilter == null || pushDownFilter.allSatisfy(this); } @@ -209,7 +187,6 @@ public class AlignedPageReader implements IPageReader { return builder.build(); } - // if all the sub sensors' value are null in current row, just discard it // if !filter.satisfy, discard this row boolean[] keepCurrentRow = new boolean[timeBatch.length]; boolean globalTimeFilterAllSatisfy = globalTimeFilterAllSatisfy(); @@ -219,26 +196,13 @@ public class AlignedPageReader implements IPageReader { updateKeepCurrentRowThroughGlobalTimeFilter(keepCurrentRow, timeBatch); } - boolean[][] isDeleted = null; - if ((isModified || !timeAllSelected()) && valueCount != 0) { - // using bitMap in valuePageReaders to indicate whether columns of current row are all null. 
- byte[] bitmask = new byte[(timeBatch.length - 1) / 8 + 1]; - Arrays.fill(bitmask, (byte) 0x00); - isDeleted = new boolean[valueCount][timeBatch.length]; - - fillIsDeletedAndBitMask(timeBatch, isDeleted, bitmask); - - updateKeepCurrentRowThroughBitmask(keepCurrentRow, bitmask); + if (timePageReader.isModified()) { + updateKeepCurrentRowThroughDeletion(keepCurrentRow, timeBatch); } boolean pushDownFilterAllSatisfy = pushDownFilterAllSatisfy(); - // construct time column - // when pushDownFilterAllSatisfy = true, we can skip rows by OFFSET & LIMIT - int readEndIndex = buildTimeColumn(timeBatch, keepCurrentRow, pushDownFilterAllSatisfy); - - // construct value columns - buildValueColumns(readEndIndex, keepCurrentRow, isDeleted); + constructResult(keepCurrentRow, timeBatch, pushDownFilterAllSatisfy); TsBlock unFilteredBlock = builder.build(); if (pushDownFilterAllSatisfy) { @@ -288,7 +252,26 @@ public class AlignedPageReader implements IPageReader { } } - private int buildTimeColumn( + abstract void constructResult( + boolean[] keepCurrentRow, long[] timeBatch, boolean pushDownFilterAllSatisfy) + throws IOException; + + private void updateKeepCurrentRowThroughGlobalTimeFilter( + boolean[] keepCurrentRow, long[] timeBatch) { + for (int i = 0, n = timeBatch.length; i < n; i++) { + keepCurrentRow[i] = globalTimeFilter.satisfy(timeBatch[i], null); + } + } + + private void updateKeepCurrentRowThroughDeletion(boolean[] keepCurrentRow, long[] timeBatch) { + for (int i = 0, n = timeBatch.length; i < n; i++) { + if (keepCurrentRow[i]) { + keepCurrentRow[i] = !timePageReader.isDeleted(timeBatch[i]); + } + } + } + + protected int buildTimeColumn( long[] timeBatch, boolean[] keepCurrentRow, boolean pushDownFilterAllSatisfy) { if (pushDownFilterAllSatisfy) { return buildTimeColumnWithPagination(timeBatch, keepCurrentRow); @@ -329,97 +312,6 @@ public class AlignedPageReader implements IPageReader { return readEndIndex + 1; } - private void buildValueColumns(int readEndIndex, 
boolean[] keepCurrentRow, boolean[][] isDeleted) - throws IOException { - for (int i = 0; i < valueCount; i++) { - ValuePageReader pageReader = valuePageReaderList.get(i); - if (pageReader != null) { - if (pageReader.isModified()) { - pageReader.writeColumnBuilderWithNextBatch( - readEndIndex, - builder.getColumnBuilder(i), - keepCurrentRow, - Objects.requireNonNull(isDeleted)[i]); - } else { - pageReader.writeColumnBuilderWithNextBatch( - readEndIndex, builder.getColumnBuilder(i), keepCurrentRow); - } - } else { - for (int j = 0; j < readEndIndex; j++) { - if (keepCurrentRow[j]) { - builder.getColumnBuilder(i).appendNull(); - } - } - } - } - } - - private void fillIsDeletedAndBitMask(long[] timeBatch, boolean[][] isDeleted, byte[] bitmask) - throws IOException { - for (int columnIndex = 0; columnIndex < valueCount; columnIndex++) { - ValuePageReader pageReader = valuePageReaderList.get(columnIndex); - if (pageReader != null) { - byte[] bitmap = pageReader.getBitmap(); - - if (pageReader.isModified()) { - pageReader.fillIsDeleted(timeBatch, isDeleted[columnIndex]); - updateBitmapThroughIsDeleted(bitmap, isDeleted[columnIndex]); - } - - for (int i = 0, n = bitmask.length; i < n; i++) { - bitmask[i] = (byte) (bitmap[i] | bitmask[i]); - } - } - } - } - - private void updateBitmapThroughIsDeleted(byte[] bitmap, boolean[] isDeleted) { - for (int i = 0, n = isDeleted.length; i < n; i++) { - if (isDeleted[i]) { - int shift = i % 8; - bitmap[i / 8] = (byte) (bitmap[i / 8] & (~(MASK >>> shift))); - } - } - } - - private void updateKeepCurrentRowThroughGlobalTimeFilter( - boolean[] keepCurrentRow, long[] timeBatch) { - for (int i = 0, n = timeBatch.length; i < n; i++) { - keepCurrentRow[i] = globalTimeFilter.satisfy(timeBatch[i], null); - } - } - - private void updateKeepCurrentRowThroughBitmask(boolean[] keepCurrentRow, byte[] bitmask) { - for (int i = 0, n = bitmask.length; i < n; i++) { - if (bitmask[i] == (byte) 0xFF) { - // 8 rows are not all null, do nothing - } else 
if (bitmask[i] == (byte) 0x00) { - Arrays.fill(keepCurrentRow, i * 8, Math.min(i * 8 + 8, keepCurrentRow.length), false); - } else { - for (int j = 0; j < 8 && (i * 8 + j < keepCurrentRow.length); j++) { - if (((bitmask[i] & 0xFF) & (MASK >>> j)) == 0) { - keepCurrentRow[i * 8 + j] = false; - } - } - } - } - } - - public void setDeleteIntervalList(List<List<TimeRange>> list) { - for (int i = 0; i < valueCount; i++) { - if (valuePageReaderList.get(i) != null) { - valuePageReaderList.get(i).setDeleteIntervalList(list.get(i)); - } - } - } - - @Override - public Statistics<? extends Serializable> getStatistics() { - return valuePageReaderList.size() == 1 && valuePageReaderList.get(0) != null - ? valuePageReaderList.get(0).getStatistics() - : timePageReader.getStatistics(); - } - @Override public Statistics<? extends Serializable> getTimeStatistics() { return timePageReader.getStatistics(); diff --git a/java/tsfile/src/main/java/org/apache/tsfile/read/reader/page/AlignedPageReader.java b/java/tsfile/src/main/java/org/apache/tsfile/read/reader/page/AlignedPageReader.java index 5307ed35..f180152c 100644 --- a/java/tsfile/src/main/java/org/apache/tsfile/read/reader/page/AlignedPageReader.java +++ b/java/tsfile/src/main/java/org/apache/tsfile/read/reader/page/AlignedPageReader.java @@ -23,43 +23,18 @@ import org.apache.tsfile.encoding.decoder.Decoder; import org.apache.tsfile.enums.TSDataType; import org.apache.tsfile.file.header.PageHeader; import org.apache.tsfile.file.metadata.statistics.Statistics; -import org.apache.tsfile.read.common.BatchData; -import org.apache.tsfile.read.common.BatchDataFactory; import org.apache.tsfile.read.common.TimeRange; -import org.apache.tsfile.read.common.block.TsBlock; -import org.apache.tsfile.read.common.block.TsBlockBuilder; -import org.apache.tsfile.read.common.block.TsBlockUtil; import org.apache.tsfile.read.filter.basic.Filter; -import org.apache.tsfile.read.reader.IPageReader; import org.apache.tsfile.read.reader.IPointReader; 
-import org.apache.tsfile.read.reader.series.PaginationController; -import org.apache.tsfile.utils.TsPrimitiveType; import java.io.IOException; import java.io.Serializable; import java.nio.ByteBuffer; -import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.Objects; -import java.util.Optional; -import static org.apache.tsfile.read.reader.series.PaginationController.UNLIMITED_PAGINATION_CONTROLLER; - -public class AlignedPageReader implements IPageReader { - - private final TimePageReader timePageReader; - private final List<ValuePageReader> valuePageReaderList; - private final int valueCount; - - private final Filter globalTimeFilter; - private Filter pushDownFilter; - private PaginationController paginationController = UNLIMITED_PAGINATION_CONTROLLER; - - private boolean isModified; - private TsBlockBuilder builder; - - private static final int MASK = 0x80; +public class AlignedPageReader extends AbstractAlignedPageReader { @SuppressWarnings("squid:S107") public AlignedPageReader( @@ -71,25 +46,15 @@ public class AlignedPageReader implements IPageReader { List<TSDataType> valueDataTypeList, List<Decoder> valueDecoderList, Filter globalTimeFilter) { - timePageReader = new TimePageReader(timePageHeader, timePageData, timeDecoder); - isModified = timePageReader.isModified(); - valuePageReaderList = new ArrayList<>(valuePageHeaderList.size()); - for (int i = 0; i < valuePageHeaderList.size(); i++) { - if (valuePageHeaderList.get(i) != null) { - ValuePageReader valuePageReader = - new ValuePageReader( - valuePageHeaderList.get(i), - valuePageDataList.get(i), - valueDataTypeList.get(i), - valueDecoderList.get(i)); - valuePageReaderList.add(valuePageReader); - isModified = isModified || valuePageReader.isModified(); - } else { - valuePageReaderList.add(null); - } - } - this.globalTimeFilter = globalTimeFilter; - this.valueCount = valuePageReaderList.size(); + super( + timePageHeader, + timePageData, + timeDecoder, + 
valuePageHeaderList, + valuePageDataList, + valueDataTypeList, + valueDecoderList, + globalTimeFilter); } @SuppressWarnings("squid:S107") @@ -105,63 +70,20 @@ public class AlignedPageReader implements IPageReader { List<TSDataType> valueDataTypeList, List<Decoder> valueDecoderList, Filter globalTimeFilter) { - timePageReader = new TimePageReader(timePageHeader, timePageData, timeDecoder); - isModified = timePageReader.isModified(); - valuePageReaderList = new ArrayList<>(valuePageHeaderList.size()); - for (int i = 0; i < valuePageHeaderList.size(); i++) { - if (valuePageHeaderList.get(i) != null) { - ValuePageReader valuePageReader = - new ValuePageReader( - valuePageHeaderList.get(i), - lazyLoadPageDataArray[i], - valueDataTypeList.get(i), - valueDecoderList.get(i)); - valuePageReaderList.add(valuePageReader); - isModified = isModified || valuePageReader.isModified(); - } else { - valuePageReaderList.add(null); - } - } - this.globalTimeFilter = globalTimeFilter; - this.valueCount = valuePageReaderList.size(); + super( + timePageHeader, + timePageData, + timeDecoder, + valuePageHeaderList, + lazyLoadPageDataArray, + valueDataTypeList, + valueDecoderList, + globalTimeFilter); } @Override - public BatchData getAllSatisfiedPageData(boolean ascending) throws IOException { - BatchData pageData = BatchDataFactory.createBatchData(TSDataType.VECTOR, ascending, false); - int timeIndex = -1; - Object[] rowValues = new Object[valueCount]; - while (timePageReader.hasNextTime()) { - long timestamp = timePageReader.nextTime(); - timeIndex++; - - TsPrimitiveType[] v = new TsPrimitiveType[valueCount]; - // if all the sub sensors' value are null in current row, just discard it - boolean hasNotNullValues = false; - for (int i = 0; i < valueCount; i++) { - ValuePageReader pageReader = valuePageReaderList.get(i); - if (pageReader != null) { - v[i] = pageReader.nextValue(timestamp, timeIndex); - rowValues[i] = (v[i] == null) ? 
null : v[i].getValue(); - } else { - v[i] = null; - rowValues[i] = null; - } - if (rowValues[i] != null) { - hasNotNullValues = true; - } - } - - if (hasNotNullValues && satisfyRecordFilter(timestamp, rowValues)) { - pageData.putVector(timestamp, v); - } - } - return pageData.flip(); - } - - private boolean satisfyRecordFilter(long timestamp, Object[] rowValues) { - return (globalTimeFilter == null || globalTimeFilter.satisfyRow(timestamp, rowValues)) - && (pushDownFilter == null || pushDownFilter.satisfyRow(timestamp, rowValues)); + boolean keepCurrentRow(boolean hasNotNullValues, long timestamp, Object[] rowValues) { + return hasNotNullValues && satisfyRecordFilter(timestamp, rowValues); } @Override @@ -176,49 +98,23 @@ public class AlignedPageReader implements IPageReader { return false; } - @Override - public int getMeasurementCount() { - return valueCount; - } - public IPointReader getLazyPointReader() throws IOException { return new LazyLoadAlignedPagePointReader(timePageReader, valuePageReaderList); } - private boolean allPageDataSatisfy() { + @Override + boolean allPageDataSatisfy() { return !isModified && timeAllSelected() && globalTimeFilterAllSatisfy() && pushDownFilterAllSatisfy(); } - private boolean globalTimeFilterAllSatisfy() { - return globalTimeFilter == null || globalTimeFilter.allSatisfy(this); - } - - private boolean pushDownFilterAllSatisfy() { - return pushDownFilter == null || pushDownFilter.allSatisfy(this); - } - @Override - public TsBlock getAllSatisfiedData() throws IOException { - long[] timeBatch = timePageReader.getNextTimeBatch(); - - if (allPageDataSatisfy()) { - buildResultWithoutAnyFilterAndDelete(timeBatch); - return builder.build(); - } + void constructResult(boolean[] keepCurrentRow, long[] timeBatch, boolean pushDownFilterAllSatisfy) + throws IOException { // if all the sub sensors' value are null in current row, just discard it - // if !filter.satisfy, discard this row - boolean[] keepCurrentRow = new 
boolean[timeBatch.length]; - boolean globalTimeFilterAllSatisfy = globalTimeFilterAllSatisfy(); - if (globalTimeFilterAllSatisfy) { - Arrays.fill(keepCurrentRow, true); - } else { - updateKeepCurrentRowThroughGlobalTimeFilter(keepCurrentRow, timeBatch); - } - boolean[][] isDeleted = null; if ((isModified || !timeAllSelected()) && valueCount != 0) { // using bitMap in valuePageReaders to indicate whether columns of current row are all null. @@ -231,102 +127,12 @@ public class AlignedPageReader implements IPageReader { updateKeepCurrentRowThroughBitmask(keepCurrentRow, bitmask); } - boolean pushDownFilterAllSatisfy = pushDownFilterAllSatisfy(); - // construct time column // when pushDownFilterAllSatisfy = true, we can skip rows by OFFSET & LIMIT int readEndIndex = buildTimeColumn(timeBatch, keepCurrentRow, pushDownFilterAllSatisfy); // construct value columns buildValueColumns(readEndIndex, keepCurrentRow, isDeleted); - - TsBlock unFilteredBlock = builder.build(); - if (pushDownFilterAllSatisfy) { - // OFFSET & LIMIT has been consumed in buildTimeColumn - return unFilteredBlock; - } - builder.reset(); - return TsBlockUtil.applyFilterAndLimitOffsetToTsBlock( - unFilteredBlock, builder, pushDownFilter, paginationController); - } - - private void buildResultWithoutAnyFilterAndDelete(long[] timeBatch) throws IOException { - if (paginationController.hasCurOffset(timeBatch.length)) { - paginationController.consumeOffset(timeBatch.length); - } else { - int readStartIndex = 0; - if (paginationController.hasCurOffset()) { - readStartIndex = (int) paginationController.getCurOffset(); - // consume the remaining offset - paginationController.consumeOffset(readStartIndex); - } - - // not included - int readEndIndex = timeBatch.length; - if (paginationController.hasCurLimit() && paginationController.getCurLimit() > 0) { - readEndIndex = - Math.min(readEndIndex, readStartIndex + (int) paginationController.getCurLimit()); - paginationController.consumeLimit((long) readEndIndex - 
readStartIndex); - } - - // construct time column - for (int i = readStartIndex; i < readEndIndex; i++) { - builder.getTimeColumnBuilder().writeLong(timeBatch[i]); - builder.declarePosition(); - } - - // construct value columns - for (int i = 0; i < valueCount; i++) { - ValuePageReader pageReader = valuePageReaderList.get(i); - if (pageReader != null) { - pageReader.writeColumnBuilderWithNextBatch( - readStartIndex, readEndIndex, builder.getColumnBuilder(i)); - } else { - builder.getColumnBuilder(i).appendNull(readEndIndex - readStartIndex); - } - } - } - } - - private int buildTimeColumn( - long[] timeBatch, boolean[] keepCurrentRow, boolean pushDownFilterAllSatisfy) { - if (pushDownFilterAllSatisfy) { - return buildTimeColumnWithPagination(timeBatch, keepCurrentRow); - } else { - return buildTimeColumnWithoutPagination(timeBatch, keepCurrentRow); - } - } - - private int buildTimeColumnWithPagination(long[] timeBatch, boolean[] keepCurrentRow) { - int readEndIndex = timeBatch.length; - for (int rowIndex = 0; rowIndex < timeBatch.length; rowIndex++) { - if (keepCurrentRow[rowIndex]) { - if (paginationController.hasCurOffset()) { - paginationController.consumeOffset(); - keepCurrentRow[rowIndex] = false; - } else if (paginationController.hasCurLimit()) { - builder.getTimeColumnBuilder().writeLong(timeBatch[rowIndex]); - builder.declarePosition(); - paginationController.consumeLimit(); - } else { - readEndIndex = rowIndex; - break; - } - } - } - return readEndIndex; - } - - private int buildTimeColumnWithoutPagination(long[] timeBatch, boolean[] keepCurrentRow) { - int readEndIndex = 0; - for (int i = 0; i < timeBatch.length; i++) { - if (keepCurrentRow[i]) { - builder.getTimeColumnBuilder().writeLong(timeBatch[i]); - builder.declarePosition(); - readEndIndex = i; - } - } - return readEndIndex + 1; } private void buildValueColumns(int readEndIndex, boolean[] keepCurrentRow, boolean[][] isDeleted) @@ -382,13 +188,6 @@ public class AlignedPageReader implements 
IPageReader { } } - private void updateKeepCurrentRowThroughGlobalTimeFilter( - boolean[] keepCurrentRow, long[] timeBatch) { - for (int i = 0, n = timeBatch.length; i < n; i++) { - keepCurrentRow[i] = globalTimeFilter.satisfy(timeBatch[i], null); - } - } - private void updateKeepCurrentRowThroughBitmask(boolean[] keepCurrentRow, byte[] bitmask) { for (int i = 0, n = bitmask.length; i < n; i++) { if (bitmask[i] == (byte) 0xFF) { @@ -419,62 +218,4 @@ public class AlignedPageReader implements IPageReader { ? valuePageReaderList.get(0).getStatistics() : timePageReader.getStatistics(); } - - @Override - public Statistics<? extends Serializable> getTimeStatistics() { - return timePageReader.getStatistics(); - } - - @Override - public Optional<Statistics<? extends Serializable>> getMeasurementStatistics( - int measurementIndex) { - ValuePageReader valuePageReader = valuePageReaderList.get(measurementIndex); - return Optional.ofNullable(valuePageReader == null ? null : valuePageReader.getStatistics()); - } - - @Override - public boolean hasNullValue(int measurementIndex) { - long rowCount = getTimeStatistics().getCount(); - Optional<Statistics<? 
extends Serializable>> statistics = - getMeasurementStatistics(measurementIndex); - return statistics.map(stat -> stat.hasNullValue(rowCount)).orElse(true); - } - - @Override - public void addRecordFilter(Filter filter) { - this.pushDownFilter = filter; - } - - @Override - public void setLimitOffset(PaginationController paginationController) { - this.paginationController = paginationController; - } - - @Override - public boolean isModified() { - return isModified; - } - - @Override - public void initTsBlockBuilder(List<TSDataType> dataTypes) { - if (paginationController.hasLimit()) { - builder = - new TsBlockBuilder( - (int) - Math.min( - paginationController.getCurLimit(), - timePageReader.getStatistics().getCount()), - dataTypes); - } else { - builder = new TsBlockBuilder((int) timePageReader.getStatistics().getCount(), dataTypes); - } - } - - public TimePageReader getTimePageReader() { - return timePageReader; - } - - public List<ValuePageReader> getValuePageReaderList() { - return valuePageReaderList; - } } diff --git a/java/tsfile/src/main/java/org/apache/tsfile/read/reader/page/TablePageReader.java b/java/tsfile/src/main/java/org/apache/tsfile/read/reader/page/TablePageReader.java new file mode 100644 index 00000000..e43baedf --- /dev/null +++ b/java/tsfile/src/main/java/org/apache/tsfile/read/reader/page/TablePageReader.java @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tsfile.read.reader.page; + +import org.apache.tsfile.encoding.decoder.Decoder; +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.file.header.PageHeader; +import org.apache.tsfile.file.metadata.statistics.Statistics; +import org.apache.tsfile.read.common.TimeRange; +import org.apache.tsfile.read.filter.basic.Filter; + +import java.io.IOException; +import java.io.Serializable; +import java.nio.ByteBuffer; +import java.util.List; + +// difference with AlignedPageReader is that TablePageReader works for TableScan and keep all null +// rows +public class TablePageReader extends AbstractAlignedPageReader { + + public TablePageReader( + PageHeader timePageHeader, + ByteBuffer timePageData, + Decoder timeDecoder, + List<PageHeader> valuePageHeaderList, + List<ByteBuffer> valuePageDataList, + List<TSDataType> valueDataTypeList, + List<Decoder> valueDecoderList, + Filter globalTimeFilter) { + super( + timePageHeader, + timePageData, + timeDecoder, + valuePageHeaderList, + valuePageDataList, + valueDataTypeList, + valueDecoderList, + globalTimeFilter); + } + + public TablePageReader( + PageHeader timePageHeader, + ByteBuffer timePageData, + Decoder timeDecoder, + List<PageHeader> valuePageHeaderList, + LazyLoadPageData[] lazyLoadPageDataArray, + List<TSDataType> valueDataTypeList, + List<Decoder> valueDecoderList, + Filter globalTimeFilter) { + super( + timePageHeader, + timePageData, + timeDecoder, + valuePageHeaderList, + lazyLoadPageDataArray, + valueDataTypeList, + valueDecoderList, + globalTimeFilter); + } + 
+ @Override + public Statistics<? extends Serializable> getStatistics() { + return timePageReader.getStatistics(); + } + + @Override + boolean keepCurrentRow(boolean hasNotNullValues, long timestamp, Object[] rowValues) { + return satisfyRecordFilter(timestamp, rowValues); + } + + @Override + boolean allPageDataSatisfy() { + return !isModified && globalTimeFilterAllSatisfy() && pushDownFilterAllSatisfy(); + } + + @Override + void constructResult(boolean[] keepCurrentRow, long[] timeBatch, boolean pushDownFilterAllSatisfy) + throws IOException { + // construct time column + // when pushDownFilterAllSatisfy = true, we can skip rows by OFFSET & LIMIT + int readEndIndex = buildTimeColumn(timeBatch, keepCurrentRow, pushDownFilterAllSatisfy); + // construct value columns + buildValueColumns(readEndIndex, keepCurrentRow, timeBatch); + } + + private void buildValueColumns(int readEndIndex, boolean[] keepCurrentRow, long[] timeBatch) + throws IOException { + for (int i = 0; i < valueCount; i++) { + ValuePageReader pageReader = valuePageReaderList.get(i); + + if (pageReader != null) { + if (pageReader.isModified()) { + boolean[] isDeleted = new boolean[timeBatch.length]; + pageReader.fillIsDeleted(timeBatch, isDeleted, keepCurrentRow); + pageReader.writeColumnBuilderWithNextBatch( + readEndIndex, builder.getColumnBuilder(i), keepCurrentRow, isDeleted); + } else { + pageReader.writeColumnBuilderWithNextBatch( + readEndIndex, builder.getColumnBuilder(i), keepCurrentRow); + } + } else { + for (int j = 0; j < readEndIndex; j++) { + if (keepCurrentRow[j]) { + builder.getColumnBuilder(i).appendNull(); + } + } + } + } + } + + public void setDeleteIntervalList( + List<TimeRange> timeDeletions, List<List<TimeRange>> valueDeletionsList) { + timePageReader.setDeleteIntervalList(timeDeletions); + for (int i = 0; i < valueCount; i++) { + if (valuePageReaderList.get(i) != null) { + valuePageReaderList.get(i).setDeleteIntervalList(valueDeletionsList.get(i)); + } + } + } +} diff --git 
a/java/tsfile/src/main/java/org/apache/tsfile/read/reader/page/ValuePageReader.java b/java/tsfile/src/main/java/org/apache/tsfile/read/reader/page/ValuePageReader.java index 58e9ff51..cf79e68a 100644 --- a/java/tsfile/src/main/java/org/apache/tsfile/read/reader/page/ValuePageReader.java +++ b/java/tsfile/src/main/java/org/apache/tsfile/read/reader/page/ValuePageReader.java @@ -603,6 +603,14 @@ public class ValuePageReader { } } + public void fillIsDeleted(long[] timestamp, boolean[] isDeleted, boolean[] keepCurrentRow) { + for (int i = 0, n = timestamp.length; i < n; i++) { + if (keepCurrentRow[i]) { + isDeleted[i] = isDeleted(timestamp[i]); + } + } + } + public TSDataType getDataType() { return dataType; }
