This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch snapshot/2.1.0-250521
in repository https://gitbox.apache.org/repos/asf/tsfile.git
The following commit(s) were added to refs/heads/snapshot/2.1.0-250521 by this
push:
new 547ea881 add ignore blob
547ea881 is described below
commit 547ea881bfbdc6f98bfe9bbcefae95d1572d084c
Author: Tian Jiang <[email protected]>
AuthorDate: Thu May 22 18:00:04 2025 +0800
add ignore blob
(cherry picked from commit 487688d6b745d6c5f18d6ae7adcb0747c66241f5)
---
.../tsfile/read/reader/TsFileLastReader.java | 66 +++++++++++++---------
.../tsfile/read/reader/TsFileLastReaderTest.java | 49 ++++++++++++----
2 files changed, 78 insertions(+), 37 deletions(-)
diff --git
a/java/tsfile/src/main/java/org/apache/tsfile/read/reader/TsFileLastReader.java
b/java/tsfile/src/main/java/org/apache/tsfile/read/reader/TsFileLastReader.java
index 8c01215c..f89b30df 100644
---
a/java/tsfile/src/main/java/org/apache/tsfile/read/reader/TsFileLastReader.java
+++
b/java/tsfile/src/main/java/org/apache/tsfile/read/reader/TsFileLastReader.java
@@ -58,6 +58,8 @@ public class TsFileLastReader
private final TsFileSequenceReader sequenceReader;
private boolean asyncIO = true;
+ // when true, blob series will return a null TimeValuePair
+ private boolean ignoreBlob = false;
private Iterator<Pair<IDeviceID, List<TimeseriesMetadata>>>
timeseriesMetadataIter;
private Pair<IDeviceID, List<Pair<String, TimeValuePair>>> nextValue;
@@ -68,9 +70,27 @@ public class TsFileLastReader
sequenceReader = new TsFileSequenceReader(filePath);
}
- public TsFileLastReader(String filePath, boolean asyncIO) throws IOException
{
+ /**
+ * Same as {@code TsFileLastReader(filePath, asyncIO, false)}: blob series are not ignored.
+ * Kept so that existing callers of the two-argument constructor still compile.
+ *
+ * @param filePath path of the TsFile
+ * @param asyncIO use asynchronous IO or not
+ */
+ public TsFileLastReader(String filePath, boolean asyncIO) throws IOException {
+ this(filePath, asyncIO, false);
+ }
+
+ /**
+ * @param filePath path of the TsFile
+ * @param asyncIO use asynchronous IO or not
+ * @param ignoreBlob whether to ignore series with blob type (the returned
TimeValuePair will be
+ * null)
+ */
+ public TsFileLastReader(String filePath, boolean asyncIO, boolean
ignoreBlob) throws IOException {
this(filePath);
this.asyncIO = asyncIO;
+ this.ignoreBlob = ignoreBlob;
}
@Override
@@ -224,39 +244,44 @@ public class TsFileLastReader
: TsPrimitiveType.getByType(
seriesMeta.getTsDataType(),
seriesMeta.getStatistics().getLastValue())));
} else {
- ChunkMetadata lastNonEmptyChunkMetadata = null;
- for (int i = seriesMeta.getChunkMetadataList().size() - 1; i >= 0; i--) {
- ChunkMetadata chunkMetadata = (ChunkMetadata)
seriesMeta.getChunkMetadataList().get(i);
- if (chunkMetadata.getStatistics() == null ||
chunkMetadata.getStatistics().getCount() > 0) {
- // the chunk of a single chunk series must not be empty
- lastNonEmptyChunkMetadata = chunkMetadata;
- break;
- }
- }
+ return readLastPoint(seriesMeta, isAligned);
+ }
+ }
+
+ private Pair<String, TimeValuePair> readLastPoint(
+ TimeseriesMetadata seriesMeta, boolean isAligned) throws IOException {
+ if (seriesMeta.getChunkMetadataList() == null) {
+ return new Pair<>(seriesMeta.getMeasurementId(), null);
+ }
- if (lastNonEmptyChunkMetadata == null) {
- LOGGER.error(
- "All chunks are empty in series {} of file {}",
- seriesMeta,
- sequenceReader.getFileName());
- return new Pair<>(seriesMeta.getMeasurementId(), null);
+ ChunkMetadata lastNonEmptyChunkMetadata = null;
+ for (int i = seriesMeta.getChunkMetadataList().size() - 1; i >= 0; i--) {
+ ChunkMetadata chunkMetadata = (ChunkMetadata)
seriesMeta.getChunkMetadataList().get(i);
+ if (chunkMetadata.getStatistics() == null ||
chunkMetadata.getStatistics().getCount() > 0) {
+ // the chunk of a single chunk series must not be empty
+ lastNonEmptyChunkMetadata = chunkMetadata;
+ break;
}
+ }
- Chunk chunk = sequenceReader.readMemChunk(lastNonEmptyChunkMetadata);
+ if (lastNonEmptyChunkMetadata == null) {
+ return new Pair<>(seriesMeta.getMeasurementId(), null);
+ }
- if (!isAligned) {
- return new Pair<>(seriesMeta.getMeasurementId(),
readNonAlignedLastPoint(chunk));
- } else {
- return new Pair<>(
- seriesMeta.getMeasurementId(),
- readAlignedLastPoint(
- chunk, lastNonEmptyChunkMetadata,
seriesMeta.getStatistics().getEndTime()));
- }
+ Chunk chunk = sequenceReader.readMemChunk(lastNonEmptyChunkMetadata);
+
+ if (!isAligned) {
+ return new Pair<>(seriesMeta.getMeasurementId(),
readNonAlignedLastPoint(chunk));
+ } else {
+ return new Pair<>(
+ seriesMeta.getMeasurementId(),
+ readAlignedLastPoint(
+ chunk, lastNonEmptyChunkMetadata,
seriesMeta.getStatistics().getEndTime()));
}
}
private void init() throws IOException {
- timeseriesMetadataIter = sequenceReader.iterAllTimeseriesMetadata(false,
true);
+ timeseriesMetadataIter = sequenceReader.iterAllTimeseriesMetadata(false,
!ignoreBlob);
if (asyncIO) {
int queueCapacity = 1024;
lastValueQueue = new ArrayBlockingQueue<>(queueCapacity);
diff --git
a/java/tsfile/src/test/java/org/apache/tsfile/read/reader/TsFileLastReaderTest.java
b/java/tsfile/src/test/java/org/apache/tsfile/read/reader/TsFileLastReaderTest.java
index 2f490f67..40b1cc8f 100644
---
a/java/tsfile/src/test/java/org/apache/tsfile/read/reader/TsFileLastReaderTest.java
+++
b/java/tsfile/src/test/java/org/apache/tsfile/read/reader/TsFileLastReaderTest.java
@@ -51,6 +51,8 @@ import java.util.Set;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
@SuppressWarnings({"ResultOfMethodCallIgnored", "SameParameterValue"})
public class TsFileLastReaderTest {
@@ -165,7 +167,7 @@ public class TsFileLastReaderTest {
throws Exception {
long startTime = System.currentTimeMillis();
Set<IDeviceID> devices = new HashSet<>();
- try (TsFileLastReader lastReader = new TsFileLastReader(filePath, true)) {
+ try (TsFileLastReader lastReader = new TsFileLastReader(filePath, true,
false)) {
while (lastReader.hasNext()) {
Set<String> measurements = new HashSet<>();
Pair<IDeviceID, List<Pair<String, TimeValuePair>>> next =
lastReader.next();
@@ -211,10 +213,11 @@ public class TsFileLastReaderTest {
System.out.printf("Last point iteration takes %dms%n",
System.currentTimeMillis() - startTime);
}
- private void doReadLast(int deviceNum, int measurementNum, int
seriesPointNum) throws Exception {
+ private void doReadLast(int deviceNum, int measurementNum, int
seriesPointNum, boolean ignoreBlob)
+ throws Exception {
long startTime = System.currentTimeMillis();
Set<IDeviceID> devices = new HashSet<>();
- try (TsFileLastReader lastReader = new TsFileLastReader(filePath, true)) {
+ try (TsFileLastReader lastReader = new TsFileLastReader(filePath, true,
ignoreBlob)) {
while (lastReader.hasNext()) {
Set<String> measurements = new HashSet<>();
Pair<IDeviceID, List<Pair<String, TimeValuePair>>> next =
lastReader.next();
@@ -226,14 +229,29 @@ public class TsFileLastReaderTest {
next.right.forEach(
pair -> {
measurements.add(pair.getLeft());
+ // an empty measurement name denotes the time column, which is checked as INT64
+ int measurementIndex =
+ pair.left.isEmpty() ? -1 :
Integer.parseInt(pair.getLeft().substring(1));
+ TSDataType tsDataType =
+ measurementIndex == -1
+ ? TSDataType.INT64
+ : dataTypes.get(measurementIndex % dataTypes.size());
+
+ if (tsDataType == TSDataType.BLOB && ignoreBlob) {
+ assertNull(pair.getRight());
+ return;
+ }
+
+ // ignored blob series have returned above; every other series must have a last point
+ assertTrue("no last point for " + pair.getLeft(), pair.getRight() != null);
assertEquals(seriesPointNum - 1, pair.getRight().getTimestamp());
TsPrimitiveType value = pair.getRight().getValue();
if (value.getDataType() == TSDataType.INT64) {
assertEquals(seriesPointNum - 1, value.getLong());
} else {
assertEquals(
new Binary(Long.toBinaryString(seriesPointNum - 1),
StandardCharsets.UTF_8),
value.getBinary());
}
});
assertEquals(measurementNum + 1, measurements.size());
@@ -246,7 +264,7 @@ public class TsFileLastReaderTest {
private void testReadLast(int deviceNum, int measurementNum, int
seriesPointNum)
throws Exception {
createFile(deviceNum, measurementNum, seriesPointNum);
- doReadLast(deviceNum, measurementNum, seriesPointNum);
+ doReadLast(deviceNum, measurementNum, seriesPointNum, false);
file.delete();
}
@@ -319,6 +337,13 @@ public class TsFileLastReaderTest {
}
}
+ @Test
+ public void testIgnoreBlob() throws Exception {
+ createFile(10, 10, 10);
+ doReadLast(10, 10, 10, true);
+ file.delete();
+ }
+
@Ignore("Performance")
@Test
public void testManyRead() throws Exception {
@@ -327,7 +352,7 @@ public class TsFileLastReaderTest {
int seriesPointNum = 1;
createFile(deviceNum, measurementNum, seriesPointNum);
for (int i = 0; i < 10; i++) {
- doReadLast(deviceNum, measurementNum, seriesPointNum);
+ doReadLast(deviceNum, measurementNum, seriesPointNum, false);
}
file.delete();
}