This is an automated email from the ASF dual-hosted git repository. jiangtian pushed a commit to branch snapshot/2.1.0-250521 in repository https://gitbox.apache.org/repos/asf/tsfile.git
commit 8c136de8330737c689e15d67b583053f0fe6b59b Author: Tian Jiang <[email protected]> AuthorDate: Thu May 22 10:14:31 2025 +0800 fix series with empty chunk (cherry picked from commit a8af4b5aff45a15e45ab035f95b6e765d860fa2d) --- .../apache/tsfile/file/metadata/ChunkMetadata.java | 4 - .../tsfile/read/reader/TsFileLastReader.java | 26 ++++- .../tsfile/read/reader/TsFileLastReaderTest.java | 117 +++++++++++++++++++++ 3 files changed, 138 insertions(+), 9 deletions(-) diff --git a/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/ChunkMetadata.java b/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/ChunkMetadata.java index 4be3b3ac..653a8991 100644 --- a/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/ChunkMetadata.java +++ b/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/ChunkMetadata.java @@ -430,8 +430,4 @@ public class ChunkMetadata implements IChunkMetadata { public MeasurementSchema toMeasurementSchema() { return new MeasurementSchema(measurementUid, tsDataType, encoding, compressionType); } - - public TSEncoding getEncoding() { - return encoding; - } } diff --git a/java/tsfile/src/main/java/org/apache/tsfile/read/reader/TsFileLastReader.java b/java/tsfile/src/main/java/org/apache/tsfile/read/reader/TsFileLastReader.java index 0e829b23..d84fff80 100644 --- a/java/tsfile/src/main/java/org/apache/tsfile/read/reader/TsFileLastReader.java +++ b/java/tsfile/src/main/java/org/apache/tsfile/read/reader/TsFileLastReader.java @@ -215,17 +215,33 @@ public class TsFileLastReader : TsPrimitiveType.getByType( seriesMeta.getTsDataType(), seriesMeta.getStatistics().getLastValue()))); } else { - ChunkMetadata chunkMetadata = - (ChunkMetadata) - seriesMeta.getChunkMetadataList().get(seriesMeta.getChunkMetadataList().size() - 1); - Chunk chunk = sequenceReader.readMemChunk(chunkMetadata); + ChunkMetadata lastNonEmptyChunkMetadata = null; + for (int i = seriesMeta.getChunkMetadataList().size() - 1; i >= 0; i--) { + ChunkMetadata 
chunkMetadata = (ChunkMetadata) seriesMeta.getChunkMetadataList().get(i); + if (chunkMetadata.getStatistics() == null || chunkMetadata.getStatistics().getCount() > 0) { + // the chunk of a single chunk series must not be empty + lastNonEmptyChunkMetadata = chunkMetadata; + break; + } + } + + if (lastNonEmptyChunkMetadata == null) { + LOGGER.error( + "All chunks are empty in series {} of file {}", + seriesMeta, + sequenceReader.getFileName()); + return new Pair<>(seriesMeta.getMeasurementId(), null); + } + + Chunk chunk = sequenceReader.readMemChunk(lastNonEmptyChunkMetadata); if (!isAligned) { return new Pair<>(seriesMeta.getMeasurementId(), readNonAlignedLastPoint(chunk)); } else { return new Pair<>( seriesMeta.getMeasurementId(), - readAlignedLastPoint(chunk, chunkMetadata, seriesMeta.getStatistics().getEndTime())); + readAlignedLastPoint( + chunk, lastNonEmptyChunkMetadata, seriesMeta.getStatistics().getEndTime())); } } } diff --git a/java/tsfile/src/test/java/org/apache/tsfile/read/reader/TsFileLastReaderTest.java b/java/tsfile/src/test/java/org/apache/tsfile/read/reader/TsFileLastReaderTest.java index ce3495aa..420b21b1 100644 --- a/java/tsfile/src/test/java/org/apache/tsfile/read/reader/TsFileLastReaderTest.java +++ b/java/tsfile/src/test/java/org/apache/tsfile/read/reader/TsFileLastReaderTest.java @@ -49,6 +49,7 @@ import java.util.Set; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +@SuppressWarnings({"ResultOfMethodCallIgnored", "SameParameterValue"}) public class TsFileLastReaderTest { private static final List<TSDataType> dataTypes = @@ -97,6 +98,116 @@ public class TsFileLastReaderTest { } } + // the second half measurements will have an empty last chunk each + private void createFileWithLastEmptyChunks(int deviceNum, int measurementNum, int seriesPointNum) + throws IOException, WriteProcessException { + try (TsFileWriter writer = new TsFileWriter(file)) { + List<IMeasurementSchema> measurementSchemaList = new 
ArrayList<>(); + for (int j = 0; j < measurementNum; j++) { + TSDataType tsDataType = dataTypes.get(j % dataTypes.size()); + measurementSchemaList.add(new MeasurementSchema("s" + j, tsDataType)); + } + for (int i = 0; i < deviceNum; i++) { + writer.registerAlignedTimeseries("device" + i, measurementSchemaList); + } + + // the first half seriesPointNum points are not null for all series + int batchPointNum = seriesPointNum / 2; + for (int i = 0; i < deviceNum; i++) { + Tablet tablet = new Tablet("device" + i, measurementSchemaList, batchPointNum); + for (int k = 0; k < batchPointNum; k++) { + tablet.addTimestamp(k, k); + } + for (int j = 0; j < measurementNum; j++) { + TSDataType tsDataType = dataTypes.get(j % dataTypes.size()); + for (int k = 0; k < batchPointNum; k++) { + typeAddValueFunctions.get(tsDataType).addValue(tablet, k, j); + } + } + writer.writeTree(tablet); + } + writer.flush(); + + // the second half series have no value for the remaining points + batchPointNum = seriesPointNum - batchPointNum; + for (int i = 0; i < deviceNum; i++) { + Tablet tablet = new Tablet("device" + i, measurementSchemaList, seriesPointNum); + for (int k = 0; k < batchPointNum; k++) { + tablet.addTimestamp(k, k + seriesPointNum / 2); + } + for (int j = 0; j < measurementNum / 2; j++) { + TSDataType tsDataType = dataTypes.get(j % dataTypes.size()); + for (int k = 0; k < seriesPointNum; k++) { + switch (tsDataType) { + case INT64: + tablet.addValue(k, j, (long) k + seriesPointNum / 2); + break; + case BLOB: + tablet.addValue( + k, + j, + Long.toBinaryString(k + seriesPointNum / 2).getBytes(StandardCharsets.UTF_8)); + break; + default: + throw new IllegalArgumentException("Unsupported TSDataType " + tsDataType); + } + } + } + writer.writeTree(tablet); + } + } + } + + private void doReadLastWithEmpty(int deviceNum, int measurementNum, int seriesPointNum) + throws Exception { + long startTime = System.currentTimeMillis(); + Set<IDeviceID> devices = new HashSet<>(); + try 
(TsFileLastReader lastReader = new TsFileLastReader(filePath, true)) { + while (lastReader.hasNext()) { + Set<String> measurements = new HashSet<>(); + Pair<IDeviceID, List<Pair<String, TimeValuePair>>> next = lastReader.next(); + assertFalse(devices.contains(next.left)); + devices.add(next.left); + + // time column included + assertEquals(measurementNum + 1, next.getRight().size()); + next.right.forEach( + pair -> { + measurements.add(pair.getLeft()); + // the time column is regarded as the first half + int measurementIndex = + pair.left.isEmpty() ? -1 : Integer.parseInt(pair.getLeft().substring(1)); + + if (measurementIndex < measurementNum / 2) { + assertEquals(seriesPointNum - 1, pair.getRight().getTimestamp()); + TsPrimitiveType value = pair.getRight().getValue(); + if (value.getDataType() == TSDataType.INT64) { + assertEquals(seriesPointNum - 1, value.getLong()); + } else { + assertEquals( + new Binary(Long.toBinaryString(seriesPointNum - 1), StandardCharsets.UTF_8), + value.getBinary()); + } + } else { + assertEquals(seriesPointNum / 2 - 1, pair.getRight().getTimestamp()); + TsPrimitiveType value = pair.getRight().getValue(); + if (value.getDataType() == TSDataType.INT64) { + assertEquals(seriesPointNum / 2 - 1, value.getLong()); + } else { + assertEquals( + new Binary( + Long.toBinaryString(seriesPointNum / 2 - 1), StandardCharsets.UTF_8), + value.getBinary()); + } + } + }); + assertEquals(measurementNum + 1, measurements.size()); + } + } + assertEquals(deviceNum, devices.size()); + System.out.printf("Last point iteration takes %dms%n", System.currentTimeMillis() - startTime); + } + private void doReadLast(int deviceNum, int measurementNum, int seriesPointNum) throws Exception { long startTime = System.currentTimeMillis(); Set<IDeviceID> devices = new HashSet<>(); @@ -161,6 +272,12 @@ public class TsFileLastReaderTest { testReadLast(1000, 1000, 1000); } + @Test + public void lastLastEmptyChunks() throws Exception { + createFileWithLastEmptyChunks(100, 100, 
100); + doReadLastWithEmpty(100, 100, 100); + } + @Ignore("Performance") @Test public void testManyRead() throws Exception {
