This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch snapshot/2.1.0-250521
in repository https://gitbox.apache.org/repos/asf/tsfile.git
The following commit(s) were added to refs/heads/snapshot/2.1.0-250521 by this
push:
new 40105d17 fix last empty page
40105d17 is described below
commit 40105d171816b0473325aaae4040b8484fe9db86
Author: Tian Jiang <[email protected]>
AuthorDate: Thu May 22 11:22:36 2025 +0800
fix last empty page
---
.../tsfile/read/reader/TsFileLastReader.java | 17 ++++++---
.../tsfile/read/reader/TsFileLastReaderTest.java | 41 ++++++++++++++++++++++
2 files changed, 54 insertions(+), 4 deletions(-)
diff --git a/java/tsfile/src/main/java/org/apache/tsfile/read/reader/TsFileLastReader.java b/java/tsfile/src/main/java/org/apache/tsfile/read/reader/TsFileLastReader.java
index d84fff80..8c01215c 100644
--- a/java/tsfile/src/main/java/org/apache/tsfile/read/reader/TsFileLastReader.java
+++ b/java/tsfile/src/main/java/org/apache/tsfile/read/reader/TsFileLastReader.java
@@ -168,14 +168,23 @@ public class TsFileLastReader
PageHeader lastPageHeader = null;
ByteBuffer lastPageData = null;
while (chunkData.hasRemaining()) {
+ PageHeader pageHeader;
if (chunk.isSinglePageChunk()) {
- lastPageHeader = PageHeader.deserializeFrom(chunkData, chunkMetadata.getStatistics());
+ pageHeader = PageHeader.deserializeFrom(chunkData, chunkMetadata.getStatistics());
} else {
- lastPageHeader = PageHeader.deserializeFrom(chunkData, TSDataType.BLOB);
+ pageHeader = PageHeader.deserializeFrom(chunkData, TSDataType.BLOB);
+ }
+ ByteBuffer pageData = chunkData.slice();
+ pageData.limit(pageData.position() + pageHeader.getCompressedSize());
+ chunkData.position(chunkData.position() + pageHeader.getCompressedSize());
+
+ if ((pageHeader.getStatistics() == null && pageHeader.getUncompressedSize() != 0)
+ || (pageHeader.getStatistics() != null && pageHeader.getStatistics().getCount() > 0)) {
+ lastPageHeader = pageHeader;
+ lastPageData = pageData;
}
- lastPageData = chunkData.slice();
- chunkData.position(chunkData.position() + lastPageHeader.getCompressedSize());
}
+
if (lastPageHeader != null) {
CompressionType compressionType = chunk.getHeader().getCompressionType();
if (compressionType != CompressionType.UNCOMPRESSED) {
diff --git a/java/tsfile/src/test/java/org/apache/tsfile/read/reader/TsFileLastReaderTest.java b/java/tsfile/src/test/java/org/apache/tsfile/read/reader/TsFileLastReaderTest.java
index 806e77aa..2f490f67 100644
--- a/java/tsfile/src/test/java/org/apache/tsfile/read/reader/TsFileLastReaderTest.java
+++ b/java/tsfile/src/test/java/org/apache/tsfile/read/reader/TsFileLastReaderTest.java
@@ -22,15 +22,18 @@ package org.apache.tsfile.read.reader;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.exception.write.WriteProcessException;
import org.apache.tsfile.file.metadata.IDeviceID;
+import org.apache.tsfile.file.metadata.IDeviceID.Factory;
import org.apache.tsfile.read.TimeValuePair;
import org.apache.tsfile.utils.Binary;
import org.apache.tsfile.utils.Pair;
import org.apache.tsfile.utils.TsPrimitiveType;
import org.apache.tsfile.utils.WriteUtils.TabletAddValueFunction;
import org.apache.tsfile.write.TsFileWriter;
+import org.apache.tsfile.write.chunk.AlignedChunkWriterImpl;
import org.apache.tsfile.write.record.Tablet;
import org.apache.tsfile.write.schema.IMeasurementSchema;
import org.apache.tsfile.write.schema.MeasurementSchema;
+import org.apache.tsfile.write.writer.TsFileIOWriter;
import org.junit.Ignore;
import org.junit.Test;
@@ -278,6 +281,44 @@ public class TsFileLastReaderTest {
doReadLastWithEmpty(100, 100, 100);
}
+ @Test
+ public void testLastEmptyPage() throws Exception {
+ try (TsFileIOWriter ioWriter = new TsFileIOWriter(file)) {
+ ioWriter.startChunkGroup(Factory.DEFAULT_FACTORY.create("root.db1.d1"));
+ List<IMeasurementSchema> measurementSchemaList =
+ Arrays.asList(
+ new MeasurementSchema("s1", TSDataType.INT64),
+ new MeasurementSchema("s2", TSDataType.BLOB));
+ AlignedChunkWriterImpl alignedChunkWriter = new AlignedChunkWriterImpl(measurementSchemaList);
+ alignedChunkWriter.write(
+ 0,
+ new TsPrimitiveType[] {
+ TsPrimitiveType.getByType(TSDataType.INT64, 0L),
+ TsPrimitiveType.getByType(
+ TSDataType.BLOB, new Binary("0".getBytes(StandardCharsets.UTF_8)))
+ });
+ alignedChunkWriter.sealCurrentPage();
+ alignedChunkWriter.write(
+ 1, new TsPrimitiveType[] {TsPrimitiveType.getByType(TSDataType.INT64, 1L), null});
+ alignedChunkWriter.writeToFileWriter(ioWriter);
+ ioWriter.endChunkGroup();
+
+ ioWriter.endFile();
+ }
+
+ try (TsFileLastReader lastReader = new TsFileLastReader(filePath)) {
+ Pair<IDeviceID, List<Pair<String, TimeValuePair>>> next = lastReader.next();
+ assertEquals(Factory.DEFAULT_FACTORY.create("root.db1.d1"), next.getLeft());
+ assertEquals(3, next.getRight().size());
+ assertEquals("s1", next.getRight().get(1).left);
+ assertEquals("s2", next.getRight().get(2).left);
+ assertEquals(1, next.getRight().get(1).right.getTimestamp());
+ assertEquals(1, next.getRight().get(1).right.getValue().getLong());
+ assertEquals(0, next.getRight().get(2).right.getTimestamp());
+ assertEquals("0", next.getRight().get(2).right.getValue().getStringValue());
+ }
+ }
+
@Ignore("Performance")
@Test
public void testManyRead() throws Exception {