This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/tsfile.git
The following commit(s) were added to refs/heads/develop by this push:
new 07925267 Introducing Lazy-decoding of page data in PageReader
07925267 is described below
commit 079252672c910232d227d6040f4f3272e17684b3
Author: Liao Lanyu <[email protected]>
AuthorDate: Mon Jun 17 16:28:38 2024 +0800
Introducing Lazy-decoding of page data in PageReader
---
.../read/reader/chunk/AlignedChunkReader.java | 25 ++-
.../tsfile/read/reader/chunk/ChunkReader.java | 22 ++-
.../tsfile/read/reader/page/AlignedPageReader.java | 43 ++++-
.../tsfile/read/reader/page/LazyLoadPageData.java | 64 +++++++
.../apache/tsfile/read/reader/page/PageReader.java | 29 +++
.../tsfile/read/reader/page/ValuePageReader.java | 50 ++++-
.../read/reader/AlignedPageReaderPushDownTest.java | 201 +++++++++++++++++++++
7 files changed, 403 insertions(+), 31 deletions(-)
diff --git
a/java/tsfile/src/main/java/org/apache/tsfile/read/reader/chunk/AlignedChunkReader.java
b/java/tsfile/src/main/java/org/apache/tsfile/read/reader/chunk/AlignedChunkReader.java
index fe2543df..c967a298 100644
---
a/java/tsfile/src/main/java/org/apache/tsfile/read/reader/chunk/AlignedChunkReader.java
+++
b/java/tsfile/src/main/java/org/apache/tsfile/read/reader/chunk/AlignedChunkReader.java
@@ -19,6 +19,7 @@
package org.apache.tsfile.read.reader.chunk;
+import org.apache.tsfile.compress.IUnCompressor;
import org.apache.tsfile.encoding.decoder.Decoder;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.file.MetaMarker;
@@ -29,6 +30,7 @@ import org.apache.tsfile.read.common.Chunk;
import org.apache.tsfile.read.common.TimeRange;
import org.apache.tsfile.read.filter.basic.Filter;
import org.apache.tsfile.read.reader.page.AlignedPageReader;
+import org.apache.tsfile.read.reader.page.LazyLoadPageData;
import java.io.IOException;
import java.io.Serializable;
@@ -189,7 +191,7 @@ public class AlignedChunkReader extends AbstractChunkReader
{
ChunkReader.deserializePageData(timePageHeader, timeChunkDataBuffer,
timeChunkHeader);
List<PageHeader> valuePageHeaderList = new ArrayList<>();
- List<ByteBuffer> valuePageDataList = new ArrayList<>();
+ LazyLoadPageData[] lazyLoadPageDataArray = new
LazyLoadPageData[rawValuePageHeaderList.size()];
List<TSDataType> valueDataTypeList = new ArrayList<>();
List<Decoder> valueDecoderList = new ArrayList<>();
@@ -200,7 +202,7 @@ public class AlignedChunkReader extends AbstractChunkReader
{
if (valuePageHeader == null || valuePageHeader.getUncompressedSize() ==
0) {
// Empty Page
valuePageHeaderList.add(null);
- valuePageDataList.add(null);
+ lazyLoadPageDataArray[i] = null;
valueDataTypeList.add(null);
valueDecoderList.add(null);
} else if (pageDeleted(valuePageHeader,
valueDeleteIntervalsList.get(i))) {
@@ -209,15 +211,24 @@ public class AlignedChunkReader extends
AbstractChunkReader {
.position(
valueChunkDataBufferList.get(i).position() +
valuePageHeader.getCompressedSize());
valuePageHeaderList.add(null);
- valuePageDataList.add(null);
+ lazyLoadPageDataArray[i] = null;
valueDataTypeList.add(null);
valueDecoderList.add(null);
} else {
ChunkHeader valueChunkHeader = valueChunkHeaderList.get(i);
+ int currentPagePosition = valueChunkDataBufferList.get(i).position();
+ // adjust position as if we have read the page data even if it is just
lazy-loaded
+ valueChunkDataBufferList
+ .get(i)
+ .position(
+ valueChunkDataBufferList.get(i).position() +
valuePageHeader.getCompressedSize());
+
valuePageHeaderList.add(valuePageHeader);
- valuePageDataList.add(
- ChunkReader.deserializePageData(
- valuePageHeader, valueChunkDataBufferList.get(i),
valueChunkHeader));
+ lazyLoadPageDataArray[i] =
+ new LazyLoadPageData(
+ valueChunkDataBufferList.get(i).array(),
+ currentPagePosition,
+
IUnCompressor.getUnCompressor(valueChunkHeader.getCompressionType()));
valueDataTypeList.add(valueChunkHeader.getDataType());
valueDecoderList.add(
Decoder.getDecoderByType(
@@ -234,7 +245,7 @@ public class AlignedChunkReader extends AbstractChunkReader
{
timePageData,
defaultTimeDecoder,
valuePageHeaderList,
- valuePageDataList,
+ lazyLoadPageDataArray,
valueDataTypeList,
valueDecoderList,
queryFilter);
diff --git
a/java/tsfile/src/main/java/org/apache/tsfile/read/reader/chunk/ChunkReader.java
b/java/tsfile/src/main/java/org/apache/tsfile/read/reader/chunk/ChunkReader.java
index 4f724d21..a85683b1 100644
---
a/java/tsfile/src/main/java/org/apache/tsfile/read/reader/chunk/ChunkReader.java
+++
b/java/tsfile/src/main/java/org/apache/tsfile/read/reader/chunk/ChunkReader.java
@@ -28,6 +28,7 @@ import org.apache.tsfile.file.metadata.statistics.Statistics;
import org.apache.tsfile.read.common.Chunk;
import org.apache.tsfile.read.common.TimeRange;
import org.apache.tsfile.read.filter.basic.Filter;
+import org.apache.tsfile.read.reader.page.LazyLoadPageData;
import org.apache.tsfile.read.reader.page.PageReader;
import java.io.IOException;
@@ -42,7 +43,7 @@ public class ChunkReader extends AbstractChunkReader {
private final List<TimeRange> deleteIntervalList;
@SuppressWarnings("unchecked")
- public ChunkReader(Chunk chunk, long readStopTime, Filter queryFilter)
throws IOException {
+ public ChunkReader(Chunk chunk, long readStopTime, Filter queryFilter) {
super(readStopTime, queryFilter);
this.chunkHeader = chunk.getHeader();
this.chunkDataBuffer = chunk.getData();
@@ -55,22 +56,19 @@ public class ChunkReader extends AbstractChunkReader {
this(chunk, Long.MIN_VALUE, null);
}
- public ChunkReader(Chunk chunk, Filter queryFilter) throws IOException {
+ public ChunkReader(Chunk chunk, Filter queryFilter) {
this(chunk, Long.MIN_VALUE, queryFilter);
}
/**
* Constructor of ChunkReader by timestamp. This constructor is used to
accelerate queries by
* filtering out pages whose endTime is less than current timestamp.
- *
- * @throws IOException exception when initAllPageReaders
*/
- public ChunkReader(Chunk chunk, long readStopTime) throws IOException {
+ public ChunkReader(Chunk chunk, long readStopTime) {
this(chunk, readStopTime, null);
}
- private void initAllPageReaders(Statistics<? extends Serializable>
chunkStatistic)
- throws IOException {
+ private void initAllPageReaders(Statistics<? extends Serializable>
chunkStatistic) {
// construct next satisfied page header
while (chunkDataBuffer.remaining() > 0) {
// deserialize a PageHeader from chunkDataBuffer
@@ -126,12 +124,16 @@ public class ChunkReader extends AbstractChunkReader {
chunkDataBuffer.position(chunkDataBuffer.position() +
pageHeader.getCompressedSize());
}
- private PageReader constructPageReader(PageHeader pageHeader) throws
IOException {
- ByteBuffer pageData = deserializePageData(pageHeader, chunkDataBuffer,
chunkHeader);
+ private PageReader constructPageReader(PageHeader pageHeader) {
+ IUnCompressor unCompressor =
IUnCompressor.getUnCompressor(chunkHeader.getCompressionType());
+ // record the current position of chunkDataBuffer, use this to get the
page data in PageReader
+ // through directly accessing the buffer array
+ int currentPagePosition = chunkDataBuffer.position();
+ skipCurrentPage(pageHeader);
PageReader reader =
new PageReader(
pageHeader,
- pageData,
+ new LazyLoadPageData(chunkDataBuffer.array(), currentPagePosition,
unCompressor),
chunkHeader.getDataType(),
Decoder.getDecoderByType(chunkHeader.getEncodingType(),
chunkHeader.getDataType()),
defaultTimeDecoder,
diff --git
a/java/tsfile/src/main/java/org/apache/tsfile/read/reader/page/AlignedPageReader.java
b/java/tsfile/src/main/java/org/apache/tsfile/read/reader/page/AlignedPageReader.java
index b1ff14e3..75f664f5 100644
---
a/java/tsfile/src/main/java/org/apache/tsfile/read/reader/page/AlignedPageReader.java
+++
b/java/tsfile/src/main/java/org/apache/tsfile/read/reader/page/AlignedPageReader.java
@@ -92,6 +92,40 @@ public class AlignedPageReader implements IPageReader {
this.valueCount = valuePageReaderList.size();
}
+  /**
+   * Builds an aligned page reader whose value pages are decompressed lazily.
+   *
+   * <p>{@code valuePageHeaderList}, {@code lazyLoadPageDataArray}, {@code valueDataTypeList} and
+   * {@code valueDecoderList} are parallel: index {@code i} of each describes value column {@code
+   * i}. A {@code null} page header yields a {@code null} {@link ValuePageReader} for that column,
+   * and the other entries at that index are not read.
+   *
+   * <p>The lazily loaded data is passed as an array rather than a {@code List<LazyLoadPageData>}
+   * because, after type erasure, a list parameter in this position would clash with the existing
+   * list-based constructor.
+   */
+  @SuppressWarnings("squid:S107")
+  public AlignedPageReader(
+      PageHeader timePageHeader,
+      ByteBuffer timePageData,
+      Decoder timeDecoder,
+      List<PageHeader> valuePageHeaderList,
+      LazyLoadPageData[] lazyLoadPageDataArray,
+      List<TSDataType> valueDataTypeList,
+      List<Decoder> valueDecoderList,
+      Filter globalTimeFilter) {
+    this.timePageReader = new TimePageReader(timePageHeader, timePageData, timeDecoder);
+    boolean anyPageModified = timePageReader.isModified();
+    int columnCount = valuePageHeaderList.size();
+    this.valuePageReaderList = new ArrayList<>(columnCount);
+    for (int column = 0; column < columnCount; column++) {
+      PageHeader columnPageHeader = valuePageHeaderList.get(column);
+      if (columnPageHeader == null) {
+        // no page for this column: keep a placeholder so indices stay aligned
+        valuePageReaderList.add(null);
+        continue;
+      }
+      ValuePageReader columnReader =
+          new ValuePageReader(
+              columnPageHeader,
+              lazyLoadPageDataArray[column],
+              valueDataTypeList.get(column),
+              valueDecoderList.get(column));
+      valuePageReaderList.add(columnReader);
+      anyPageModified = anyPageModified || columnReader.isModified();
+    }
+    this.isModified = anyPageModified;
+    this.globalTimeFilter = globalTimeFilter;
+    this.valueCount = valuePageReaderList.size();
+  }
+
@Override
public BatchData getAllSatisfiedPageData(boolean ascending) throws
IOException {
BatchData pageData = BatchDataFactory.createBatchData(TSDataType.VECTOR,
ascending, false);
@@ -216,7 +250,7 @@ public class AlignedPageReader implements IPageReader {
unFilteredBlock, builder, pushDownFilter, paginationController);
}
- private void buildResultWithoutAnyFilterAndDelete(long[] timeBatch) {
+ private void buildResultWithoutAnyFilterAndDelete(long[] timeBatch) throws
IOException {
if (paginationController.hasCurOffset(timeBatch.length)) {
paginationController.consumeOffset(timeBatch.length);
} else {
@@ -295,8 +329,8 @@ public class AlignedPageReader implements IPageReader {
return readEndIndex + 1;
}
- private void buildValueColumns(
- int readEndIndex, boolean[] keepCurrentRow, boolean[][] isDeleted) {
+ private void buildValueColumns(int readEndIndex, boolean[] keepCurrentRow,
boolean[][] isDeleted)
+ throws IOException {
for (int i = 0; i < valueCount; i++) {
ValuePageReader pageReader = valuePageReaderList.get(i);
if (pageReader != null) {
@@ -320,7 +354,8 @@ public class AlignedPageReader implements IPageReader {
}
}
- private void fillIsDeletedAndBitMask(long[] timeBatch, boolean[][]
isDeleted, byte[] bitmask) {
+ private void fillIsDeletedAndBitMask(long[] timeBatch, boolean[][]
isDeleted, byte[] bitmask)
+ throws IOException {
for (int columnIndex = 0; columnIndex < valueCount; columnIndex++) {
ValuePageReader pageReader = valuePageReaderList.get(columnIndex);
if (pageReader != null) {
diff --git
a/java/tsfile/src/main/java/org/apache/tsfile/read/reader/page/LazyLoadPageData.java
b/java/tsfile/src/main/java/org/apache/tsfile/read/reader/page/LazyLoadPageData.java
new file mode 100644
index 00000000..d89cc2d8
--- /dev/null
+++
b/java/tsfile/src/main/java/org/apache/tsfile/read/reader/page/LazyLoadPageData.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tsfile.read.reader.page;
+
+import org.apache.tsfile.compress.IUnCompressor;
+import org.apache.tsfile.file.header.PageHeader;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+/**
+ * Deferred, still-compressed body of a single page.
+ *
+ * <p>An instance keeps a reference to the byte array backing the chunk data (the array is not
+ * copied), plus the offset at which this page's compressed body starts. Nothing is decompressed
+ * until {@link #uncompressPageData(PageHeader)} is called.
+ */
+public class LazyLoadPageData {
+  /** Reference to the backing array of the original chunkDataBuffer; shared, not copied. */
+  private final byte[] chunkData;
+
+  /** Index into {@link #chunkData} at which this page's compressed body begins. */
+  private final int pageDataOffset;
+
+  /** Decompressor used to expand the page body. */
+  private final IUnCompressor unCompressor;
+
+  /**
+   * @param data backing array of the chunk data; kept by reference
+   * @param offset index into {@code data} where the compressed page body starts. It is used
+   *     directly as an array index, so a caller deriving it from a {@link ByteBuffer} position must
+   *     also add the buffer's {@code arrayOffset()}. NOTE(review): confirm that callers passing
+   *     only {@code buffer.position()} never see buffers with a non-zero array offset.
+   * @param unCompressor decompressor for the page body
+   */
+  public LazyLoadPageData(byte[] data, int offset, IUnCompressor unCompressor) {
+    this.chunkData = data;
+    this.pageDataOffset = offset;
+    this.unCompressor = unCompressor;
+  }
+
+  /**
+   * Decompresses this page's body into a newly allocated buffer.
+   *
+   * <p>Every call decompresses again into a fresh array. Callers that need the data more than once
+   * should keep the returned buffer.
+   *
+   * @param pageHeader header of this page; supplies the compressed and uncompressed sizes
+   * @return a buffer wrapping {@code pageHeader.getUncompressedSize()} bytes, positioned at 0
+   * @throws IOException if decompression fails; the original failure is attached as the cause
+   */
+  public ByteBuffer uncompressPageData(PageHeader pageHeader) throws IOException {
+    int compressedPageBodyLength = pageHeader.getCompressedSize();
+    int uncompressedPageBodyLength = pageHeader.getUncompressedSize();
+    byte[] uncompressedPageData = new byte[uncompressedPageBodyLength];
+    try {
+      unCompressor.uncompress(
+          chunkData, pageDataOffset, compressedPageBodyLength, uncompressedPageData, 0);
+    } catch (Exception e) {
+      // Separate the fields so the message is readable, and chain the cause so its trace survives.
+      throw new IOException(
+          "Uncompress error! uncompress size: "
+              + uncompressedPageBodyLength
+              + ", compressed size: "
+              + compressedPageBodyLength
+              + ", page header: "
+              + pageHeader
+              + ", reason: "
+              + e.getMessage(),
+          e);
+    }
+    return ByteBuffer.wrap(uncompressedPageData);
+  }
+
+  /** @return the decompressor this page data was created with */
+  public IUnCompressor getUnCompressor() {
+    return unCompressor;
+  }
+}
diff --git
a/java/tsfile/src/main/java/org/apache/tsfile/read/reader/page/PageReader.java
b/java/tsfile/src/main/java/org/apache/tsfile/read/reader/page/PageReader.java
index bbf0b49c..3bb57a26 100644
---
a/java/tsfile/src/main/java/org/apache/tsfile/read/reader/page/PageReader.java
+++
b/java/tsfile/src/main/java/org/apache/tsfile/read/reader/page/PageReader.java
@@ -74,6 +74,10 @@ public class PageReader implements IPageReader {
private int deleteCursor = 0;
+ // used for lazy decoding
+
+ private LazyLoadPageData lazyLoadPageData;
+
public PageReader(
ByteBuffer pageData, TSDataType dataType, Decoder valueDecoder, Decoder
timeDecoder) {
this(null, pageData, dataType, valueDecoder, timeDecoder, null);
@@ -103,6 +107,21 @@ public class PageReader implements IPageReader {
splitDataToTimeStampAndValue(pageData);
}
+  /**
+   * Creates a reader whose page body is decompressed lazily rather than in the constructor.
+   *
+   * <p>The time and value buffers stay unset until a data-reading method (such as {@link
+   * #getAllSatisfiedData()}) first needs them.
+   *
+   * @param pageHeader header of the page; also supplies the sizes used for decompression
+   * @param lazyLoadPageData still-compressed page body
+   * @param dataType data type of the page's values
+   * @param valueDecoder decoder for the value column
+   * @param timeDecoder decoder for the time column
+   * @param recordFilter row-level filter; may be {@code null}
+   */
+  public PageReader(
+      PageHeader pageHeader,
+      LazyLoadPageData lazyLoadPageData,
+      TSDataType dataType,
+      Decoder valueDecoder,
+      Decoder timeDecoder,
+      Filter recordFilter) {
+    this.pageHeader = pageHeader;
+    this.lazyLoadPageData = lazyLoadPageData;
+    this.dataType = dataType;
+    this.timeDecoder = timeDecoder;
+    this.valueDecoder = valueDecoder;
+    this.recordFilter = recordFilter;
+  }
+
/**
* split pageContent into two stream: time and value
*
@@ -118,10 +137,19 @@ public class PageReader implements IPageReader {
valueBuffer.position(timeBufferLength);
}
+  /**
+   * Decompresses the page body and splits it into the time and value buffers, the first time it
+   * is needed. Call this before touching {@code timeBuffer} or {@code valueBuffer}.
+   *
+   * @throws IOException if decompression fails; the deferred data is kept, so a later call retries
+   */
+  private void uncompressDataIfNecessary() throws IOException {
+    boolean nothingToLoad =
+        lazyLoadPageData == null || (timeBuffer != null && valueBuffer != null);
+    if (nothingToLoad) {
+      return;
+    }
+    ByteBuffer uncompressedPage = lazyLoadPageData.uncompressPageData(pageHeader);
+    splitDataToTimeStampAndValue(uncompressedPage);
+    // release the compressed page reference so it is never decompressed a second time
+    lazyLoadPageData = null;
+  }
+
/** @return the returned BatchData may be empty, but never be null */
@SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity
warning
@Override
public BatchData getAllSatisfiedPageData(boolean ascending) throws
IOException {
+ uncompressDataIfNecessary();
BatchData pageData = BatchDataFactory.createBatchData(dataType, ascending,
false);
boolean allSatisfy = recordFilter == null || recordFilter.allSatisfy(this);
while (timeDecoder.hasNext(timeBuffer)) {
@@ -176,6 +204,7 @@ public class PageReader implements IPageReader {
@Override
public TsBlock getAllSatisfiedData() throws IOException {
+ uncompressDataIfNecessary();
TsBlockBuilder builder;
int initialExpectedEntries = (int) pageHeader.getStatistics().getCount();
if (paginationController.hasLimit()) {
diff --git
a/java/tsfile/src/main/java/org/apache/tsfile/read/reader/page/ValuePageReader.java
b/java/tsfile/src/main/java/org/apache/tsfile/read/reader/page/ValuePageReader.java
index 8bf13791..7f708452 100644
---
a/java/tsfile/src/main/java/org/apache/tsfile/read/reader/page/ValuePageReader.java
+++
b/java/tsfile/src/main/java/org/apache/tsfile/read/reader/page/ValuePageReader.java
@@ -33,6 +33,7 @@ import org.apache.tsfile.utils.ReadWriteIOUtils;
import org.apache.tsfile.utils.TsPrimitiveType;
import org.apache.tsfile.write.UnSupportedDataTypeException;
+import java.io.IOException;
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.util.Arrays;
@@ -61,6 +62,8 @@ public class ValuePageReader {
private int deleteCursor = 0;
+ private LazyLoadPageData lazyLoadPageData;
+
public ValuePageReader(
PageHeader pageHeader, ByteBuffer pageData, TSDataType dataType, Decoder
valueDecoder) {
this.dataType = dataType;
@@ -72,6 +75,17 @@ public class ValuePageReader {
this.valueBuffer = pageData;
}
+  /**
+   * Creates a value-column reader whose page body is decompressed lazily.
+   *
+   * <p>The bitmap and value buffer are not populated here. They are filled in from {@code
+   * lazyLoadPageData} when a reading method first needs them.
+   *
+   * @param pageHeader header of the value page; also supplies the sizes used for decompression
+   * @param lazyLoadPageData still-compressed page body
+   * @param dataType data type of this value column
+   * @param valueDecoder decoder for the value stream
+   */
+  public ValuePageReader(
+      PageHeader pageHeader,
+      LazyLoadPageData lazyLoadPageData,
+      TSDataType dataType,
+      Decoder valueDecoder) {
+    this.pageHeader = pageHeader;
+    this.lazyLoadPageData = lazyLoadPageData;
+    this.valueDecoder = valueDecoder;
+    this.dataType = dataType;
+  }
+
private void splitDataToBitmapAndValue(ByteBuffer pageData) {
if (!pageData.hasRemaining()) { // Empty Page
return;
@@ -82,11 +96,23 @@ public class ValuePageReader {
this.valueBuffer = pageData.slice();
}
+  /**
+   * Decompresses the page body on first use and splits it into the bitmap and value data. Call
+   * this before touching {@code bitmap} or {@code valueBuffer}.
+   *
+   * <p>After splitting, {@code valueBuffer} is set to the decompressed buffer itself, replacing the
+   * slice assigned by {@code splitDataToBitmapAndValue}. This matches the ByteBuffer-based
+   * constructor.
+   *
+   * @throws IOException if decompression fails; the deferred data is kept, so a later call retries
+   */
+  private void uncompressDataIfNecessary() throws IOException {
+    if (lazyLoadPageData == null || valueBuffer != null) {
+      return;
+    }
+    ByteBuffer decompressedPage = lazyLoadPageData.uncompressPageData(pageHeader);
+    splitDataToBitmapAndValue(decompressedPage);
+    this.valueBuffer = decompressedPage;
+    // drop the deferred data so the page is never decompressed twice
+    lazyLoadPageData = null;
+  }
+
/**
* return a BatchData with the corresponding timeBatch, the BatchData's
dataType is same as this
* sub sensor
*/
- public BatchData nextBatch(long[] timeBatch, boolean ascending, Filter
filter) {
+ public BatchData nextBatch(long[] timeBatch, boolean ascending, Filter
filter)
+ throws IOException {
+ uncompressDataIfNecessary();
BatchData pageData = BatchDataFactory.createBatchData(dataType, ascending,
false);
for (int i = 0; i < timeBatch.length; i++) {
if (((bitmap[i / 8] & 0xFF) & (MASK >>> (i % 8))) == 0) {
@@ -141,7 +167,8 @@ public class ValuePageReader {
return pageData.flip();
}
- public TsPrimitiveType nextValue(long timestamp, int timeIndex) {
+ public TsPrimitiveType nextValue(long timestamp, int timeIndex) throws
IOException {
+ uncompressDataIfNecessary();
TsPrimitiveType resultValue = null;
if (valueBuffer == null || ((bitmap[timeIndex / 8] & 0xFF) & (MASK >>>
(timeIndex % 8))) == 0) {
return null;
@@ -198,7 +225,8 @@ public class ValuePageReader {
* return the value array of the corresponding time, if this sub sensor
don't have a value in a
* time, just fill it with null
*/
- public TsPrimitiveType[] nextValueBatch(long[] timeBatch) {
+ public TsPrimitiveType[] nextValueBatch(long[] timeBatch) throws IOException
{
+ uncompressDataIfNecessary();
TsPrimitiveType[] valueBatch = new TsPrimitiveType[size];
if (valueBuffer == null) {
return valueBatch;
@@ -256,10 +284,9 @@ public class ValuePageReader {
}
public void writeColumnBuilderWithNextBatch(
- int readEndIndex,
- ColumnBuilder columnBuilder,
- boolean[] keepCurrentRow,
- boolean[] isDeleted) {
+ int readEndIndex, ColumnBuilder columnBuilder, boolean[] keepCurrentRow,
boolean[] isDeleted)
+ throws IOException {
+ uncompressDataIfNecessary();
if (valueBuffer == null) {
for (int i = 0; i < readEndIndex; i++) {
if (keepCurrentRow[i]) {
@@ -347,7 +374,8 @@ public class ValuePageReader {
}
public void writeColumnBuilderWithNextBatch(
- int readEndIndex, ColumnBuilder columnBuilder, boolean[] keepCurrentRow)
{
+ int readEndIndex, ColumnBuilder columnBuilder, boolean[] keepCurrentRow)
throws IOException {
+ uncompressDataIfNecessary();
if (valueBuffer == null) {
for (int i = 0; i < readEndIndex; i++) {
if (keepCurrentRow[i]) {
@@ -411,7 +439,8 @@ public class ValuePageReader {
}
public void writeColumnBuilderWithNextBatch(
- int readStartIndex, int readEndIndex, ColumnBuilder columnBuilder) {
+ int readStartIndex, int readEndIndex, ColumnBuilder columnBuilder)
throws IOException {
+ uncompressDataIfNecessary();
if (valueBuffer == null) {
columnBuilder.appendNull(readEndIndex - readStartIndex);
return;
@@ -574,7 +603,8 @@ public class ValuePageReader {
return dataType;
}
- public byte[] getBitmap() {
+ public byte[] getBitmap() throws IOException {
+ uncompressDataIfNecessary();
return Arrays.copyOf(bitmap, bitmap.length);
}
}
diff --git
a/java/tsfile/src/test/java/org/apache/tsfile/read/reader/AlignedPageReaderPushDownTest.java
b/java/tsfile/src/test/java/org/apache/tsfile/read/reader/AlignedPageReaderPushDownTest.java
index b28a03d6..47dc4024 100644
---
a/java/tsfile/src/test/java/org/apache/tsfile/read/reader/AlignedPageReaderPushDownTest.java
+++
b/java/tsfile/src/test/java/org/apache/tsfile/read/reader/AlignedPageReaderPushDownTest.java
@@ -20,6 +20,7 @@
package org.apache.tsfile.read.reader;
import org.apache.tsfile.compress.ICompressor;
+import org.apache.tsfile.compress.IUnCompressor;
import org.apache.tsfile.encoding.decoder.Decoder;
import org.apache.tsfile.encoding.decoder.DeltaBinaryDecoder;
import org.apache.tsfile.encoding.decoder.IntRleDecoder;
@@ -34,6 +35,7 @@ import org.apache.tsfile.read.filter.basic.Filter;
import org.apache.tsfile.read.filter.factory.TimeFilterApi;
import org.apache.tsfile.read.filter.factory.ValueFilterApi;
import org.apache.tsfile.read.reader.page.AlignedPageReader;
+import org.apache.tsfile.read.reader.page.LazyLoadPageData;
import org.apache.tsfile.read.reader.series.PaginationController;
import org.apache.tsfile.write.page.TimePageWriter;
import org.apache.tsfile.write.page.ValuePageWriter;
@@ -142,10 +144,53 @@ public class AlignedPageReaderPushDownTest {
return alignedPageReader;
}
+  /**
+   * Builds a two-column aligned page reader whose value pages go through {@link LazyLoadPageData}.
+   *
+   * @param globalTimeFilter time filter handed to the reader; may be {@code null}
+   * @param modified modified flag for value page 1 and value page 2, in that order
+   */
+  private AlignedPageReader generateAlignedPageReaderUsingLazyLoad(
+      Filter globalTimeFilter, List<Boolean> modified) throws IOException {
+    resetDataBuffer();
+    List<PageHeader> valuePageHeaderList =
+        Arrays.asList(testValuePageHeader1, testValuePageHeader2);
+    byte[][] rawValuePages = {testValuePageData1.array(), testValuePageData2.array()};
+    LazyLoadPageData[] lazyLoadPageDataArray = new LazyLoadPageData[rawValuePages.length];
+    for (int column = 0; column < rawValuePages.length; column++) {
+      PageHeader columnHeader = valuePageHeaderList.get(column);
+      byte[] pageBytes = rawValuePages[column];
+      columnHeader.setModified(modified.get(column));
+      // pages are stored uncompressed, so both sizes equal the raw byte count
+      columnHeader.setCompressedSize(pageBytes.length);
+      columnHeader.setUncompressedSize(pageBytes.length);
+      lazyLoadPageDataArray[column] =
+          new LazyLoadPageData(
+              pageBytes, 0, IUnCompressor.getUnCompressor(CompressionType.UNCOMPRESSED));
+    }
+    List<TSDataType> valueDataTypeList = Arrays.asList(TSDataType.INT32, TSDataType.INT32);
+    List<Decoder> valueDecoderList = Arrays.asList(new IntRleDecoder(), new IntRleDecoder());
+    AlignedPageReader reader =
+        new AlignedPageReader(
+            testTimePageHeader,
+            testTimePageData,
+            new DeltaBinaryDecoder.LongDeltaDecoder(),
+            valuePageHeaderList,
+            lazyLoadPageDataArray,
+            valueDataTypeList,
+            valueDecoderList,
+            globalTimeFilter);
+    reader.initTsBlockBuilder(valueDataTypeList);
+    return reader;
+  }
+
private AlignedPageReader generateAlignedPageReader(Filter globalTimeFilter)
throws IOException {
return generateAlignedPageReader(globalTimeFilter, Arrays.asList(false,
false));
}
+  /** Two-column lazy-load reader with neither value page marked as modified. */
+  private AlignedPageReader generateAlignedPageReaderUsingLazyLoad(Filter globalTimeFilter)
+      throws IOException {
+    List<Boolean> noPageModified = Collections.nCopies(2, false);
+    return generateAlignedPageReaderUsingLazyLoad(globalTimeFilter, noPageModified);
+  }
+
private AlignedPageReader generateSingleColumnAlignedPageReader(
Filter globalTimeFilter, boolean modified) {
resetDataBuffer();
@@ -168,10 +213,44 @@ public class AlignedPageReaderPushDownTest {
return alignedPageReader;
}
+  /**
+   * Builds a single-column aligned page reader (value page 2 only) that uses {@link
+   * LazyLoadPageData}.
+   *
+   * @param globalTimeFilter time filter handed to the reader; may be {@code null}
+   * @param modified modified flag for the value page
+   */
+  private AlignedPageReader generateSingleColumnAlignedPageReaderUsingLazyLoad(
+      Filter globalTimeFilter, boolean modified) {
+    resetDataBuffer();
+    byte[] pageBytes = testValuePageData2.array();
+    testValuePageHeader2.setModified(modified);
+    // the page is stored uncompressed, so both sizes equal the raw byte count
+    testValuePageHeader2.setCompressedSize(pageBytes.length);
+    testValuePageHeader2.setUncompressedSize(pageBytes.length);
+    LazyLoadPageData[] lazyLoadPageDataArray = {
+      new LazyLoadPageData(
+          pageBytes, 0, IUnCompressor.getUnCompressor(CompressionType.UNCOMPRESSED))
+    };
+    List<PageHeader> valuePageHeaderList = Collections.singletonList(testValuePageHeader2);
+    List<TSDataType> valueDataTypeList = Collections.singletonList(TSDataType.INT32);
+    List<Decoder> valueDecoderList = Collections.singletonList(new IntRleDecoder());
+    AlignedPageReader reader =
+        new AlignedPageReader(
+            testTimePageHeader,
+            testTimePageData,
+            new DeltaBinaryDecoder.LongDeltaDecoder(),
+            valuePageHeaderList,
+            lazyLoadPageDataArray,
+            valueDataTypeList,
+            valueDecoderList,
+            globalTimeFilter);
+    reader.initTsBlockBuilder(valueDataTypeList);
+    return reader;
+  }
+
private AlignedPageReader generateSingleColumnAlignedPageReader(Filter
globalTimeFilter) {
return generateSingleColumnAlignedPageReader(globalTimeFilter, false);
}
+  /** Single-column lazy-load reader whose value page is not marked as modified. */
+  private AlignedPageReader generateSingleColumnAlignedPageReaderUsingLazyLoad(
+      Filter globalTimeFilter) {
+    final boolean pageModified = false;
+    return generateSingleColumnAlignedPageReaderUsingLazyLoad(globalTimeFilter, pageModified);
+  }
+
@Test
public void testNullFilter() throws IOException {
AlignedPageReader alignedPageReader1 = generateAlignedPageReader(null);
@@ -181,6 +260,14 @@ public class AlignedPageReaderPushDownTest {
AlignedPageReader alignedPageReader2 =
generateSingleColumnAlignedPageReader(null);
TsBlock tsBlock2 = alignedPageReader2.getAllSatisfiedData();
Assert.assertEquals(80, tsBlock2.getPositionCount());
+
+ AlignedPageReader alignedPageReader3 =
generateAlignedPageReaderUsingLazyLoad(null);
+ TsBlock tsBlock3 = alignedPageReader3.getAllSatisfiedData();
+ Assert.assertEquals(100, tsBlock3.getPositionCount());
+
+ AlignedPageReader alignedPageReader4 =
generateSingleColumnAlignedPageReaderUsingLazyLoad(null);
+ TsBlock tsBlock4 = alignedPageReader4.getAllSatisfiedData();
+ Assert.assertEquals(80, tsBlock4.getPositionCount());
}
@Test
@@ -199,6 +286,22 @@ public class AlignedPageReaderPushDownTest {
Collections.singletonList(Collections.singletonList(new TimeRange(30,
39))));
TsBlock tsBlock2 = alignedPageReader2.getAllSatisfiedData();
Assert.assertEquals(70, tsBlock2.getPositionCount());
+
+ AlignedPageReader alignedPageReader3 =
+ generateAlignedPageReaderUsingLazyLoad(null, Arrays.asList(true,
true));
+ alignedPageReader3.setDeleteIntervalList(
+ Arrays.asList(
+ Arrays.asList(new TimeRange(0, 9), new TimeRange(20, 29)),
+ Collections.singletonList(new TimeRange(30, 39))));
+ TsBlock tsBlock3 = alignedPageReader3.getAllSatisfiedData();
+ Assert.assertEquals(90, tsBlock3.getPositionCount());
+
+ AlignedPageReader alignedPageReader4 =
+ generateSingleColumnAlignedPageReaderUsingLazyLoad(null, true);
+ alignedPageReader4.setDeleteIntervalList(
+ Collections.singletonList(Collections.singletonList(new TimeRange(30,
39))));
+ TsBlock tsBlock4 = alignedPageReader4.getAllSatisfiedData();
+ Assert.assertEquals(70, tsBlock4.getPositionCount());
}
@Test
@@ -216,6 +319,20 @@ public class AlignedPageReaderPushDownTest {
Assert.assertEquals(10, tsBlock2.getPositionCount());
Assert.assertEquals(20, tsBlock2.getTimeByIndex(0));
Assert.assertEquals(29, tsBlock2.getTimeByIndex(9));
+
+ AlignedPageReader alignedPageReader3 =
generateAlignedPageReaderUsingLazyLoad(null);
+ alignedPageReader3.setLimitOffset(new PaginationController(10, 10));
+ TsBlock tsBlock3 = alignedPageReader3.getAllSatisfiedData();
+ Assert.assertEquals(10, tsBlock3.getPositionCount());
+ Assert.assertEquals(10, tsBlock3.getTimeByIndex(0));
+ Assert.assertEquals(19, tsBlock3.getTimeByIndex(9));
+
+ AlignedPageReader alignedPageReader4 =
generateSingleColumnAlignedPageReaderUsingLazyLoad(null);
+ alignedPageReader4.setLimitOffset(new PaginationController(10, 10));
+ TsBlock tsBlock4 = alignedPageReader4.getAllSatisfiedData();
+ Assert.assertEquals(10, tsBlock4.getPositionCount());
+ Assert.assertEquals(20, tsBlock4.getTimeByIndex(0));
+ Assert.assertEquals(29, tsBlock4.getTimeByIndex(9));
}
@Test
@@ -230,6 +347,17 @@ public class AlignedPageReaderPushDownTest {
alignedPageReader2.addRecordFilter(ValueFilterApi.gtEq(50));
TsBlock tsBlock2 = alignedPageReader2.getAllSatisfiedData();
Assert.assertEquals(40, tsBlock2.getPositionCount());
+
+ AlignedPageReader alignedPageReader3 =
generateAlignedPageReaderUsingLazyLoad(globalTimeFilter);
+ alignedPageReader3.addRecordFilter(ValueFilterApi.gtEq(50));
+ TsBlock tsBlock3 = alignedPageReader3.getAllSatisfiedData();
+ Assert.assertEquals(50, tsBlock3.getPositionCount());
+
+ AlignedPageReader alignedPageReader4 =
+ generateSingleColumnAlignedPageReaderUsingLazyLoad(globalTimeFilter);
+ alignedPageReader4.addRecordFilter(ValueFilterApi.gtEq(50));
+ TsBlock tsBlock4 = alignedPageReader4.getAllSatisfiedData();
+ Assert.assertEquals(40, tsBlock4.getPositionCount());
}
@Test
@@ -250,6 +378,23 @@ public class AlignedPageReaderPushDownTest {
Assert.assertEquals(10, tsBlock2.getPositionCount());
Assert.assertEquals(60, tsBlock2.getTimeByIndex(0));
Assert.assertEquals(69, tsBlock2.getTimeByIndex(9));
+
+ AlignedPageReader alignedPageReader3 =
generateAlignedPageReaderUsingLazyLoad(globalTimeFilter);
+ alignedPageReader3.addRecordFilter(ValueFilterApi.gtEq(50));
+ alignedPageReader3.setLimitOffset(new PaginationController(10, 10));
+ TsBlock tsBlock3 = alignedPageReader3.getAllSatisfiedData();
+ Assert.assertEquals(10, tsBlock3.getPositionCount());
+ Assert.assertEquals(60, tsBlock3.getTimeByIndex(0));
+ Assert.assertEquals(69, tsBlock3.getTimeByIndex(9));
+
+ AlignedPageReader alignedPageReader4 =
+ generateSingleColumnAlignedPageReaderUsingLazyLoad(globalTimeFilter);
+ alignedPageReader4.addRecordFilter(ValueFilterApi.gtEq(50));
+ alignedPageReader4.setLimitOffset(new PaginationController(10, 10));
+ TsBlock tsBlock4 = alignedPageReader4.getAllSatisfiedData();
+ Assert.assertEquals(10, tsBlock4.getPositionCount());
+ Assert.assertEquals(60, tsBlock4.getTimeByIndex(0));
+ Assert.assertEquals(69, tsBlock4.getTimeByIndex(9));
}
@Test
@@ -264,6 +409,17 @@ public class AlignedPageReaderPushDownTest {
alignedPageReader2.addRecordFilter(ValueFilterApi.gtEq(0));
TsBlock tsBlock2 = alignedPageReader2.getAllSatisfiedData();
Assert.assertEquals(40, tsBlock2.getPositionCount());
+
+ AlignedPageReader alignedPageReader3 =
generateAlignedPageReaderUsingLazyLoad(globalTimeFilter);
+ alignedPageReader3.addRecordFilter(ValueFilterApi.gtEq(0));
+ TsBlock tsBlock3 = alignedPageReader3.getAllSatisfiedData();
+ Assert.assertEquals(50, tsBlock3.getPositionCount());
+
+ AlignedPageReader alignedPageReader4 =
+ generateSingleColumnAlignedPageReaderUsingLazyLoad(globalTimeFilter);
+ alignedPageReader4.addRecordFilter(ValueFilterApi.gtEq(0));
+ TsBlock tsBlock4 = alignedPageReader4.getAllSatisfiedData();
+ Assert.assertEquals(40, tsBlock4.getPositionCount());
}
@Test
@@ -284,6 +440,23 @@ public class AlignedPageReaderPushDownTest {
Assert.assertEquals(10, tsBlock2.getPositionCount());
Assert.assertEquals(60, tsBlock2.getTimeByIndex(0));
Assert.assertEquals(69, tsBlock2.getTimeByIndex(9));
+
+ AlignedPageReader alignedPageReader3 =
generateAlignedPageReaderUsingLazyLoad(globalTimeFilter);
+ alignedPageReader3.addRecordFilter(ValueFilterApi.gtEq(0));
+ alignedPageReader3.setLimitOffset(new PaginationController(10, 10));
+ TsBlock tsBlock3 = alignedPageReader3.getAllSatisfiedData();
+ Assert.assertEquals(10, tsBlock3.getPositionCount());
+ Assert.assertEquals(60, tsBlock3.getTimeByIndex(0));
+ Assert.assertEquals(69, tsBlock3.getTimeByIndex(9));
+
+ AlignedPageReader alignedPageReader4 =
+ generateSingleColumnAlignedPageReaderUsingLazyLoad(globalTimeFilter);
+ alignedPageReader4.addRecordFilter(ValueFilterApi.gtEq(0));
+ alignedPageReader4.setLimitOffset(new PaginationController(10, 10));
+ TsBlock tsBlock4 = alignedPageReader4.getAllSatisfiedData();
+ Assert.assertEquals(10, tsBlock4.getPositionCount());
+ Assert.assertEquals(60, tsBlock4.getTimeByIndex(0));
+ Assert.assertEquals(69, tsBlock4.getTimeByIndex(9));
}
@Test
@@ -298,6 +471,17 @@ public class AlignedPageReaderPushDownTest {
alignedPageReader2.addRecordFilter(ValueFilterApi.lt(80));
TsBlock tsBlock2 = alignedPageReader2.getAllSatisfiedData();
Assert.assertEquals(50, tsBlock2.getPositionCount());
+
+ AlignedPageReader alignedPageReader3 =
generateAlignedPageReaderUsingLazyLoad(globalTimeFilter);
+ alignedPageReader3.addRecordFilter(ValueFilterApi.lt(80));
+ TsBlock tsBlock3 = alignedPageReader3.getAllSatisfiedData();
+ Assert.assertEquals(50, tsBlock3.getPositionCount());
+
+ AlignedPageReader alignedPageReader4 =
+ generateSingleColumnAlignedPageReaderUsingLazyLoad(globalTimeFilter);
+ alignedPageReader4.addRecordFilter(ValueFilterApi.lt(80));
+ TsBlock tsBlock4 = alignedPageReader4.getAllSatisfiedData();
+ Assert.assertEquals(50, tsBlock4.getPositionCount());
}
@Test
@@ -318,5 +502,22 @@ public class AlignedPageReaderPushDownTest {
Assert.assertEquals(10, tsBlock2.getPositionCount());
Assert.assertEquals(60, tsBlock2.getTimeByIndex(0));
Assert.assertEquals(69, tsBlock2.getTimeByIndex(9));
+
+ AlignedPageReader alignedPageReader3 =
generateAlignedPageReaderUsingLazyLoad(globalTimeFilter);
+ alignedPageReader3.addRecordFilter(ValueFilterApi.lt(80));
+ alignedPageReader3.setLimitOffset(new PaginationController(10, 10));
+ TsBlock tsBlock3 = alignedPageReader3.getAllSatisfiedData();
+ Assert.assertEquals(10, tsBlock3.getPositionCount());
+ Assert.assertEquals(60, tsBlock3.getTimeByIndex(0));
+ Assert.assertEquals(69, tsBlock3.getTimeByIndex(9));
+
+ AlignedPageReader alignedPageReader4 =
+ generateSingleColumnAlignedPageReaderUsingLazyLoad(globalTimeFilter);
+ alignedPageReader4.addRecordFilter(ValueFilterApi.lt(80));
+ alignedPageReader4.setLimitOffset(new PaginationController(10, 10));
+ TsBlock tsBlock4 = alignedPageReader4.getAllSatisfiedData();
+ Assert.assertEquals(10, tsBlock4.getPositionCount());
+ Assert.assertEquals(60, tsBlock4.getTimeByIndex(0));
+ Assert.assertEquals(69, tsBlock4.getTimeByIndex(9));
}
}