This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/tsfile.git
The following commit(s) were added to refs/heads/develop by this push:
new fae7467e Add TableChunkReader and TablePageReader to support keep all
null rows while scanning
fae7467e is described below
commit fae7467e7e9fe9b070f37364fa4b03b7a12a0929
Author: Jackie Tien <[email protected]>
AuthorDate: Thu Sep 12 11:51:33 2024 +0800
Add TableChunkReader and TablePageReader to support keep all null rows
while scanning
---
...java => AbstractAlignedTimeSeriesMetadata.java} | 43 +--
.../file/metadata/AlignedTimeSeriesMetadata.java | 168 +----------
.../tsfile/file/metadata/TableDeviceMetadata.java | 50 ++++
.../apache/tsfile/read/common/block/TsBlock.java | 12 -
...Reader.java => AbstractAlignedChunkReader.java} | 80 +++---
.../read/reader/chunk/AlignedChunkReader.java | 228 ++-------------
.../tsfile/read/reader/chunk/TableChunkReader.java | 93 +++++++
...eReader.java => AbstractAlignedPageReader.java} | 192 +++----------
.../tsfile/read/reader/page/AlignedPageReader.java | 309 ++-------------------
.../tsfile/read/reader/page/TablePageReader.java | 137 +++++++++
.../tsfile/read/reader/page/ValuePageReader.java | 8 +
11 files changed, 438 insertions(+), 882 deletions(-)
diff --git
a/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/AlignedTimeSeriesMetadata.java
b/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/AbstractAlignedTimeSeriesMetadata.java
similarity index 84%
copy from
java/tsfile/src/main/java/org/apache/tsfile/file/metadata/AlignedTimeSeriesMetadata.java
copy to
java/tsfile/src/main/java/org/apache/tsfile/file/metadata/AbstractAlignedTimeSeriesMetadata.java
index 583f08ea..4e812a89 100644
---
a/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/AlignedTimeSeriesMetadata.java
+++
b/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/AbstractAlignedTimeSeriesMetadata.java
@@ -29,32 +29,21 @@ import java.util.Collections;
import java.util.List;
import java.util.Optional;
-public class AlignedTimeSeriesMetadata implements ITimeSeriesMetadata {
+public abstract class AbstractAlignedTimeSeriesMetadata implements
ITimeSeriesMetadata {
// TimeSeriesMetadata for time column
- private final TimeseriesMetadata timeseriesMetadata;
+ protected final TimeseriesMetadata timeseriesMetadata;
// TimeSeriesMetadata for all subSensors in the vector
- private final List<TimeseriesMetadata> valueTimeseriesMetadataList;
+ protected final List<TimeseriesMetadata> valueTimeseriesMetadataList;
- private IChunkMetadataLoader chunkMetadataLoader;
+ protected IChunkMetadataLoader chunkMetadataLoader;
- public AlignedTimeSeriesMetadata(
+ AbstractAlignedTimeSeriesMetadata(
TimeseriesMetadata timeseriesMetadata, List<TimeseriesMetadata>
valueTimeseriesMetadataList) {
this.timeseriesMetadata = timeseriesMetadata;
this.valueTimeseriesMetadataList = valueTimeseriesMetadataList;
}
- /**
- * If the vector contains only one sub sensor, just return the sub sensor's
Statistics Otherwise,
- * return the Statistics of the time column.
- */
- @Override
- public Statistics getStatistics() {
- return valueTimeseriesMetadataList.size() == 1 &&
valueTimeseriesMetadataList.get(0) != null
- ? valueTimeseriesMetadataList.get(0).getStatistics()
- : timeseriesMetadata.getStatistics();
- }
-
@Override
public Statistics<? extends Serializable> getTimeStatistics() {
return timeseriesMetadata.getStatistics();
@@ -80,18 +69,6 @@ public class AlignedTimeSeriesMetadata implements
ITimeSeriesMetadata {
return valueTimeseriesMetadataList.size();
}
- @Override
- public boolean timeAllSelected() {
- for (int index = 0; index < getMeasurementCount(); index++) {
- if (!hasNullValue(index)) {
- // When there is any value page point number that is the same as the
time page,
- // it means that all timestamps in time page will be selected.
- return true;
- }
- }
- return false;
- }
-
@Override
public boolean isModified() {
return timeseriesMetadata.isModified();
@@ -176,14 +153,18 @@ public class AlignedTimeSeriesMetadata implements
ITimeSeriesMetadata {
exits = (exits || v != null);
chunkMetadataList.add(v);
}
- if (exits) {
- res.add(new AlignedChunkMetadata(timeChunkMetadata.get(i),
chunkMetadataList));
- }
+ constructAlignedChunkMetadata(res, timeChunkMetadata.get(i),
chunkMetadataList, exits);
}
}
return res;
}
+ abstract void constructAlignedChunkMetadata(
+ List<AlignedChunkMetadata> res,
+ IChunkMetadata timeChunkMetadata,
+ List<IChunkMetadata> chunkMetadataList,
+ boolean exits);
+
@Override
public void setChunkMetadataLoader(IChunkMetadataLoader chunkMetadataLoader)
{
this.chunkMetadataLoader = chunkMetadataLoader;
diff --git
a/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/AlignedTimeSeriesMetadata.java
b/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/AlignedTimeSeriesMetadata.java
index 583f08ea..ba5cbce4 100644
---
a/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/AlignedTimeSeriesMetadata.java
+++
b/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/AlignedTimeSeriesMetadata.java
@@ -19,29 +19,15 @@
package org.apache.tsfile.file.metadata;
-import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.file.metadata.statistics.Statistics;
-import org.apache.tsfile.read.controller.IChunkMetadataLoader;
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Collections;
import java.util.List;
-import java.util.Optional;
-public class AlignedTimeSeriesMetadata implements ITimeSeriesMetadata {
-
- // TimeSeriesMetadata for time column
- private final TimeseriesMetadata timeseriesMetadata;
- // TimeSeriesMetadata for all subSensors in the vector
- private final List<TimeseriesMetadata> valueTimeseriesMetadataList;
-
- private IChunkMetadataLoader chunkMetadataLoader;
+public class AlignedTimeSeriesMetadata extends
AbstractAlignedTimeSeriesMetadata {
public AlignedTimeSeriesMetadata(
TimeseriesMetadata timeseriesMetadata, List<TimeseriesMetadata>
valueTimeseriesMetadataList) {
- this.timeseriesMetadata = timeseriesMetadata;
- this.valueTimeseriesMetadataList = valueTimeseriesMetadataList;
+ super(timeseriesMetadata, valueTimeseriesMetadataList);
}
/**
@@ -55,31 +41,6 @@ public class AlignedTimeSeriesMetadata implements
ITimeSeriesMetadata {
: timeseriesMetadata.getStatistics();
}
- @Override
- public Statistics<? extends Serializable> getTimeStatistics() {
- return timeseriesMetadata.getStatistics();
- }
-
- @Override
- public Optional<Statistics<? extends Serializable>> getMeasurementStatistics(
- int measurementIndex) {
- TimeseriesMetadata metadata =
valueTimeseriesMetadataList.get(measurementIndex);
- return Optional.ofNullable(metadata == null ? null :
metadata.getStatistics());
- }
-
- @Override
- public boolean hasNullValue(int measurementIndex) {
- long rowCount = getTimeStatistics().getCount();
- Optional<Statistics<? extends Serializable>> statistics =
- getMeasurementStatistics(measurementIndex);
- return statistics.map(stat -> stat.hasNullValue(rowCount)).orElse(true);
- }
-
- @Override
- public int getMeasurementCount() {
- return valueTimeseriesMetadataList.size();
- }
-
@Override
public boolean timeAllSelected() {
for (int index = 0; index < getMeasurementCount(); index++) {
@@ -93,124 +54,13 @@ public class AlignedTimeSeriesMetadata implements
ITimeSeriesMetadata {
}
@Override
- public boolean isModified() {
- return timeseriesMetadata.isModified();
- }
-
- @Override
- public void setModified(boolean modified) {
- timeseriesMetadata.setModified(modified);
- for (TimeseriesMetadata subSensor : valueTimeseriesMetadataList) {
- if (subSensor != null) {
- subSensor.setModified(modified);
- }
- }
- }
-
- @Override
- public boolean isSeq() {
- return timeseriesMetadata.isSeq();
- }
-
- @Override
- public void setSeq(boolean seq) {
- timeseriesMetadata.setSeq(seq);
- for (TimeseriesMetadata subSensor : valueTimeseriesMetadataList) {
- if (subSensor != null) {
- subSensor.setSeq(seq);
- }
+ void constructAlignedChunkMetadata(
+ List<AlignedChunkMetadata> res,
+ IChunkMetadata timeChunkMetadata,
+ List<IChunkMetadata> chunkMetadataList,
+ boolean exits) {
+ if (exits) {
+ res.add(new AlignedChunkMetadata(timeChunkMetadata, chunkMetadataList));
}
}
-
- /**
- * If the chunkMetadataLoader is MemChunkMetadataLoader, the
VectorChunkMetadata is already
- * assembled while constructing the in-memory TsFileResource, so we just
return the assembled
- * VectorChunkMetadata list.
- *
- * <p>Otherwise, we need to assemble the ChunkMetadata of time column and
the ChunkMetadata of all
- * the subSensors to generate the VectorChunkMetadata
- */
- @Override
- public List<IChunkMetadata> loadChunkMetadataList() {
- return chunkMetadataLoader.loadChunkMetadataList(this);
- }
-
- public List<AlignedChunkMetadata> getCopiedChunkMetadataList() {
- List<IChunkMetadata> timeChunkMetadata =
timeseriesMetadata.getCopiedChunkMetadataList();
- List<List<IChunkMetadata>> valueChunkMetadataList = new ArrayList<>();
- for (TimeseriesMetadata metadata : valueTimeseriesMetadataList) {
- valueChunkMetadataList.add(metadata == null ? null :
metadata.getCopiedChunkMetadataList());
- }
-
- return getAlignedChunkMetadata(timeChunkMetadata, valueChunkMetadataList);
- }
-
- public List<AlignedChunkMetadata> getChunkMetadataList() {
- List<IChunkMetadata> timeChunkMetadata =
timeseriesMetadata.getChunkMetadataList();
- List<List<IChunkMetadata>> valueChunkMetadataList = new ArrayList<>();
- for (TimeseriesMetadata metadata : valueTimeseriesMetadataList) {
- valueChunkMetadataList.add(metadata == null ? null :
metadata.getChunkMetadataList());
- }
-
- return getAlignedChunkMetadata(timeChunkMetadata, valueChunkMetadataList);
- }
-
- /** Notice: if all the value chunks is empty chunk, then return empty list.
*/
- private List<AlignedChunkMetadata> getAlignedChunkMetadata(
- List<IChunkMetadata> timeChunkMetadata, List<List<IChunkMetadata>>
valueChunkMetadataList) {
- List<AlignedChunkMetadata> res = new ArrayList<>();
- for (int i = 0; i < timeChunkMetadata.size(); i++) {
- // only need time column
- if (valueTimeseriesMetadataList.isEmpty()) {
- res.add(new AlignedChunkMetadata(timeChunkMetadata.get(i),
Collections.emptyList()));
- } else {
- List<IChunkMetadata> chunkMetadataList = new ArrayList<>();
- // only at least one sensor exits, we add the AlignedChunkMetadata to
the list
- boolean exits = false;
- for (List<IChunkMetadata> chunkMetadata : valueChunkMetadataList) {
- IChunkMetadata v =
- chunkMetadata == null
- || chunkMetadata.get(i).getStatistics().getCount() == 0
// empty chunk
- ? null
- : chunkMetadata.get(i);
- exits = (exits || v != null);
- chunkMetadataList.add(v);
- }
- if (exits) {
- res.add(new AlignedChunkMetadata(timeChunkMetadata.get(i),
chunkMetadataList));
- }
- }
- }
- return res;
- }
-
- @Override
- public void setChunkMetadataLoader(IChunkMetadataLoader chunkMetadataLoader)
{
- this.chunkMetadataLoader = chunkMetadataLoader;
- }
-
- @Override
- public boolean typeMatch(List<TSDataType> dataTypes) {
- if (valueTimeseriesMetadataList != null) {
- int notMatchCount = 0;
- for (int i = 0, size = dataTypes.size(); i < size; i++) {
- TimeseriesMetadata valueTimeSeriesMetadata =
valueTimeseriesMetadataList.get(i);
- if (valueTimeSeriesMetadata != null
- && !valueTimeSeriesMetadata.typeMatch(dataTypes.get(i))) {
- valueTimeseriesMetadataList.set(i, null);
- notMatchCount++;
- }
- }
- return notMatchCount != dataTypes.size();
- }
- return true;
- }
-
- public List<TimeseriesMetadata> getValueTimeseriesMetadataList() {
- return valueTimeseriesMetadataList;
- }
-
- public TimeseriesMetadata getTimeseriesMetadata() {
- return timeseriesMetadata;
- }
}
diff --git
a/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/TableDeviceMetadata.java
b/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/TableDeviceMetadata.java
new file mode 100644
index 00000000..8614a532
--- /dev/null
+++
b/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/TableDeviceMetadata.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tsfile.file.metadata;
+
+import org.apache.tsfile.file.metadata.statistics.Statistics;
+
+import java.util.List;
+
+public class TableDeviceMetadata extends AbstractAlignedTimeSeriesMetadata {
+
+ public TableDeviceMetadata(
+ TimeseriesMetadata timeseriesMetadata, List<TimeseriesMetadata>
valueTimeseriesMetadataList) {
+ super(timeseriesMetadata, valueTimeseriesMetadataList);
+ }
+
+ /**
+ * Unlike AlignedTimeSeriesMetadata, always return the Statistics of the time column,
+ * no matter how many sub sensors the vector contains.
+ */
+ @Override
+ public Statistics getStatistics() {
+ return timeseriesMetadata.getStatistics();
+ }
+
+ @Override
+ void constructAlignedChunkMetadata(
+ List<AlignedChunkMetadata> res,
+ IChunkMetadata timeChunkMetadata,
+ List<IChunkMetadata> chunkMetadataList,
+ boolean exits) {
+ res.add(new AlignedChunkMetadata(timeChunkMetadata, chunkMetadataList));
+ }
+}
diff --git
a/java/tsfile/src/main/java/org/apache/tsfile/read/common/block/TsBlock.java
b/java/tsfile/src/main/java/org/apache/tsfile/read/common/block/TsBlock.java
index c607e906..1cbd10ed 100644
--- a/java/tsfile/src/main/java/org/apache/tsfile/read/common/block/TsBlock.java
+++ b/java/tsfile/src/main/java/org/apache/tsfile/read/common/block/TsBlock.java
@@ -455,9 +455,6 @@ public class TsBlock {
@Override
public boolean hasNextTimeValuePair() {
- while (hasNext() && isCurrentValueAllNull()) {
- next();
- }
return hasNext();
}
@@ -499,15 +496,6 @@ public class TsBlock {
public void setRowIndex(int rowIndex) {
this.rowIndex = rowIndex;
}
-
- private boolean isCurrentValueAllNull() {
- for (Column valueColumn : valueColumns) {
- if (!valueColumn.isNull(rowIndex)) {
- return false;
- }
- }
- return true;
- }
}
private long updateRetainedSize() {
diff --git
a/java/tsfile/src/main/java/org/apache/tsfile/read/reader/chunk/AlignedChunkReader.java
b/java/tsfile/src/main/java/org/apache/tsfile/read/reader/chunk/AbstractAlignedChunkReader.java
similarity index 84%
copy from
java/tsfile/src/main/java/org/apache/tsfile/read/reader/chunk/AlignedChunkReader.java
copy to
java/tsfile/src/main/java/org/apache/tsfile/read/reader/chunk/AbstractAlignedChunkReader.java
index 865fdd21..165367e2 100644
---
a/java/tsfile/src/main/java/org/apache/tsfile/read/reader/chunk/AlignedChunkReader.java
+++
b/java/tsfile/src/main/java/org/apache/tsfile/read/reader/chunk/AbstractAlignedChunkReader.java
@@ -30,7 +30,7 @@ import org.apache.tsfile.file.metadata.statistics.Statistics;
import org.apache.tsfile.read.common.Chunk;
import org.apache.tsfile.read.common.TimeRange;
import org.apache.tsfile.read.filter.basic.Filter;
-import org.apache.tsfile.read.reader.page.AlignedPageReader;
+import org.apache.tsfile.read.reader.page.AbstractAlignedPageReader;
import org.apache.tsfile.read.reader.page.LazyLoadPageData;
import java.io.IOException;
@@ -39,8 +39,7 @@ import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
-public class AlignedChunkReader extends AbstractChunkReader {
-
+public abstract class AbstractAlignedChunkReader extends AbstractChunkReader {
// chunk header of the time column
private final ChunkHeader timeChunkHeader;
// chunk data of the time column
@@ -56,7 +55,7 @@ public class AlignedChunkReader extends AbstractChunkReader {
private final IDecryptor decrytor;
@SuppressWarnings("unchecked")
- public AlignedChunkReader(
+ AbstractAlignedChunkReader(
Chunk timeChunk, List<Chunk> valueChunkList, long readStopTime, Filter
queryFilter)
throws IOException {
super(readStopTime, queryFilter);
@@ -76,24 +75,6 @@ public class AlignedChunkReader extends AbstractChunkReader {
initAllPageReaders(timeChunk.getChunkStatistic(),
valueChunkStatisticsList);
}
- public AlignedChunkReader(Chunk timeChunk, List<Chunk> valueChunkList)
throws IOException {
- this(timeChunk, valueChunkList, Long.MIN_VALUE, null);
- }
-
- public AlignedChunkReader(Chunk timeChunk, List<Chunk> valueChunkList,
Filter queryFilter)
- throws IOException {
- this(timeChunk, valueChunkList, Long.MIN_VALUE, queryFilter);
- }
-
- /**
- * Constructor of ChunkReader by timestamp. This constructor is used to
accelerate queries by
- * filtering out pages whose endTime is less than current timestamp.
- */
- public AlignedChunkReader(Chunk timeChunk, List<Chunk> valueChunkList, long
readStopTime)
- throws IOException {
- this(timeChunk, valueChunkList, readStopTime, null);
- }
-
/** construct all the page readers in this chunk */
private void initAllPageReaders(
Statistics<? extends Serializable> timeChunkStatistics,
@@ -102,7 +83,7 @@ public class AlignedChunkReader extends AbstractChunkReader {
// construct next satisfied page header
while (timeChunkDataBuffer.remaining() > 0) {
// deserialize PageHeader from chunkDataBuffer
- AlignedPageReader alignedPageReader =
+ AbstractAlignedPageReader alignedPageReader =
isSinglePageChunk()
? deserializeFromSinglePageChunk(timeChunkStatistics,
valueChunkStatisticsList)
: deserializeFromMultiPageChunk();
@@ -116,7 +97,7 @@ public class AlignedChunkReader extends AbstractChunkReader {
return (timeChunkHeader.getChunkType() & 0x3F) ==
MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER;
}
- private AlignedPageReader deserializeFromSinglePageChunk(
+ private AbstractAlignedPageReader deserializeFromSinglePageChunk(
Statistics<? extends Serializable> timeChunkStatistics,
List<Statistics<? extends Serializable>> valueChunkStatisticsList)
throws IOException {
@@ -136,7 +117,7 @@ public class AlignedChunkReader extends AbstractChunkReader
{
}
}
- if (isAllNull || isEarlierThanReadStopTime(timePageHeader)) {
+ if (needSkipForSinglePageChunk(isAllNull, timePageHeader)) {
// when there is only one page in the chunk, the page statistic is the
same as the chunk, so
// we needn't filter the page again
skipCurrentPage(timePageHeader, valuePageHeaderList);
@@ -145,7 +126,9 @@ public class AlignedChunkReader extends AbstractChunkReader
{
return constructAlignedPageReader(timePageHeader, valuePageHeaderList);
}
- private AlignedPageReader deserializeFromMultiPageChunk() throws IOException
{
+ abstract boolean needSkipForSinglePageChunk(boolean isAllNull, PageHeader
timePageHeader);
+
+ private AbstractAlignedPageReader deserializeFromMultiPageChunk() throws
IOException {
PageHeader timePageHeader =
PageHeader.deserializeFrom(timeChunkDataBuffer,
timeChunkHeader.getDataType());
List<PageHeader> valuePageHeaderList = new ArrayList<>();
@@ -162,18 +145,20 @@ public class AlignedChunkReader extends
AbstractChunkReader {
}
}
- if (isAllNull || isEarlierThanReadStopTime(timePageHeader) ||
pageCanSkip(timePageHeader)) {
+ if (needSkipForMultiPageChunk(isAllNull, timePageHeader)) {
skipCurrentPage(timePageHeader, valuePageHeaderList);
return null;
}
return constructAlignedPageReader(timePageHeader, valuePageHeaderList);
}
+ abstract boolean needSkipForMultiPageChunk(boolean isAllNull, PageHeader
timePageHeader);
+
protected boolean isEarlierThanReadStopTime(final PageHeader timePageHeader)
{
return timePageHeader.getEndTime() < readStopTime;
}
- private boolean pageCanSkip(PageHeader pageHeader) {
+ protected boolean pageCanSkip(PageHeader pageHeader) {
return queryFilter != null
&& !queryFilter.satisfyStartEndTime(pageHeader.getStartTime(),
pageHeader.getEndTime());
}
@@ -192,7 +177,7 @@ public class AlignedChunkReader extends AbstractChunkReader
{
}
}
- private AlignedPageReader constructAlignedPageReader(
+ private AbstractAlignedPageReader constructAlignedPageReader(
PageHeader timePageHeader, List<PageHeader> rawValuePageHeaderList)
throws IOException {
ByteBuffer timePageData =
ChunkReader.deserializePageData(
@@ -245,23 +230,34 @@ public class AlignedChunkReader extends
AbstractChunkReader {
isAllNull = false;
}
}
- if (isAllNull) {
+ if (canSkip(isAllNull, timePageHeader)) {
return null;
}
- AlignedPageReader alignedPageReader =
- new AlignedPageReader(
- timePageHeader,
- timePageData,
- defaultTimeDecoder,
- valuePageHeaderList,
- lazyLoadPageDataArray,
- valueDataTypeList,
- valueDecoderList,
- queryFilter);
- alignedPageReader.setDeleteIntervalList(valueDeleteIntervalsList);
- return alignedPageReader;
+ return constructPageReader(
+ timePageHeader,
+ timePageData,
+ defaultTimeDecoder,
+ valuePageHeaderList,
+ lazyLoadPageDataArray,
+ valueDataTypeList,
+ valueDecoderList,
+ queryFilter,
+ valueDeleteIntervalsList);
}
+ abstract boolean canSkip(boolean isAllNull, PageHeader timePageHeader);
+
+ abstract AbstractAlignedPageReader constructPageReader(
+ PageHeader timePageHeader,
+ ByteBuffer timePageData,
+ Decoder timeDecoder,
+ List<PageHeader> valuePageHeaderList,
+ LazyLoadPageData[] lazyLoadPageDataArray,
+ List<TSDataType> valueDataTypeList,
+ List<Decoder> valueDecoderList,
+ Filter queryFilter,
+ List<List<TimeRange>> valueDeleteIntervalsList);
+
protected boolean pageDeleted(PageHeader pageHeader, List<TimeRange>
deleteIntervals) {
if (pageHeader.getEndTime() < readStopTime) {
return true;
diff --git
a/java/tsfile/src/main/java/org/apache/tsfile/read/reader/chunk/AlignedChunkReader.java
b/java/tsfile/src/main/java/org/apache/tsfile/read/reader/chunk/AlignedChunkReader.java
index 865fdd21..e7879734 100644
---
a/java/tsfile/src/main/java/org/apache/tsfile/read/reader/chunk/AlignedChunkReader.java
+++
b/java/tsfile/src/main/java/org/apache/tsfile/read/reader/chunk/AlignedChunkReader.java
@@ -19,61 +19,26 @@
package org.apache.tsfile.read.reader.chunk;
-import org.apache.tsfile.compress.IUnCompressor;
import org.apache.tsfile.encoding.decoder.Decoder;
-import org.apache.tsfile.encrypt.IDecryptor;
import org.apache.tsfile.enums.TSDataType;
-import org.apache.tsfile.file.MetaMarker;
-import org.apache.tsfile.file.header.ChunkHeader;
import org.apache.tsfile.file.header.PageHeader;
-import org.apache.tsfile.file.metadata.statistics.Statistics;
import org.apache.tsfile.read.common.Chunk;
import org.apache.tsfile.read.common.TimeRange;
import org.apache.tsfile.read.filter.basic.Filter;
+import org.apache.tsfile.read.reader.page.AbstractAlignedPageReader;
import org.apache.tsfile.read.reader.page.AlignedPageReader;
import org.apache.tsfile.read.reader.page.LazyLoadPageData;
import java.io.IOException;
-import java.io.Serializable;
import java.nio.ByteBuffer;
-import java.util.ArrayList;
import java.util.List;
-public class AlignedChunkReader extends AbstractChunkReader {
+public class AlignedChunkReader extends AbstractAlignedChunkReader {
- // chunk header of the time column
- private final ChunkHeader timeChunkHeader;
- // chunk data of the time column
- private final ByteBuffer timeChunkDataBuffer;
-
- // chunk headers of all the sub sensors
- private final List<ChunkHeader> valueChunkHeaderList = new ArrayList<>();
- // chunk data of all the sub sensors
- private final List<ByteBuffer> valueChunkDataBufferList = new ArrayList<>();
- // deleted intervals of all the sub sensors
- private final List<List<TimeRange>> valueDeleteIntervalsList = new
ArrayList<>();
-
- private final IDecryptor decrytor;
-
- @SuppressWarnings("unchecked")
public AlignedChunkReader(
Chunk timeChunk, List<Chunk> valueChunkList, long readStopTime, Filter
queryFilter)
throws IOException {
- super(readStopTime, queryFilter);
- this.timeChunkHeader = timeChunk.getHeader();
- this.timeChunkDataBuffer = timeChunk.getData();
-
- List<Statistics<? extends Serializable>> valueChunkStatisticsList = new
ArrayList<>();
- valueChunkList.forEach(
- chunk -> {
- this.valueChunkHeaderList.add(chunk == null ? null :
chunk.getHeader());
- this.valueChunkDataBufferList.add(chunk == null ? null :
chunk.getData());
- this.valueDeleteIntervalsList.add(chunk == null ? null :
chunk.getDeleteIntervalList());
-
- valueChunkStatisticsList.add(chunk == null ? null :
chunk.getChunkStatistic());
- });
- this.decrytor = timeChunk.getDecryptor();
- initAllPageReaders(timeChunk.getChunkStatistic(),
valueChunkStatisticsList);
+ super(timeChunk, valueChunkList, readStopTime, queryFilter);
}
public AlignedChunkReader(Chunk timeChunk, List<Chunk> valueChunkList)
throws IOException {
@@ -94,165 +59,37 @@ public class AlignedChunkReader extends
AbstractChunkReader {
this(timeChunk, valueChunkList, readStopTime, null);
}
- /** construct all the page readers in this chunk */
- private void initAllPageReaders(
- Statistics<? extends Serializable> timeChunkStatistics,
- List<Statistics<? extends Serializable>> valueChunkStatisticsList)
- throws IOException {
- // construct next satisfied page header
- while (timeChunkDataBuffer.remaining() > 0) {
- // deserialize PageHeader from chunkDataBuffer
- AlignedPageReader alignedPageReader =
- isSinglePageChunk()
- ? deserializeFromSinglePageChunk(timeChunkStatistics,
valueChunkStatisticsList)
- : deserializeFromMultiPageChunk();
- if (alignedPageReader != null) {
- pageReaderList.add(alignedPageReader);
- }
- }
+ @Override
+ boolean needSkipForSinglePageChunk(boolean isAllNull, PageHeader
timePageHeader) {
+ return isAllNull || isEarlierThanReadStopTime(timePageHeader);
}
- private boolean isSinglePageChunk() {
- return (timeChunkHeader.getChunkType() & 0x3F) ==
MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER;
+ @Override
+ boolean needSkipForMultiPageChunk(boolean isAllNull, PageHeader
timePageHeader) {
+ return isAllNull || isEarlierThanReadStopTime(timePageHeader) ||
pageCanSkip(timePageHeader);
}
- private AlignedPageReader deserializeFromSinglePageChunk(
- Statistics<? extends Serializable> timeChunkStatistics,
- List<Statistics<? extends Serializable>> valueChunkStatisticsList)
- throws IOException {
- PageHeader timePageHeader =
- PageHeader.deserializeFrom(timeChunkDataBuffer, timeChunkStatistics);
- List<PageHeader> valuePageHeaderList = new ArrayList<>();
-
- boolean isAllNull = true;
- for (int i = 0; i < valueChunkDataBufferList.size(); i++) {
- if (valueChunkDataBufferList.get(i) != null) {
- isAllNull = false;
- valuePageHeaderList.add(
- PageHeader.deserializeFrom(
- valueChunkDataBufferList.get(i),
valueChunkStatisticsList.get(i)));
- } else {
- valuePageHeaderList.add(null);
- }
- }
-
- if (isAllNull || isEarlierThanReadStopTime(timePageHeader)) {
- // when there is only one page in the chunk, the page statistic is the
same as the chunk, so
- // we needn't filter the page again
- skipCurrentPage(timePageHeader, valuePageHeaderList);
- return null;
- }
- return constructAlignedPageReader(timePageHeader, valuePageHeaderList);
- }
-
- private AlignedPageReader deserializeFromMultiPageChunk() throws IOException
{
- PageHeader timePageHeader =
- PageHeader.deserializeFrom(timeChunkDataBuffer,
timeChunkHeader.getDataType());
- List<PageHeader> valuePageHeaderList = new ArrayList<>();
-
- boolean isAllNull = true;
- for (int i = 0; i < valueChunkDataBufferList.size(); i++) {
- if (valueChunkDataBufferList.get(i) != null) {
- isAllNull = false;
- valuePageHeaderList.add(
- PageHeader.deserializeFrom(
- valueChunkDataBufferList.get(i),
valueChunkHeaderList.get(i).getDataType()));
- } else {
- valuePageHeaderList.add(null);
- }
- }
-
- if (isAllNull || isEarlierThanReadStopTime(timePageHeader) ||
pageCanSkip(timePageHeader)) {
- skipCurrentPage(timePageHeader, valuePageHeaderList);
- return null;
- }
- return constructAlignedPageReader(timePageHeader, valuePageHeaderList);
+ @Override
+ boolean canSkip(boolean isAllNull, PageHeader timePageHeader) {
+ return isAllNull;
}
- protected boolean isEarlierThanReadStopTime(final PageHeader timePageHeader)
{
- return timePageHeader.getEndTime() < readStopTime;
- }
-
- private boolean pageCanSkip(PageHeader pageHeader) {
- return queryFilter != null
- && !queryFilter.satisfyStartEndTime(pageHeader.getStartTime(),
pageHeader.getEndTime());
- }
-
- private void skipCurrentPage(PageHeader timePageHeader, List<PageHeader>
valuePageHeader) {
- timeChunkDataBuffer.position(
- timeChunkDataBuffer.position() + timePageHeader.getCompressedSize());
- for (int i = 0; i < valuePageHeader.size(); i++) {
- if (valuePageHeader.get(i) != null) {
- valueChunkDataBufferList
- .get(i)
- .position(
- valueChunkDataBufferList.get(i).position()
- + valuePageHeader.get(i).getCompressedSize());
- }
- }
- }
-
- private AlignedPageReader constructAlignedPageReader(
- PageHeader timePageHeader, List<PageHeader> rawValuePageHeaderList)
throws IOException {
- ByteBuffer timePageData =
- ChunkReader.deserializePageData(
- timePageHeader, timeChunkDataBuffer, timeChunkHeader, decrytor);
-
- List<PageHeader> valuePageHeaderList = new ArrayList<>();
- LazyLoadPageData[] lazyLoadPageDataArray = new
LazyLoadPageData[rawValuePageHeaderList.size()];
- List<TSDataType> valueDataTypeList = new ArrayList<>();
- List<Decoder> valueDecoderList = new ArrayList<>();
-
- boolean isAllNull = true;
- for (int i = 0; i < rawValuePageHeaderList.size(); i++) {
- PageHeader valuePageHeader = rawValuePageHeaderList.get(i);
-
- if (valuePageHeader == null || valuePageHeader.getUncompressedSize() ==
0) {
- // Empty Page
- valuePageHeaderList.add(null);
- lazyLoadPageDataArray[i] = null;
- valueDataTypeList.add(null);
- valueDecoderList.add(null);
- } else if (pageDeleted(valuePageHeader,
valueDeleteIntervalsList.get(i))) {
- valueChunkDataBufferList
- .get(i)
- .position(
- valueChunkDataBufferList.get(i).position() +
valuePageHeader.getCompressedSize());
- valuePageHeaderList.add(null);
- lazyLoadPageDataArray[i] = null;
- valueDataTypeList.add(null);
- valueDecoderList.add(null);
- } else {
- ChunkHeader valueChunkHeader = valueChunkHeaderList.get(i);
- int currentPagePosition = valueChunkDataBufferList.get(i).position();
- // adjust position as if we have read the page data even if it is just
lazy-loaded
- valueChunkDataBufferList
- .get(i)
- .position(
- valueChunkDataBufferList.get(i).position() +
valuePageHeader.getCompressedSize());
-
- valuePageHeaderList.add(valuePageHeader);
- lazyLoadPageDataArray[i] =
- new LazyLoadPageData(
- valueChunkDataBufferList.get(i).array(),
- currentPagePosition,
-
IUnCompressor.getUnCompressor(valueChunkHeader.getCompressionType()),
- decrytor);
- valueDataTypeList.add(valueChunkHeader.getDataType());
- valueDecoderList.add(
- Decoder.getDecoderByType(
- valueChunkHeader.getEncodingType(),
valueChunkHeader.getDataType()));
- isAllNull = false;
- }
- }
- if (isAllNull) {
- return null;
- }
+ @Override
+ AbstractAlignedPageReader constructPageReader(
+ PageHeader timePageHeader,
+ ByteBuffer timePageData,
+ Decoder timeDecoder,
+ List<PageHeader> valuePageHeaderList,
+ LazyLoadPageData[] lazyLoadPageDataArray,
+ List<TSDataType> valueDataTypeList,
+ List<Decoder> valueDecoderList,
+ Filter queryFilter,
+ List<List<TimeRange>> valueDeleteIntervalsList) {
AlignedPageReader alignedPageReader =
new AlignedPageReader(
timePageHeader,
timePageData,
- defaultTimeDecoder,
+ timeDecoder,
valuePageHeaderList,
lazyLoadPageDataArray,
valueDataTypeList,
@@ -261,21 +98,4 @@ public class AlignedChunkReader extends AbstractChunkReader
{
alignedPageReader.setDeleteIntervalList(valueDeleteIntervalsList);
return alignedPageReader;
}
-
- protected boolean pageDeleted(PageHeader pageHeader, List<TimeRange>
deleteIntervals) {
- if (pageHeader.getEndTime() < readStopTime) {
- return true;
- }
- if (deleteIntervals != null) {
- for (TimeRange range : deleteIntervals) {
- if (range.contains(pageHeader.getStartTime(),
pageHeader.getEndTime())) {
- return true;
- }
- if (range.overlaps(new TimeRange(pageHeader.getStartTime(),
pageHeader.getEndTime()))) {
- pageHeader.setModified(true);
- }
- }
- }
- return false;
- }
}
diff --git
a/java/tsfile/src/main/java/org/apache/tsfile/read/reader/chunk/TableChunkReader.java
b/java/tsfile/src/main/java/org/apache/tsfile/read/reader/chunk/TableChunkReader.java
new file mode 100644
index 00000000..01a39810
--- /dev/null
+++
b/java/tsfile/src/main/java/org/apache/tsfile/read/reader/chunk/TableChunkReader.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tsfile.read.reader.chunk;
+
+import org.apache.tsfile.encoding.decoder.Decoder;
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.file.header.PageHeader;
+import org.apache.tsfile.read.common.Chunk;
+import org.apache.tsfile.read.common.TimeRange;
+import org.apache.tsfile.read.filter.basic.Filter;
+import org.apache.tsfile.read.reader.page.AbstractAlignedPageReader;
+import org.apache.tsfile.read.reader.page.LazyLoadPageData;
+import org.apache.tsfile.read.reader.page.TablePageReader;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+// difference with AlignedChunkReader is that TableChunkReader works for TableScan and keep all null
+// rows
+public class TableChunkReader extends AbstractAlignedChunkReader {
+
+ private final List<TimeRange> timeDeleteIntervalsList;
+
+ public TableChunkReader(
+ Chunk timeChunk, List<Chunk> valueChunkList, long readStopTime, Filter
queryFilter)
+ throws IOException {
+ super(timeChunk, valueChunkList, readStopTime, queryFilter);
+ timeDeleteIntervalsList = timeChunk.getDeleteIntervalList();
+ }
+
+ public TableChunkReader(Chunk timeChunk, List<Chunk> valueChunkList, Filter
queryFilter)
+ throws IOException {
+ this(timeChunk, valueChunkList, Long.MIN_VALUE, queryFilter);
+ }
+
+ @Override
+ boolean needSkipForSinglePageChunk(boolean isAllNull, PageHeader
timePageHeader) {
+ return isEarlierThanReadStopTime(timePageHeader);
+ }
+
+ @Override
+ boolean needSkipForMultiPageChunk(boolean isAllNull, PageHeader
timePageHeader) {
+ return isEarlierThanReadStopTime(timePageHeader) ||
pageCanSkip(timePageHeader);
+ }
+
+ @Override
+ boolean canSkip(boolean isAllNull, PageHeader timePageHeader) {
+ return pageDeleted(timePageHeader, timeDeleteIntervalsList);
+ }
+
+ @Override
+ AbstractAlignedPageReader constructPageReader(
+ PageHeader timePageHeader,
+ ByteBuffer timePageData,
+ Decoder timeDecoder,
+ List<PageHeader> valuePageHeaderList,
+ LazyLoadPageData[] lazyLoadPageDataArray,
+ List<TSDataType> valueDataTypeList,
+ List<Decoder> valueDecoderList,
+ Filter queryFilter,
+ List<List<TimeRange>> valueDeleteIntervalsList) {
+ TablePageReader alignedPageReader =
+ new TablePageReader(
+ timePageHeader,
+ timePageData,
+ timeDecoder,
+ valuePageHeaderList,
+ lazyLoadPageDataArray,
+ valueDataTypeList,
+ valueDecoderList,
+ queryFilter);
+ alignedPageReader.setDeleteIntervalList(timeDeleteIntervalsList,
valueDeleteIntervalsList);
+ return alignedPageReader;
+ }
+}
diff --git
a/java/tsfile/src/main/java/org/apache/tsfile/read/reader/page/AlignedPageReader.java
b/java/tsfile/src/main/java/org/apache/tsfile/read/reader/page/AbstractAlignedPageReader.java
similarity index 68%
copy from
java/tsfile/src/main/java/org/apache/tsfile/read/reader/page/AlignedPageReader.java
copy to
java/tsfile/src/main/java/org/apache/tsfile/read/reader/page/AbstractAlignedPageReader.java
index 5307ed35..986c487a 100644
---
a/java/tsfile/src/main/java/org/apache/tsfile/read/reader/page/AlignedPageReader.java
+++
b/java/tsfile/src/main/java/org/apache/tsfile/read/reader/page/AbstractAlignedPageReader.java
@@ -25,13 +25,11 @@ import org.apache.tsfile.file.header.PageHeader;
import org.apache.tsfile.file.metadata.statistics.Statistics;
import org.apache.tsfile.read.common.BatchData;
import org.apache.tsfile.read.common.BatchDataFactory;
-import org.apache.tsfile.read.common.TimeRange;
import org.apache.tsfile.read.common.block.TsBlock;
import org.apache.tsfile.read.common.block.TsBlockBuilder;
import org.apache.tsfile.read.common.block.TsBlockUtil;
import org.apache.tsfile.read.filter.basic.Filter;
import org.apache.tsfile.read.reader.IPageReader;
-import org.apache.tsfile.read.reader.IPointReader;
import org.apache.tsfile.read.reader.series.PaginationController;
import org.apache.tsfile.utils.TsPrimitiveType;
@@ -41,28 +39,27 @@ import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
-import java.util.Objects;
import java.util.Optional;
import static
org.apache.tsfile.read.reader.series.PaginationController.UNLIMITED_PAGINATION_CONTROLLER;
-public class AlignedPageReader implements IPageReader {
+public abstract class AbstractAlignedPageReader implements IPageReader {
- private final TimePageReader timePageReader;
- private final List<ValuePageReader> valuePageReaderList;
- private final int valueCount;
+ protected final TimePageReader timePageReader;
+ protected final List<ValuePageReader> valuePageReaderList;
+ protected final int valueCount;
- private final Filter globalTimeFilter;
- private Filter pushDownFilter;
- private PaginationController paginationController =
UNLIMITED_PAGINATION_CONTROLLER;
+ protected final Filter globalTimeFilter;
+ protected Filter pushDownFilter;
+ protected PaginationController paginationController =
UNLIMITED_PAGINATION_CONTROLLER;
- private boolean isModified;
- private TsBlockBuilder builder;
+ protected boolean isModified;
+ protected TsBlockBuilder builder;
- private static final int MASK = 0x80;
+ protected static final int MASK = 0x80;
@SuppressWarnings("squid:S107")
- public AlignedPageReader(
+ AbstractAlignedPageReader(
PageHeader timePageHeader,
ByteBuffer timePageData,
Decoder timeDecoder,
@@ -93,7 +90,7 @@ public class AlignedPageReader implements IPageReader {
}
@SuppressWarnings("squid:S107")
- public AlignedPageReader(
+ AbstractAlignedPageReader(
PageHeader timePageHeader,
ByteBuffer timePageData,
Decoder timeDecoder,
@@ -152,51 +149,32 @@ public class AlignedPageReader implements IPageReader {
}
}
- if (hasNotNullValues && satisfyRecordFilter(timestamp, rowValues)) {
+ if (keepCurrentRow(hasNotNullValues, timestamp, rowValues)) {
pageData.putVector(timestamp, v);
}
}
return pageData.flip();
}
- private boolean satisfyRecordFilter(long timestamp, Object[] rowValues) {
+ abstract boolean keepCurrentRow(boolean hasNotNullValues, long timestamp,
Object[] rowValues);
+
+ protected boolean satisfyRecordFilter(long timestamp, Object[] rowValues) {
return (globalTimeFilter == null || globalTimeFilter.satisfyRow(timestamp,
rowValues))
&& (pushDownFilter == null || pushDownFilter.satisfyRow(timestamp,
rowValues));
}
- @Override
- public boolean timeAllSelected() {
- for (int index = 0; index < getMeasurementCount(); index++) {
- if (!hasNullValue(index)) {
- // When there is any value page point number that is the same as the time page,
- // it means that all timestamps in time page will be selected.
- return true;
- }
- }
- return false;
- }
-
@Override
public int getMeasurementCount() {
return valueCount;
}
- public IPointReader getLazyPointReader() throws IOException {
- return new LazyLoadAlignedPagePointReader(timePageReader,
valuePageReaderList);
- }
-
- private boolean allPageDataSatisfy() {
- return !isModified
- && timeAllSelected()
- && globalTimeFilterAllSatisfy()
- && pushDownFilterAllSatisfy();
- }
+ abstract boolean allPageDataSatisfy();
- private boolean globalTimeFilterAllSatisfy() {
+ boolean globalTimeFilterAllSatisfy() {
return globalTimeFilter == null || globalTimeFilter.allSatisfy(this);
}
- private boolean pushDownFilterAllSatisfy() {
+ boolean pushDownFilterAllSatisfy() {
return pushDownFilter == null || pushDownFilter.allSatisfy(this);
}
@@ -209,7 +187,6 @@ public class AlignedPageReader implements IPageReader {
return builder.build();
}
- // if all the sub sensors' value are null in current row, just discard it
// if !filter.satisfy, discard this row
boolean[] keepCurrentRow = new boolean[timeBatch.length];
boolean globalTimeFilterAllSatisfy = globalTimeFilterAllSatisfy();
@@ -219,26 +196,13 @@ public class AlignedPageReader implements IPageReader {
updateKeepCurrentRowThroughGlobalTimeFilter(keepCurrentRow, timeBatch);
}
- boolean[][] isDeleted = null;
- if ((isModified || !timeAllSelected()) && valueCount != 0) {
- // using bitMap in valuePageReaders to indicate whether columns of current row are all null.
- byte[] bitmask = new byte[(timeBatch.length - 1) / 8 + 1];
- Arrays.fill(bitmask, (byte) 0x00);
- isDeleted = new boolean[valueCount][timeBatch.length];
-
- fillIsDeletedAndBitMask(timeBatch, isDeleted, bitmask);
-
- updateKeepCurrentRowThroughBitmask(keepCurrentRow, bitmask);
+ if (timePageReader.isModified()) {
+ updateKeepCurrentRowThroughDeletion(keepCurrentRow, timeBatch);
}
boolean pushDownFilterAllSatisfy = pushDownFilterAllSatisfy();
- // construct time column
- // when pushDownFilterAllSatisfy = true, we can skip rows by OFFSET & LIMIT
- int readEndIndex = buildTimeColumn(timeBatch, keepCurrentRow,
pushDownFilterAllSatisfy);
-
- // construct value columns
- buildValueColumns(readEndIndex, keepCurrentRow, isDeleted);
+ constructResult(keepCurrentRow, timeBatch, pushDownFilterAllSatisfy);
TsBlock unFilteredBlock = builder.build();
if (pushDownFilterAllSatisfy) {
@@ -288,7 +252,26 @@ public class AlignedPageReader implements IPageReader {
}
}
- private int buildTimeColumn(
+ abstract void constructResult(
+ boolean[] keepCurrentRow, long[] timeBatch, boolean
pushDownFilterAllSatisfy)
+ throws IOException;
+
+ private void updateKeepCurrentRowThroughGlobalTimeFilter(
+ boolean[] keepCurrentRow, long[] timeBatch) {
+ for (int i = 0, n = timeBatch.length; i < n; i++) {
+ keepCurrentRow[i] = globalTimeFilter.satisfy(timeBatch[i], null);
+ }
+ }
+
+ private void updateKeepCurrentRowThroughDeletion(boolean[] keepCurrentRow,
long[] timeBatch) {
+ for (int i = 0, n = timeBatch.length; i < n; i++) {
+ if (keepCurrentRow[i]) {
+ keepCurrentRow[i] = !timePageReader.isDeleted(timeBatch[i]);
+ }
+ }
+ }
+
+ protected int buildTimeColumn(
long[] timeBatch, boolean[] keepCurrentRow, boolean
pushDownFilterAllSatisfy) {
if (pushDownFilterAllSatisfy) {
return buildTimeColumnWithPagination(timeBatch, keepCurrentRow);
@@ -329,97 +312,6 @@ public class AlignedPageReader implements IPageReader {
return readEndIndex + 1;
}
- private void buildValueColumns(int readEndIndex, boolean[] keepCurrentRow,
boolean[][] isDeleted)
- throws IOException {
- for (int i = 0; i < valueCount; i++) {
- ValuePageReader pageReader = valuePageReaderList.get(i);
- if (pageReader != null) {
- if (pageReader.isModified()) {
- pageReader.writeColumnBuilderWithNextBatch(
- readEndIndex,
- builder.getColumnBuilder(i),
- keepCurrentRow,
- Objects.requireNonNull(isDeleted)[i]);
- } else {
- pageReader.writeColumnBuilderWithNextBatch(
- readEndIndex, builder.getColumnBuilder(i), keepCurrentRow);
- }
- } else {
- for (int j = 0; j < readEndIndex; j++) {
- if (keepCurrentRow[j]) {
- builder.getColumnBuilder(i).appendNull();
- }
- }
- }
- }
- }
-
- private void fillIsDeletedAndBitMask(long[] timeBatch, boolean[][]
isDeleted, byte[] bitmask)
- throws IOException {
- for (int columnIndex = 0; columnIndex < valueCount; columnIndex++) {
- ValuePageReader pageReader = valuePageReaderList.get(columnIndex);
- if (pageReader != null) {
- byte[] bitmap = pageReader.getBitmap();
-
- if (pageReader.isModified()) {
- pageReader.fillIsDeleted(timeBatch, isDeleted[columnIndex]);
- updateBitmapThroughIsDeleted(bitmap, isDeleted[columnIndex]);
- }
-
- for (int i = 0, n = bitmask.length; i < n; i++) {
- bitmask[i] = (byte) (bitmap[i] | bitmask[i]);
- }
- }
- }
- }
-
- private void updateBitmapThroughIsDeleted(byte[] bitmap, boolean[]
isDeleted) {
- for (int i = 0, n = isDeleted.length; i < n; i++) {
- if (isDeleted[i]) {
- int shift = i % 8;
- bitmap[i / 8] = (byte) (bitmap[i / 8] & (~(MASK >>> shift)));
- }
- }
- }
-
- private void updateKeepCurrentRowThroughGlobalTimeFilter(
- boolean[] keepCurrentRow, long[] timeBatch) {
- for (int i = 0, n = timeBatch.length; i < n; i++) {
- keepCurrentRow[i] = globalTimeFilter.satisfy(timeBatch[i], null);
- }
- }
-
- private void updateKeepCurrentRowThroughBitmask(boolean[] keepCurrentRow,
byte[] bitmask) {
- for (int i = 0, n = bitmask.length; i < n; i++) {
- if (bitmask[i] == (byte) 0xFF) {
- // 8 rows are not all null, do nothing
- } else if (bitmask[i] == (byte) 0x00) {
- Arrays.fill(keepCurrentRow, i * 8, Math.min(i * 8 + 8,
keepCurrentRow.length), false);
- } else {
- for (int j = 0; j < 8 && (i * 8 + j < keepCurrentRow.length); j++) {
- if (((bitmask[i] & 0xFF) & (MASK >>> j)) == 0) {
- keepCurrentRow[i * 8 + j] = false;
- }
- }
- }
- }
- }
-
- public void setDeleteIntervalList(List<List<TimeRange>> list) {
- for (int i = 0; i < valueCount; i++) {
- if (valuePageReaderList.get(i) != null) {
- valuePageReaderList.get(i).setDeleteIntervalList(list.get(i));
- }
- }
- }
-
- @Override
- public Statistics<? extends Serializable> getStatistics() {
- return valuePageReaderList.size() == 1 && valuePageReaderList.get(0) !=
null
- ? valuePageReaderList.get(0).getStatistics()
- : timePageReader.getStatistics();
- }
-
@Override
public Statistics<? extends Serializable> getTimeStatistics() {
return timePageReader.getStatistics();
diff --git
a/java/tsfile/src/main/java/org/apache/tsfile/read/reader/page/AlignedPageReader.java
b/java/tsfile/src/main/java/org/apache/tsfile/read/reader/page/AlignedPageReader.java
index 5307ed35..f180152c 100644
---
a/java/tsfile/src/main/java/org/apache/tsfile/read/reader/page/AlignedPageReader.java
+++
b/java/tsfile/src/main/java/org/apache/tsfile/read/reader/page/AlignedPageReader.java
@@ -23,43 +23,18 @@ import org.apache.tsfile.encoding.decoder.Decoder;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.file.header.PageHeader;
import org.apache.tsfile.file.metadata.statistics.Statistics;
-import org.apache.tsfile.read.common.BatchData;
-import org.apache.tsfile.read.common.BatchDataFactory;
import org.apache.tsfile.read.common.TimeRange;
-import org.apache.tsfile.read.common.block.TsBlock;
-import org.apache.tsfile.read.common.block.TsBlockBuilder;
-import org.apache.tsfile.read.common.block.TsBlockUtil;
import org.apache.tsfile.read.filter.basic.Filter;
-import org.apache.tsfile.read.reader.IPageReader;
import org.apache.tsfile.read.reader.IPointReader;
-import org.apache.tsfile.read.reader.series.PaginationController;
-import org.apache.tsfile.utils.TsPrimitiveType;
import java.io.IOException;
import java.io.Serializable;
import java.nio.ByteBuffer;
-import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
-import java.util.Optional;
-import static
org.apache.tsfile.read.reader.series.PaginationController.UNLIMITED_PAGINATION_CONTROLLER;
-
-public class AlignedPageReader implements IPageReader {
-
- private final TimePageReader timePageReader;
- private final List<ValuePageReader> valuePageReaderList;
- private final int valueCount;
-
- private final Filter globalTimeFilter;
- private Filter pushDownFilter;
- private PaginationController paginationController =
UNLIMITED_PAGINATION_CONTROLLER;
-
- private boolean isModified;
- private TsBlockBuilder builder;
-
- private static final int MASK = 0x80;
+public class AlignedPageReader extends AbstractAlignedPageReader {
@SuppressWarnings("squid:S107")
public AlignedPageReader(
@@ -71,25 +46,15 @@ public class AlignedPageReader implements IPageReader {
List<TSDataType> valueDataTypeList,
List<Decoder> valueDecoderList,
Filter globalTimeFilter) {
- timePageReader = new TimePageReader(timePageHeader, timePageData,
timeDecoder);
- isModified = timePageReader.isModified();
- valuePageReaderList = new ArrayList<>(valuePageHeaderList.size());
- for (int i = 0; i < valuePageHeaderList.size(); i++) {
- if (valuePageHeaderList.get(i) != null) {
- ValuePageReader valuePageReader =
- new ValuePageReader(
- valuePageHeaderList.get(i),
- valuePageDataList.get(i),
- valueDataTypeList.get(i),
- valueDecoderList.get(i));
- valuePageReaderList.add(valuePageReader);
- isModified = isModified || valuePageReader.isModified();
- } else {
- valuePageReaderList.add(null);
- }
- }
- this.globalTimeFilter = globalTimeFilter;
- this.valueCount = valuePageReaderList.size();
+ super(
+ timePageHeader,
+ timePageData,
+ timeDecoder,
+ valuePageHeaderList,
+ valuePageDataList,
+ valueDataTypeList,
+ valueDecoderList,
+ globalTimeFilter);
}
@SuppressWarnings("squid:S107")
@@ -105,63 +70,20 @@ public class AlignedPageReader implements IPageReader {
List<TSDataType> valueDataTypeList,
List<Decoder> valueDecoderList,
Filter globalTimeFilter) {
- timePageReader = new TimePageReader(timePageHeader, timePageData,
timeDecoder);
- isModified = timePageReader.isModified();
- valuePageReaderList = new ArrayList<>(valuePageHeaderList.size());
- for (int i = 0; i < valuePageHeaderList.size(); i++) {
- if (valuePageHeaderList.get(i) != null) {
- ValuePageReader valuePageReader =
- new ValuePageReader(
- valuePageHeaderList.get(i),
- lazyLoadPageDataArray[i],
- valueDataTypeList.get(i),
- valueDecoderList.get(i));
- valuePageReaderList.add(valuePageReader);
- isModified = isModified || valuePageReader.isModified();
- } else {
- valuePageReaderList.add(null);
- }
- }
- this.globalTimeFilter = globalTimeFilter;
- this.valueCount = valuePageReaderList.size();
+ super(
+ timePageHeader,
+ timePageData,
+ timeDecoder,
+ valuePageHeaderList,
+ lazyLoadPageDataArray,
+ valueDataTypeList,
+ valueDecoderList,
+ globalTimeFilter);
}
@Override
- public BatchData getAllSatisfiedPageData(boolean ascending) throws
IOException {
- BatchData pageData = BatchDataFactory.createBatchData(TSDataType.VECTOR,
ascending, false);
- int timeIndex = -1;
- Object[] rowValues = new Object[valueCount];
- while (timePageReader.hasNextTime()) {
- long timestamp = timePageReader.nextTime();
- timeIndex++;
-
- TsPrimitiveType[] v = new TsPrimitiveType[valueCount];
- // if all the sub sensors' value are null in current row, just discard it
- boolean hasNotNullValues = false;
- for (int i = 0; i < valueCount; i++) {
- ValuePageReader pageReader = valuePageReaderList.get(i);
- if (pageReader != null) {
- v[i] = pageReader.nextValue(timestamp, timeIndex);
- rowValues[i] = (v[i] == null) ? null : v[i].getValue();
- } else {
- v[i] = null;
- rowValues[i] = null;
- }
- if (rowValues[i] != null) {
- hasNotNullValues = true;
- }
- }
-
- if (hasNotNullValues && satisfyRecordFilter(timestamp, rowValues)) {
- pageData.putVector(timestamp, v);
- }
- }
- return pageData.flip();
- }
-
- private boolean satisfyRecordFilter(long timestamp, Object[] rowValues) {
- return (globalTimeFilter == null || globalTimeFilter.satisfyRow(timestamp,
rowValues))
- && (pushDownFilter == null || pushDownFilter.satisfyRow(timestamp,
rowValues));
+ boolean keepCurrentRow(boolean hasNotNullValues, long timestamp, Object[]
rowValues) {
+ return hasNotNullValues && satisfyRecordFilter(timestamp, rowValues);
}
@Override
@@ -176,49 +98,23 @@ public class AlignedPageReader implements IPageReader {
return false;
}
- @Override
- public int getMeasurementCount() {
- return valueCount;
- }
-
public IPointReader getLazyPointReader() throws IOException {
return new LazyLoadAlignedPagePointReader(timePageReader,
valuePageReaderList);
}
- private boolean allPageDataSatisfy() {
+ @Override
+ boolean allPageDataSatisfy() {
return !isModified
&& timeAllSelected()
&& globalTimeFilterAllSatisfy()
&& pushDownFilterAllSatisfy();
}
- private boolean globalTimeFilterAllSatisfy() {
- return globalTimeFilter == null || globalTimeFilter.allSatisfy(this);
- }
-
- private boolean pushDownFilterAllSatisfy() {
- return pushDownFilter == null || pushDownFilter.allSatisfy(this);
- }
-
@Override
- public TsBlock getAllSatisfiedData() throws IOException {
- long[] timeBatch = timePageReader.getNextTimeBatch();
-
- if (allPageDataSatisfy()) {
- buildResultWithoutAnyFilterAndDelete(timeBatch);
- return builder.build();
- }
+ void constructResult(boolean[] keepCurrentRow, long[] timeBatch, boolean
pushDownFilterAllSatisfy)
+ throws IOException {
// if all the sub sensors' value are null in current row, just discard it
- // if !filter.satisfy, discard this row
- boolean[] keepCurrentRow = new boolean[timeBatch.length];
- boolean globalTimeFilterAllSatisfy = globalTimeFilterAllSatisfy();
- if (globalTimeFilterAllSatisfy) {
- Arrays.fill(keepCurrentRow, true);
- } else {
- updateKeepCurrentRowThroughGlobalTimeFilter(keepCurrentRow, timeBatch);
- }
-
boolean[][] isDeleted = null;
if ((isModified || !timeAllSelected()) && valueCount != 0) {
// using bitMap in valuePageReaders to indicate whether columns of current row are all null.
@@ -231,102 +127,12 @@ public class AlignedPageReader implements IPageReader {
updateKeepCurrentRowThroughBitmask(keepCurrentRow, bitmask);
}
- boolean pushDownFilterAllSatisfy = pushDownFilterAllSatisfy();
-
// construct time column
// when pushDownFilterAllSatisfy = true, we can skip rows by OFFSET & LIMIT
int readEndIndex = buildTimeColumn(timeBatch, keepCurrentRow,
pushDownFilterAllSatisfy);
// construct value columns
buildValueColumns(readEndIndex, keepCurrentRow, isDeleted);
-
- TsBlock unFilteredBlock = builder.build();
- if (pushDownFilterAllSatisfy) {
- // OFFSET & LIMIT has been consumed in buildTimeColumn
- return unFilteredBlock;
- }
- builder.reset();
- return TsBlockUtil.applyFilterAndLimitOffsetToTsBlock(
- unFilteredBlock, builder, pushDownFilter, paginationController);
- }
-
- private void buildResultWithoutAnyFilterAndDelete(long[] timeBatch) throws
IOException {
- if (paginationController.hasCurOffset(timeBatch.length)) {
- paginationController.consumeOffset(timeBatch.length);
- } else {
- int readStartIndex = 0;
- if (paginationController.hasCurOffset()) {
- readStartIndex = (int) paginationController.getCurOffset();
- // consume the remaining offset
- paginationController.consumeOffset(readStartIndex);
- }
-
- // not included
- int readEndIndex = timeBatch.length;
- if (paginationController.hasCurLimit() &&
paginationController.getCurLimit() > 0) {
- readEndIndex =
- Math.min(readEndIndex, readStartIndex + (int)
paginationController.getCurLimit());
- paginationController.consumeLimit((long) readEndIndex -
readStartIndex);
- }
-
- // construct time column
- for (int i = readStartIndex; i < readEndIndex; i++) {
- builder.getTimeColumnBuilder().writeLong(timeBatch[i]);
- builder.declarePosition();
- }
-
- // construct value columns
- for (int i = 0; i < valueCount; i++) {
- ValuePageReader pageReader = valuePageReaderList.get(i);
- if (pageReader != null) {
- pageReader.writeColumnBuilderWithNextBatch(
- readStartIndex, readEndIndex, builder.getColumnBuilder(i));
- } else {
- builder.getColumnBuilder(i).appendNull(readEndIndex -
readStartIndex);
- }
- }
- }
- }
-
- private int buildTimeColumn(
- long[] timeBatch, boolean[] keepCurrentRow, boolean
pushDownFilterAllSatisfy) {
- if (pushDownFilterAllSatisfy) {
- return buildTimeColumnWithPagination(timeBatch, keepCurrentRow);
- } else {
- return buildTimeColumnWithoutPagination(timeBatch, keepCurrentRow);
- }
- }
-
- private int buildTimeColumnWithPagination(long[] timeBatch, boolean[]
keepCurrentRow) {
- int readEndIndex = timeBatch.length;
- for (int rowIndex = 0; rowIndex < timeBatch.length; rowIndex++) {
- if (keepCurrentRow[rowIndex]) {
- if (paginationController.hasCurOffset()) {
- paginationController.consumeOffset();
- keepCurrentRow[rowIndex] = false;
- } else if (paginationController.hasCurLimit()) {
- builder.getTimeColumnBuilder().writeLong(timeBatch[rowIndex]);
- builder.declarePosition();
- paginationController.consumeLimit();
- } else {
- readEndIndex = rowIndex;
- break;
- }
- }
- }
- return readEndIndex;
- }
-
- private int buildTimeColumnWithoutPagination(long[] timeBatch, boolean[]
keepCurrentRow) {
- int readEndIndex = 0;
- for (int i = 0; i < timeBatch.length; i++) {
- if (keepCurrentRow[i]) {
- builder.getTimeColumnBuilder().writeLong(timeBatch[i]);
- builder.declarePosition();
- readEndIndex = i;
- }
- }
- return readEndIndex + 1;
}
private void buildValueColumns(int readEndIndex, boolean[] keepCurrentRow,
boolean[][] isDeleted)
@@ -382,13 +188,6 @@ public class AlignedPageReader implements IPageReader {
}
}
- private void updateKeepCurrentRowThroughGlobalTimeFilter(
- boolean[] keepCurrentRow, long[] timeBatch) {
- for (int i = 0, n = timeBatch.length; i < n; i++) {
- keepCurrentRow[i] = globalTimeFilter.satisfy(timeBatch[i], null);
- }
- }
-
private void updateKeepCurrentRowThroughBitmask(boolean[] keepCurrentRow,
byte[] bitmask) {
for (int i = 0, n = bitmask.length; i < n; i++) {
if (bitmask[i] == (byte) 0xFF) {
@@ -419,62 +218,4 @@ public class AlignedPageReader implements IPageReader {
? valuePageReaderList.get(0).getStatistics()
: timePageReader.getStatistics();
}
-
- @Override
- public Statistics<? extends Serializable> getTimeStatistics() {
- return timePageReader.getStatistics();
- }
-
- @Override
- public Optional<Statistics<? extends Serializable>> getMeasurementStatistics(
- int measurementIndex) {
- ValuePageReader valuePageReader =
valuePageReaderList.get(measurementIndex);
- return Optional.ofNullable(valuePageReader == null ? null :
valuePageReader.getStatistics());
- }
-
- @Override
- public boolean hasNullValue(int measurementIndex) {
- long rowCount = getTimeStatistics().getCount();
- Optional<Statistics<? extends Serializable>> statistics =
- getMeasurementStatistics(measurementIndex);
- return statistics.map(stat -> stat.hasNullValue(rowCount)).orElse(true);
- }
-
- @Override
- public void addRecordFilter(Filter filter) {
- this.pushDownFilter = filter;
- }
-
- @Override
- public void setLimitOffset(PaginationController paginationController) {
- this.paginationController = paginationController;
- }
-
- @Override
- public boolean isModified() {
- return isModified;
- }
-
- @Override
- public void initTsBlockBuilder(List<TSDataType> dataTypes) {
- if (paginationController.hasLimit()) {
- builder =
- new TsBlockBuilder(
- (int)
- Math.min(
- paginationController.getCurLimit(),
- timePageReader.getStatistics().getCount()),
- dataTypes);
- } else {
- builder = new TsBlockBuilder((int)
timePageReader.getStatistics().getCount(), dataTypes);
- }
- }
-
- public TimePageReader getTimePageReader() {
- return timePageReader;
- }
-
- public List<ValuePageReader> getValuePageReaderList() {
- return valuePageReaderList;
- }
}
diff --git
a/java/tsfile/src/main/java/org/apache/tsfile/read/reader/page/TablePageReader.java
b/java/tsfile/src/main/java/org/apache/tsfile/read/reader/page/TablePageReader.java
new file mode 100644
index 00000000..e43baedf
--- /dev/null
+++
b/java/tsfile/src/main/java/org/apache/tsfile/read/reader/page/TablePageReader.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tsfile.read.reader.page;
+
+import org.apache.tsfile.encoding.decoder.Decoder;
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.file.header.PageHeader;
+import org.apache.tsfile.file.metadata.statistics.Statistics;
+import org.apache.tsfile.read.common.TimeRange;
+import org.apache.tsfile.read.filter.basic.Filter;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+// difference with AlignedPageReader is that TablePageReader works for TableScan and keep all null
+// rows
+public class TablePageReader extends AbstractAlignedPageReader {
+
+ public TablePageReader(
+ PageHeader timePageHeader,
+ ByteBuffer timePageData,
+ Decoder timeDecoder,
+ List<PageHeader> valuePageHeaderList,
+ List<ByteBuffer> valuePageDataList,
+ List<TSDataType> valueDataTypeList,
+ List<Decoder> valueDecoderList,
+ Filter globalTimeFilter) {
+ super(
+ timePageHeader,
+ timePageData,
+ timeDecoder,
+ valuePageHeaderList,
+ valuePageDataList,
+ valueDataTypeList,
+ valueDecoderList,
+ globalTimeFilter);
+ }
+
+ public TablePageReader(
+ PageHeader timePageHeader,
+ ByteBuffer timePageData,
+ Decoder timeDecoder,
+ List<PageHeader> valuePageHeaderList,
+ LazyLoadPageData[] lazyLoadPageDataArray,
+ List<TSDataType> valueDataTypeList,
+ List<Decoder> valueDecoderList,
+ Filter globalTimeFilter) {
+ super(
+ timePageHeader,
+ timePageData,
+ timeDecoder,
+ valuePageHeaderList,
+ lazyLoadPageDataArray,
+ valueDataTypeList,
+ valueDecoderList,
+ globalTimeFilter);
+ }
+
+ @Override
+ public Statistics<? extends Serializable> getStatistics() {
+ return timePageReader.getStatistics();
+ }
+
+ @Override
+ boolean keepCurrentRow(boolean hasNotNullValues, long timestamp, Object[]
rowValues) {
+ return satisfyRecordFilter(timestamp, rowValues);
+ }
+
+ @Override
+ boolean allPageDataSatisfy() {
+ return !isModified && globalTimeFilterAllSatisfy() &&
pushDownFilterAllSatisfy();
+ }
+
+ @Override
+ void constructResult(boolean[] keepCurrentRow, long[] timeBatch, boolean
pushDownFilterAllSatisfy)
+ throws IOException {
+ // construct time column
+ // when pushDownFilterAllSatisfy = true, we can skip rows by OFFSET & LIMIT
+ int readEndIndex = buildTimeColumn(timeBatch, keepCurrentRow,
pushDownFilterAllSatisfy);
+ // construct value columns
+ buildValueColumns(readEndIndex, keepCurrentRow, timeBatch);
+ }
+
+ private void buildValueColumns(int readEndIndex, boolean[] keepCurrentRow,
long[] timeBatch)
+ throws IOException {
+ for (int i = 0; i < valueCount; i++) {
+ ValuePageReader pageReader = valuePageReaderList.get(i);
+
+ if (pageReader != null) {
+ if (pageReader.isModified()) {
+ boolean[] isDeleted = new boolean[timeBatch.length];
+ pageReader.fillIsDeleted(timeBatch, isDeleted, keepCurrentRow);
+ pageReader.writeColumnBuilderWithNextBatch(
+ readEndIndex, builder.getColumnBuilder(i), keepCurrentRow,
isDeleted);
+ } else {
+ pageReader.writeColumnBuilderWithNextBatch(
+ readEndIndex, builder.getColumnBuilder(i), keepCurrentRow);
+ }
+ } else {
+ for (int j = 0; j < readEndIndex; j++) {
+ if (keepCurrentRow[j]) {
+ builder.getColumnBuilder(i).appendNull();
+ }
+ }
+ }
+ }
+ }
+
+ public void setDeleteIntervalList(
+ List<TimeRange> timeDeletions, List<List<TimeRange>> valueDeletionsList)
{
+ timePageReader.setDeleteIntervalList(timeDeletions);
+ for (int i = 0; i < valueCount; i++) {
+ if (valuePageReaderList.get(i) != null) {
+
valuePageReaderList.get(i).setDeleteIntervalList(valueDeletionsList.get(i));
+ }
+ }
+ }
+}
diff --git
a/java/tsfile/src/main/java/org/apache/tsfile/read/reader/page/ValuePageReader.java
b/java/tsfile/src/main/java/org/apache/tsfile/read/reader/page/ValuePageReader.java
index 58e9ff51..cf79e68a 100644
---
a/java/tsfile/src/main/java/org/apache/tsfile/read/reader/page/ValuePageReader.java
+++
b/java/tsfile/src/main/java/org/apache/tsfile/read/reader/page/ValuePageReader.java
@@ -603,6 +603,14 @@ public class ValuePageReader {
}
}
+ public void fillIsDeleted(long[] timestamp, boolean[] isDeleted, boolean[]
keepCurrentRow) {
+ for (int i = 0, n = timestamp.length; i < n; i++) {
+ if (keepCurrentRow[i]) {
+ isDeleted[i] = isDeleted(timestamp[i]);
+ }
+ }
+ }
+
public TSDataType getDataType() {
return dataType;
}