This is an automated email from the ASF dual-hosted git repository. jiangtian pushed a commit to branch fix_TableChunkReader in repository https://gitbox.apache.org/repos/asf/tsfile.git
commit 0b589e0be77520f2f4c1b4571b263df7d870e3e3 Author: Tian Jiang <[email protected]> AuthorDate: Wed Nov 6 15:04:06 2024 +0800 Fix that time deletion does not take effect in TableChunkReader --- .../reader/chunk/AbstractAlignedChunkReader.java | 3 + .../tsfile/read/reader/chunk/TableChunkReader.java | 7 +- .../reader/page/AbstractAlignedPageReader.java | 3 +- .../read/reader/chunk/TableChunkReaderTest.java | 90 ++++++++++++++++++++++ .../org/apache/tsfile/tableview/TableViewTest.java | 32 ++++---- 5 files changed, 115 insertions(+), 20 deletions(-) diff --git a/java/tsfile/src/main/java/org/apache/tsfile/read/reader/chunk/AbstractAlignedChunkReader.java b/java/tsfile/src/main/java/org/apache/tsfile/read/reader/chunk/AbstractAlignedChunkReader.java index 220f9582..6005bb0b 100644 --- a/java/tsfile/src/main/java/org/apache/tsfile/read/reader/chunk/AbstractAlignedChunkReader.java +++ b/java/tsfile/src/main/java/org/apache/tsfile/read/reader/chunk/AbstractAlignedChunkReader.java @@ -52,6 +52,8 @@ public abstract class AbstractAlignedChunkReader extends AbstractChunkReader { private final List<ByteBuffer> valueChunkDataBufferList = new ArrayList<>(); // deleted intervals of all the sub sensors private final List<List<TimeRange>> valueDeleteIntervalsList = new ArrayList<>(); + // deleted intervals of time column + protected final List<TimeRange> timeDeleteIntervalList; private final EncryptParameter encryptParam; @@ -62,6 +64,7 @@ public abstract class AbstractAlignedChunkReader extends AbstractChunkReader { super(readStopTime, queryFilter); this.timeChunkHeader = timeChunk.getHeader(); this.timeChunkDataBuffer = timeChunk.getData(); + this.timeDeleteIntervalList = timeChunk.getDeleteIntervalList(); List<Statistics<? extends Serializable>> valueChunkStatisticsList = new ArrayList<>(); valueChunkList.forEach( diff --git a/java/tsfile/src/main/java/org/apache/tsfile/read/reader/chunk/TableChunkReader.java b/java/tsfile/src/main/java/org/apache/tsfile/read/reader/chunk/TableChunkReader.java index 01a39810..cc156e32 100644 --- a/java/tsfile/src/main/java/org/apache/tsfile/read/reader/chunk/TableChunkReader.java +++ b/java/tsfile/src/main/java/org/apache/tsfile/read/reader/chunk/TableChunkReader.java @@ -37,13 +37,10 @@ import java.util.List; // rows public class TableChunkReader extends AbstractAlignedChunkReader { - private final List<TimeRange> timeDeleteIntervalsList; - public TableChunkReader( Chunk timeChunk, List<Chunk> valueChunkList, long readStopTime, Filter queryFilter) throws IOException { super(timeChunk, valueChunkList, readStopTime, queryFilter); - timeDeleteIntervalsList = timeChunk.getDeleteIntervalList(); } public TableChunkReader(Chunk timeChunk, List<Chunk> valueChunkList, Filter queryFilter) @@ -63,7 +60,7 @@ public class TableChunkReader extends AbstractAlignedChunkReader { @Override boolean canSkip(boolean isAllNull, PageHeader timePageHeader) { - return pageDeleted(timePageHeader, timeDeleteIntervalsList); + return pageDeleted(timePageHeader, timeDeleteIntervalList); } @Override @@ -87,7 +84,7 @@ public class TableChunkReader extends AbstractAlignedChunkReader { valueDataTypeList, valueDecoderList, queryFilter); - alignedPageReader.setDeleteIntervalList(timeDeleteIntervalsList, valueDeleteIntervalsList); + alignedPageReader.setDeleteIntervalList(timeDeleteIntervalList, valueDeleteIntervalsList); return alignedPageReader; } } diff --git a/java/tsfile/src/main/java/org/apache/tsfile/read/reader/page/AbstractAlignedPageReader.java b/java/tsfile/src/main/java/org/apache/tsfile/read/reader/page/AbstractAlignedPageReader.java index 986c487a..9230d486 100644 --- a/java/tsfile/src/main/java/org/apache/tsfile/read/reader/page/AbstractAlignedPageReader.java +++ b/java/tsfile/src/main/java/org/apache/tsfile/read/reader/page/AbstractAlignedPageReader.java @@ -149,7 +149,8 @@ public abstract class AbstractAlignedPageReader implements IPageReader { } } - if (keepCurrentRow(hasNotNullValues, timestamp, rowValues)) { + if (keepCurrentRow(hasNotNullValues, timestamp, rowValues) + && !timePageReader.isDeleted(timestamp)) { pageData.putVector(timestamp, v); } } diff --git a/java/tsfile/src/test/java/org/apache/tsfile/read/reader/chunk/TableChunkReaderTest.java b/java/tsfile/src/test/java/org/apache/tsfile/read/reader/chunk/TableChunkReaderTest.java new file mode 100644 index 00000000..c21fa076 --- /dev/null +++ b/java/tsfile/src/test/java/org/apache/tsfile/read/reader/chunk/TableChunkReaderTest.java @@ -0,0 +1,90 @@ +package org.apache.tsfile.read.reader.chunk; + +import org.apache.tsfile.exception.write.WriteProcessException; +import org.apache.tsfile.file.metadata.AlignedChunkMetadata; +import org.apache.tsfile.file.metadata.ChunkMetadata; +import org.apache.tsfile.file.metadata.IChunkMetadata; +import org.apache.tsfile.file.metadata.IDeviceID.Factory; +import org.apache.tsfile.file.metadata.TableSchema; +import org.apache.tsfile.read.TsFileSequenceReader; +import org.apache.tsfile.read.common.BatchData; +import org.apache.tsfile.read.common.Chunk; +import org.apache.tsfile.read.common.TimeRange; +import org.apache.tsfile.read.controller.CachedChunkLoaderImpl; + +import org.apache.commons.io.FileUtils; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.io.File; +import java.io.IOException; +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; + +import static org.apache.tsfile.tableview.TableViewTest.genTableSchema; +import static org.apache.tsfile.tableview.TableViewTest.testDir; +import static org.apache.tsfile.tableview.TableViewTest.writeTsFile; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; + +public class TableChunkReaderTest { + + @Before + public void setUp() throws Exception { + new File(testDir).mkdirs(); + } + + @After + public void tearDown() throws Exception { + FileUtils.deleteDirectory(new File(testDir)); + } + + @Test + public void testWithTimeDeletion() throws IOException, WriteProcessException { + TableSchema tableSchema = genTableSchema(0); + final File testFile = new File(testDir, "testFile"); + writeTsFile(tableSchema, testFile); + + CachedChunkLoaderImpl chunkLoader; + List<IChunkMetadata> chunkMetadataList; + try (TsFileSequenceReader sequenceReader = + new TsFileSequenceReader(testFile.getAbsolutePath())) { + chunkLoader = new CachedChunkLoaderImpl(sequenceReader); + + chunkMetadataList = + sequenceReader.getIChunkMetadataList( + Factory.DEFAULT_FACTORY.create( + new String[] {tableSchema.getTableName(), "0", "0", "0", "0", "0"}), + ""); + AlignedChunkMetadata alignedChunkMetadata = (AlignedChunkMetadata) chunkMetadataList.get(0); + + Chunk timeChunk = + chunkLoader.loadChunk(((ChunkMetadata) alignedChunkMetadata.getTimeChunkMetadata())); + List<Chunk> valueChunks = + alignedChunkMetadata.getValueChunkMetadataList().stream() + .map(c -> (ChunkMetadata) c) + .map( + c -> { + try { + return chunkLoader.loadChunk(c); + } catch (IOException e) { + throw new RuntimeException(e); + } + }) + .collect(Collectors.toList()); + + // [0, 9] -> [5, 9] + timeChunk.setDeleteIntervalList(Collections.singletonList(new TimeRange(0, 4))); + + TableChunkReader tableChunkReader = new TableChunkReader(timeChunk, valueChunks, null); + BatchData batchData = tableChunkReader.nextPageData(); + for (int i = 5; i < 10; i++) { + assertEquals(i, batchData.currentTime()); + batchData.next(); + } + assertFalse(batchData.hasCurrent()); + } + } +} diff --git a/java/tsfile/src/test/java/org/apache/tsfile/tableview/TableViewTest.java b/java/tsfile/src/test/java/org/apache/tsfile/tableview/TableViewTest.java index 1a2ea4a2..c7edaaf9 100644 --- a/java/tsfile/src/test/java/org/apache/tsfile/tableview/TableViewTest.java +++ b/java/tsfile/src/test/java/org/apache/tsfile/tableview/TableViewTest.java @@ -21,6 +21,7 @@ package org.apache.tsfile.tableview; import org.apache.tsfile.enums.TSDataType; import org.apache.tsfile.exception.read.ReadProcessException; +import org.apache.tsfile.exception.write.WriteProcessException; import org.apache.tsfile.file.metadata.IDeviceID; import org.apache.tsfile.file.metadata.IDeviceID.Factory; import org.apache.tsfile.file.metadata.TableSchema; @@ -72,11 +73,11 @@ import static org.junit.Assert.assertTrue; public class TableViewTest { - private final String testDir = "target" + File.separator + "tableViewTest"; - private final int idSchemaNum = 5; - private final int measurementSchemaNum = 5; - private TableSchema testTableSchema; - private int numTimestampPerDevice = 10; + public static final String testDir = "target" + File.separator + "tableViewTest"; + private static final int idSchemaNum = 5; + private static final int measurementSchemaNum = 5; + private static TableSchema testTableSchema; + private static int numTimestampPerDevice = 10; @Before public void setUp() throws Exception { @@ -141,14 +142,19 @@ public class TableViewTest { } } - private void testWrite(TableSchema tableSchema) throws Exception { - final File testFile = new File(testDir, "testFile"); - try (TsFileWriter writer = new TsFileWriter(testFile)) { + public static void writeTsFile(TableSchema tableSchema, File file) + throws IOException, WriteProcessException { + try (TsFileWriter writer = new TsFileWriter(file)) { writer.setGenerateTableSchema(true); writer.registerTableSchema(tableSchema); writer.writeTable(genTablet(tableSchema, 0, 100)); } + } + + public static void testWrite(TableSchema tableSchema) throws Exception { + final File testFile = new File(testDir, "testFile"); + writeTsFile(tableSchema, testFile); TsFileSequenceReader sequenceReader = new TsFileSequenceReader(testFile.getAbsolutePath()); TableQueryExecutor tableQueryExecutor = @@ -476,7 +482,7 @@ public class TableViewTest { } } - private Tablet genTablet(TableSchema tableSchema, int offset, int deviceNum) { + public static Tablet genTablet(TableSchema tableSchema, int offset, int deviceNum) { Tablet tablet = new Tablet( tableSchema.getTableName(), @@ -491,9 +497,7 @@ public class TableViewTest { for (int j = 0; j < columnSchemas.size(); j++) { IMeasurementSchema columnSchema = columnSchemas.get(j); tablet.addValue( - columnSchema.getMeasurementId(), - rowIndex, - getValue(columnSchema.getType(), i, tableSchema.getColumnTypes().get(j))); + columnSchema.getMeasurementId(), rowIndex, getValue(columnSchema.getType(), i)); } } } @@ -501,7 +505,7 @@ public class TableViewTest { return tablet; } - public Object getValue(TSDataType dataType, int i, ColumnType columnType) { + public static Object getValue(TSDataType dataType, int i) { switch (dataType) { case INT64: return (long) i; @@ -512,7 +516,7 @@ public class TableViewTest { } } - private TableSchema genTableSchema(int tableNum) { + public static TableSchema genTableSchema(int tableNum) { List<IMeasurementSchema> measurementSchemas = new ArrayList<>(); List<ColumnType> columnTypes = new ArrayList<>();
