This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch dev/1.1
in repository https://gitbox.apache.org/repos/asf/tsfile.git
The following commit(s) were added to refs/heads/dev/1.1 by this push:
new 3a1f0c8f [To dev/1.1] Add TsFileLastReader for retrieving last points
in a TsFile (#506)
3a1f0c8f is described below
commit 3a1f0c8f617964902dd7142efb36db36b4fc6795
Author: Jiang Tian <[email protected]>
AuthorDate: Tue Jun 3 16:59:48 2025 +0800
[To dev/1.1] Add TsFileLastReader for retrieving last points in a TsFile
(#506)
* add last reader
* spotless
---
.../tsfile/file/metadata/TimeseriesMetadata.java | 31 +-
.../file/metadata/statistics/TimeStatistics.java | 8 +-
.../apache/tsfile/read/TsFileSequenceReader.java | 192 ++++++++++-
.../java/org/apache/tsfile/read/common/Chunk.java | 4 +
.../tsfile/read/reader/TsFileLastReader.java | 313 +++++++++++++++++
.../tsfile/read/reader/page/ValuePageReader.java | 4 +
.../tsfile/read/reader/TsFileLastReaderTest.java | 371 +++++++++++++++++++++
7 files changed, 911 insertions(+), 12 deletions(-)
diff --git
a/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/TimeseriesMetadata.java
b/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/TimeseriesMetadata.java
index 17a326e0..c7f97a90 100644
---
a/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/TimeseriesMetadata.java
+++
b/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/TimeseriesMetadata.java
@@ -114,6 +114,11 @@ public class TimeseriesMetadata implements
ITimeSeriesMetadata {
}
public static TimeseriesMetadata deserializeFrom(ByteBuffer buffer, boolean
needChunkMetadata) {
+ return deserializeFrom(buffer, needChunkMetadata, needChunkMetadata);
+ }
+
+ public static TimeseriesMetadata deserializeFrom(
+ ByteBuffer buffer, boolean needChunkMetadataForNonBlob, boolean
needChunkMetadataForBlob) {
TimeseriesMetadata timeseriesMetaData = new TimeseriesMetadata();
timeseriesMetaData.setTimeSeriesMetadataType(ReadWriteIOUtils.readByte(buffer));
timeseriesMetaData.setMeasurementId(ReadWriteIOUtils.readVarIntString(buffer));
@@ -121,7 +126,8 @@ public class TimeseriesMetadata implements
ITimeSeriesMetadata {
int chunkMetaDataListDataSize =
ReadWriteForEncodingUtils.readUnsignedVarInt(buffer);
timeseriesMetaData.setDataSizeOfChunkMetaDataList(chunkMetaDataListDataSize);
timeseriesMetaData.setStatistics(Statistics.deserialize(buffer,
timeseriesMetaData.dataType));
- if (needChunkMetadata) {
+ if ((timeseriesMetaData.getTsDataType() != TSDataType.BLOB &&
needChunkMetadataForNonBlob)
+ || (timeseriesMetaData.getTsDataType() == TSDataType.BLOB &&
needChunkMetadataForBlob)) {
ByteBuffer byteBuffer = buffer.slice();
byteBuffer.limit(chunkMetaDataListDataSize);
timeseriesMetaData.chunkMetadataList = new ArrayList<>();
@@ -138,6 +144,14 @@ public class TimeseriesMetadata implements
ITimeSeriesMetadata {
public static TimeseriesMetadata deserializeFrom(
TsFileInput tsFileInput, boolean needChunkMetadata) throws IOException {
+ return deserializeFrom(tsFileInput, needChunkMetadata, needChunkMetadata);
+ }
+
+ public static TimeseriesMetadata deserializeFrom(
+ TsFileInput tsFileInput,
+ boolean needChunkMetadataForNonBlob,
+ boolean needChunkMetadataForBlob)
+ throws IOException {
InputStream inputStream = tsFileInput.wrapAsInputStream();
TimeseriesMetadata timeseriesMetaData = new TimeseriesMetadata();
timeseriesMetaData.setTimeSeriesMetadataType(ReadWriteIOUtils.readByte(inputStream));
@@ -148,7 +162,8 @@ public class TimeseriesMetadata implements
ITimeSeriesMetadata {
timeseriesMetaData.setStatistics(
Statistics.deserialize(inputStream, timeseriesMetaData.dataType));
long startOffset = tsFileInput.position();
- if (needChunkMetadata) {
+ if ((timeseriesMetaData.getTsDataType() != TSDataType.BLOB &&
needChunkMetadataForNonBlob)
+ || (timeseriesMetaData.getTsDataType() == TSDataType.BLOB &&
needChunkMetadataForBlob)) {
timeseriesMetaData.chunkMetadataList = new ArrayList<>();
while (tsFileInput.position() < startOffset + chunkMetaDataListDataSize)
{
timeseriesMetaData.chunkMetadataList.add(
@@ -168,6 +183,14 @@ public class TimeseriesMetadata implements
ITimeSeriesMetadata {
*/
public static TimeseriesMetadata deserializeFrom(
ByteBuffer buffer, Set<String> excludedMeasurements, boolean
needChunkMetadata) {
+ return deserializeFrom(buffer, excludedMeasurements, needChunkMetadata,
needChunkMetadata);
+ }
+
+ public static TimeseriesMetadata deserializeFrom(
+ ByteBuffer buffer,
+ Set<String> excludedMeasurements,
+ boolean needChunkMetadataForNonBlob,
+ boolean needChunkMetadataForBlob) {
byte timeseriesType = ReadWriteIOUtils.readByte(buffer);
String measurementID = ReadWriteIOUtils.readVarIntString(buffer);
TSDataType tsDataType = ReadWriteIOUtils.readDataType(buffer);
@@ -181,7 +204,9 @@ public class TimeseriesMetadata implements
ITimeSeriesMetadata {
timeseriesMetaData.setDataSizeOfChunkMetaDataList(chunkMetaDataListDataSize);
timeseriesMetaData.setStatistics(statistics);
- if (!excludedMeasurements.contains(measurementID) && needChunkMetadata) {
+ if (!excludedMeasurements.contains(measurementID)
+ && ((tsDataType != TSDataType.BLOB && needChunkMetadataForNonBlob)
+ || (tsDataType == TSDataType.BLOB && needChunkMetadataForBlob))) {
// measurement is not in the excluded set and need chunk metadata
ByteBuffer byteBuffer = buffer.slice();
byteBuffer.limit(chunkMetaDataListDataSize);
diff --git
a/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/statistics/TimeStatistics.java
b/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/statistics/TimeStatistics.java
index 48fcb329..96dcf79f 100644
---
a/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/statistics/TimeStatistics.java
+++
b/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/statistics/TimeStatistics.java
@@ -76,22 +76,22 @@ public class TimeStatistics extends Statistics<Long> {
@Override
public Long getMinValue() {
- throw new StatisticsClassException(String.format(STATS_UNSUPPORTED_MSG,
TIME, "min value"));
+ return getStartTime();
}
@Override
public Long getMaxValue() {
- throw new StatisticsClassException(String.format(STATS_UNSUPPORTED_MSG,
TIME, "max value"));
+ return getEndTime();
}
@Override
public Long getFirstValue() {
- throw new StatisticsClassException(String.format(STATS_UNSUPPORTED_MSG,
TIME, "first value"));
+ return getStartTime();
}
@Override
public Long getLastValue() {
- throw new StatisticsClassException(String.format(STATS_UNSUPPORTED_MSG,
TIME, "last value"));
+ return getEndTime();
}
@Override
diff --git
a/java/tsfile/src/main/java/org/apache/tsfile/read/TsFileSequenceReader.java
b/java/tsfile/src/main/java/org/apache/tsfile/read/TsFileSequenceReader.java
index ff7c5006..13c7adb2 100644
--- a/java/tsfile/src/main/java/org/apache/tsfile/read/TsFileSequenceReader.java
+++ b/java/tsfile/src/main/java/org/apache/tsfile/read/TsFileSequenceReader.java
@@ -75,9 +75,11 @@ import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.nio.ByteBuffer;
+import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
+import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
@@ -85,6 +87,7 @@ import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Queue;
@@ -1252,11 +1255,32 @@ public class TsFileSequenceReader implements
AutoCloseable {
Map<IDeviceID, List<TimeseriesMetadata>> timeseriesMetadataMap,
boolean needChunkMetadata)
throws IOException {
+ generateMetadataIndex(
+ metadataIndex,
+ buffer,
+ deviceId,
+ type,
+ timeseriesMetadataMap,
+ needChunkMetadata,
+ needChunkMetadata);
+ }
+
+ private void generateMetadataIndex(
+ IMetadataIndexEntry metadataIndex,
+ ByteBuffer buffer,
+ IDeviceID deviceId,
+ MetadataIndexNodeType type,
+ Map<IDeviceID, List<TimeseriesMetadata>> timeseriesMetadataMap,
+ boolean needChunkMetadataForNonBlob,
+ boolean needChunkMetadataForBlob)
+ throws IOException {
try {
if (type.equals(MetadataIndexNodeType.LEAF_MEASUREMENT)) {
List<TimeseriesMetadata> timeseriesMetadataList = new ArrayList<>();
while (buffer.hasRemaining()) {
-
timeseriesMetadataList.add(TimeseriesMetadata.deserializeFrom(buffer,
needChunkMetadata));
+ timeseriesMetadataList.add(
+ TimeseriesMetadata.deserializeFrom(
+ buffer, needChunkMetadataForNonBlob,
needChunkMetadataForBlob));
}
timeseriesMetadataMap
.computeIfAbsent(deviceId, k -> new ArrayList<>())
@@ -1284,7 +1308,8 @@ public class TsFileSequenceReader implements
AutoCloseable {
deviceId,
metadataIndexNode.getNodeType(),
timeseriesMetadataMap,
- needChunkMetadata);
+ needChunkMetadataForNonBlob,
+ needChunkMetadataForBlob);
} else {
// when the buffer length is over than Integer.MAX_VALUE,
// using tsFileInput to get timeseriesMetadataList
@@ -1295,7 +1320,8 @@ public class TsFileSequenceReader implements
AutoCloseable {
deviceId,
metadataIndexNode.getNodeType(),
timeseriesMetadataMap,
- needChunkMetadata);
+ needChunkMetadataForNonBlob,
+ needChunkMetadataForBlob);
}
}
}
@@ -1316,13 +1342,35 @@ public class TsFileSequenceReader implements
AutoCloseable {
Map<IDeviceID, List<TimeseriesMetadata>> timeseriesMetadataMap,
boolean needChunkMetadata)
throws IOException {
+ generateMetadataIndexUsingTsFileInput(
+ metadataIndex,
+ start,
+ end,
+ deviceId,
+ type,
+ timeseriesMetadataMap,
+ needChunkMetadata,
+ needChunkMetadata);
+ }
+
+ private void generateMetadataIndexUsingTsFileInput(
+ IMetadataIndexEntry metadataIndex,
+ long start,
+ long end,
+ IDeviceID deviceId,
+ MetadataIndexNodeType type,
+ Map<IDeviceID, List<TimeseriesMetadata>> timeseriesMetadataMap,
+ boolean needChunkMetadataForNonBlob,
+ boolean needChunkMetadataForBlob)
+ throws IOException {
try {
tsFileInput.position(start);
if (type.equals(MetadataIndexNodeType.LEAF_MEASUREMENT)) {
List<TimeseriesMetadata> timeseriesMetadataList = new ArrayList<>();
while (tsFileInput.position() < end) {
timeseriesMetadataList.add(
- TimeseriesMetadata.deserializeFrom(tsFileInput,
needChunkMetadata));
+ TimeseriesMetadata.deserializeFrom(
+ tsFileInput, needChunkMetadataForNonBlob,
needChunkMetadataForBlob));
}
timeseriesMetadataMap
.computeIfAbsent(deviceId, k -> new ArrayList<>())
@@ -1349,7 +1397,8 @@ public class TsFileSequenceReader implements
AutoCloseable {
deviceId,
metadataIndexNode.getNodeType(),
timeseriesMetadataMap,
- needChunkMetadata);
+ needChunkMetadataForNonBlob,
+ needChunkMetadataForBlob);
}
}
} catch (StopReadTsFileByInterruptException e) {
@@ -1398,6 +1447,11 @@ public class TsFileSequenceReader implements
AutoCloseable {
return timeseriesMetadataMap;
}
+ public Iterator<Pair<IDeviceID, List<TimeseriesMetadata>>>
iterAllTimeseriesMetadata(
+ boolean needChunkMetadataForNonBlob, boolean needChunkMetadataForBlob)
throws IOException {
+ return new TimeseriesMetadataIterator(needChunkMetadataForNonBlob,
needChunkMetadataForBlob);
+ }
+
/* This method will only deserialize the TimeseriesMetadata, not including
chunk metadata list */
private List<TimeseriesMetadata>
getDeviceTimeseriesMetadataWithoutChunkMetadata(IDeviceID device)
throws IOException {
@@ -2771,4 +2825,132 @@ public class TsFileSequenceReader implements
AutoCloseable {
public int hashCode() {
return Objects.hash(file);
}
+
+ private class TimeseriesMetadataIterator
+ implements Iterator<Pair<IDeviceID, List<TimeseriesMetadata>>> {
+
+ private final Deque<MetadataIndexNode> nodeStack = new ArrayDeque<>();
+ private final boolean needChunkMetadataForNonBlob;
+ private final boolean needCHunkMetadataForBlob;
+ private Pair<IDeviceID, List<TimeseriesMetadata>> nextValue;
+ private MetadataIndexNode currentLeafDeviceNode;
+ private int currentLeafDeviceNodeIndex;
+
+ public TimeseriesMetadataIterator(
+ boolean needChunkMetadataForNonBlob, boolean needChunkMetadataForBlob)
throws IOException {
+ this.needChunkMetadataForNonBlob = needChunkMetadataForNonBlob;
+ this.needCHunkMetadataForBlob = needChunkMetadataForBlob;
+ if (tsFileMetaData == null) {
+ readFileMetadata();
+ }
+
+ nodeStack.add(tsFileMetaData.getMetadataIndex());
+ }
+
+ @Override
+ public boolean hasNext() {
+ if (nextValue != null) {
+ return true;
+ }
+
+ try {
+ loadNextValue();
+ } catch (IOException e) {
+ logger.warn("Cannot read timeseries metadata from {},", file, e);
+ return false;
+ }
+ return nextValue != null;
+ }
+
+ @Override
+ public Pair<IDeviceID, List<TimeseriesMetadata>> next() {
+ if (!hasNext()) {
+ throw new NoSuchElementException();
+ }
+ Pair<IDeviceID, List<TimeseriesMetadata>> ret = nextValue;
+ nextValue = null;
+ return ret;
+ }
+
+ private void loadNextLeafDeviceNode() throws IOException {
+ while (!nodeStack.isEmpty()) {
+ MetadataIndexNode node = nodeStack.pop();
+ MetadataIndexNodeType nodeType = node.getNodeType();
+ if (nodeType.equals(MetadataIndexNodeType.LEAF_DEVICE)) {
+ currentLeafDeviceNode = node;
+ currentLeafDeviceNodeIndex = 0;
+ return;
+ }
+
+ List<IMetadataIndexEntry> childrenIndex = node.getChildren();
+ for (int i = 0; i < childrenIndex.size(); i++) {
+ long endOffset;
+ IMetadataIndexEntry childIndex = childrenIndex.get(i);
+ endOffset = node.getEndOffset();
+ if (i != childrenIndex.size() - 1) {
+ endOffset = childrenIndex.get(i + 1).getOffset();
+ }
+
+ MetadataIndexNode child;
+ if (endOffset - childIndex.getOffset() < Integer.MAX_VALUE) {
+ ByteBuffer buffer = readData(childIndex.getOffset(), endOffset);
+ child = MetadataIndexNode.deserializeFrom(buffer, true);
+ } else {
+ tsFileInput.position(childIndex.getOffset());
+ child =
MetadataIndexNode.deserializeFrom(tsFileInput.wrapAsInputStream(), true);
+ }
+ nodeStack.push(child);
+ }
+ }
+ }
+
+ private void loadNextValue() throws IOException {
+ if (currentLeafDeviceNode == null
+ || currentLeafDeviceNodeIndex >=
currentLeafDeviceNode.getChildren().size()) {
+ currentLeafDeviceNode = null;
+ loadNextLeafDeviceNode();
+ }
+ if (currentLeafDeviceNode == null) {
+ return;
+ }
+
+ IMetadataIndexEntry childIndex =
+ currentLeafDeviceNode.getChildren().get(currentLeafDeviceNodeIndex);
+ int childNum = currentLeafDeviceNode.getChildren().size();
+ IDeviceID deviceId = ((DeviceMetadataIndexEntry)
childIndex).getDeviceID();
+
+ Map<IDeviceID, List<TimeseriesMetadata>> nextValueMap = new HashMap<>(1);
+ long endOffset = currentLeafDeviceNode.getEndOffset();
+ if (currentLeafDeviceNodeIndex != childNum - 1) {
+ endOffset =
+ currentLeafDeviceNode.getChildren().get(currentLeafDeviceNodeIndex
+ 1).getOffset();
+ }
+ if (endOffset - childIndex.getOffset() < Integer.MAX_VALUE) {
+ ByteBuffer nextBuffer = readData(childIndex.getOffset(), endOffset);
+ generateMetadataIndex(
+ childIndex,
+ nextBuffer,
+ deviceId,
+ currentLeafDeviceNode.getNodeType(),
+ nextValueMap,
+ needChunkMetadataForNonBlob,
+ needCHunkMetadataForBlob);
+ } else {
+ // when the buffer length is over than Integer.MAX_VALUE,
+ // using tsFileInput to get timeseriesMetadataList
+ generateMetadataIndexUsingTsFileInput(
+ childIndex,
+ childIndex.getOffset(),
+ endOffset,
+ deviceId,
+ currentLeafDeviceNode.getNodeType(),
+ nextValueMap,
+ needChunkMetadataForNonBlob,
+ needCHunkMetadataForBlob);
+ }
+ currentLeafDeviceNodeIndex++;
+ Entry<IDeviceID, List<TimeseriesMetadata>> entry =
nextValueMap.entrySet().iterator().next();
+ nextValue = new Pair<>(entry.getKey(), entry.getValue());
+ }
+ }
}
diff --git a/java/tsfile/src/main/java/org/apache/tsfile/read/common/Chunk.java
b/java/tsfile/src/main/java/org/apache/tsfile/read/common/Chunk.java
index 5caab7fe..fdeb4ec3 100644
--- a/java/tsfile/src/main/java/org/apache/tsfile/read/common/Chunk.java
+++ b/java/tsfile/src/main/java/org/apache/tsfile/read/common/Chunk.java
@@ -171,4 +171,8 @@ public class Chunk {
public long getRetainedSizeInBytes() {
return INSTANCE_SIZE + sizeOfByteArray(chunkData.capacity());
}
+
+ public boolean isSinglePageChunk() {
+ return (getHeader().getChunkType() & 0x3F) ==
MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER;
+ }
}
diff --git
a/java/tsfile/src/main/java/org/apache/tsfile/read/reader/TsFileLastReader.java
b/java/tsfile/src/main/java/org/apache/tsfile/read/reader/TsFileLastReader.java
new file mode 100644
index 00000000..f89b30df
--- /dev/null
+++
b/java/tsfile/src/main/java/org/apache/tsfile/read/reader/TsFileLastReader.java
@@ -0,0 +1,313 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tsfile.read.reader;
+
+import org.apache.tsfile.compress.IUnCompressor;
+import org.apache.tsfile.encoding.decoder.Decoder;
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.file.header.PageHeader;
+import org.apache.tsfile.file.metadata.ChunkMetadata;
+import org.apache.tsfile.file.metadata.IDeviceID;
+import org.apache.tsfile.file.metadata.TimeseriesMetadata;
+import org.apache.tsfile.file.metadata.enums.CompressionType;
+import org.apache.tsfile.read.TimeValuePair;
+import org.apache.tsfile.read.TsFileSequenceReader;
+import org.apache.tsfile.read.common.BatchData;
+import org.apache.tsfile.read.common.Chunk;
+import org.apache.tsfile.read.reader.chunk.ChunkReader;
+import org.apache.tsfile.read.reader.page.ValuePageReader;
+import org.apache.tsfile.utils.Pair;
+import org.apache.tsfile.utils.TsPrimitiveType;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.ForkJoinTask;
+
+/** Conveniently retrieve last points of all timeseries from a TsFile. */
+public class TsFileLastReader
+ implements AutoCloseable, Iterator<Pair<IDeviceID, List<Pair<String,
TimeValuePair>>>> {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(TsFileLastReader.class);
+
+ private final TsFileSequenceReader sequenceReader;
+ private boolean asyncIO = true;
+ // when true, blob series will return a null TimeValuePair
+ private boolean ignoreBlob = false;
+ private Iterator<Pair<IDeviceID, List<TimeseriesMetadata>>>
timeseriesMetadataIter;
+ private Pair<IDeviceID, List<Pair<String, TimeValuePair>>> nextValue;
+
+ private BlockingQueue<Pair<IDeviceID, List<Pair<String, TimeValuePair>>>>
lastValueQueue;
+ private ForkJoinTask<Void> asyncTask;
+
+ public TsFileLastReader(String filePath) throws IOException {
+ sequenceReader = new TsFileSequenceReader(filePath);
+ }
+
+ /**
+ * @param filePath path of the TsFile
+ * @param asyncIO use asynchronous IO or not
+ * @param ignoreBlob whether to ignore series with blob type (the returned
TimeValuePair will be
+ * null)
+ */
+ public TsFileLastReader(String filePath, boolean asyncIO, boolean
ignoreBlob) throws IOException {
+ this(filePath);
+ this.asyncIO = asyncIO;
+ this.ignoreBlob = ignoreBlob;
+ }
+
+ @Override
+ public boolean hasNext() {
+ if (timeseriesMetadataIter == null) {
+ try {
+ init();
+ } catch (IOException e) {
+ LOGGER.error("Cannot read timeseries metadata from {}",
sequenceReader.getFileName(), e);
+ return false;
+ }
+ }
+
+ // already meet the terminator
+ if (nextValue != null) {
+ return nextValue.getLeft() != null;
+ }
+
+ if (asyncIO) {
+ return hasNextAsync();
+ } else {
+ return hasNextSync();
+ }
+ }
+
+ private boolean hasNextSync() {
+ if (!timeseriesMetadataIter.hasNext()) {
+ nextValue = new Pair<>(null, null);
+ } else {
+ Pair<IDeviceID, List<TimeseriesMetadata>> next =
timeseriesMetadataIter.next();
+ try {
+ nextValue = new Pair<>(next.left, convertToLastPoints(next.right));
+ } catch (IOException e) {
+ LOGGER.error("Cannot read timeseries metadata from {}",
sequenceReader.getFileName(), e);
+ return false;
+ }
+ }
+ return nextValue.left != null;
+ }
+
+ private boolean hasNextAsync() {
+ try {
+ nextValue = lastValueQueue.take();
+ if (nextValue.getLeft() == null) {
+ // the terminator
+ return false;
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ return false;
+ }
+ return nextValue.left != null;
+ }
+
+ /**
+ * @return (deviceId, measurementId, lastPoint)
+ */
+ @Override
+ public Pair<IDeviceID, List<Pair<String, TimeValuePair>>> next() {
+ if (!hasNext()) {
+ throw new NoSuchElementException();
+ }
+ Pair<IDeviceID, List<Pair<String, TimeValuePair>>> ret = nextValue;
+ nextValue = null;
+ return ret;
+ }
+
+ private List<Pair<String, TimeValuePair>> convertToLastPoints(
+ List<TimeseriesMetadata> timeseriesMetadataList) throws IOException {
+ boolean isAligned = timeseriesMetadataList.get(0).getTsDataType() ==
TSDataType.VECTOR;
+ List<Pair<String, TimeValuePair>> list = new ArrayList<>();
+ for (TimeseriesMetadata meta : timeseriesMetadataList) {
+ Pair<String, TimeValuePair> stringTimeValuePairPair =
convertToLastPoint(meta, isAligned);
+ list.add(stringTimeValuePairPair);
+ }
+ return list;
+ }
+
+ private TimeValuePair readNonAlignedLastPoint(Chunk chunk) throws
IOException {
+ ChunkReader chunkReader = new ChunkReader(chunk);
+ BatchData batchData = null;
+ while (chunkReader.hasNextSatisfiedPage()) {
+ batchData = chunkReader.nextPageData();
+ }
+ if (batchData != null) {
+ return batchData.getLastPairBeforeOrEqualTimestamp(Long.MAX_VALUE);
+ } else {
+ return null;
+ }
+ }
+
+ private TimeValuePair readAlignedLastPoint(Chunk chunk, ChunkMetadata
chunkMetadata, long endTime)
+ throws IOException {
+ ByteBuffer chunkData = chunk.getData();
+ PageHeader lastPageHeader = null;
+ ByteBuffer lastPageData = null;
+ while (chunkData.hasRemaining()) {
+ PageHeader pageHeader;
+ if (chunk.isSinglePageChunk()) {
+ pageHeader = PageHeader.deserializeFrom(chunkData,
chunkMetadata.getStatistics());
+ } else {
+ pageHeader = PageHeader.deserializeFrom(chunkData, TSDataType.BLOB);
+ }
+ ByteBuffer pageData = chunkData.slice();
+ pageData.limit(pageData.position() + pageHeader.getCompressedSize());
+ chunkData.position(chunkData.position() +
pageHeader.getCompressedSize());
+
+ if ((pageHeader.getStatistics() == null &&
pageHeader.getUncompressedSize() != 0)
+ || (pageHeader.getStatistics() != null &&
pageHeader.getStatistics().getCount() > 0)) {
+ lastPageHeader = pageHeader;
+ lastPageData = pageData;
+ }
+ }
+
+ if (lastPageHeader != null) {
+ CompressionType compressionType = chunk.getHeader().getCompressionType();
+ if (compressionType != CompressionType.UNCOMPRESSED) {
+ ByteBuffer uncompressedPage =
ByteBuffer.allocate(lastPageHeader.getUncompressedSize());
+
IUnCompressor.getUnCompressor(compressionType).uncompress(lastPageData,
uncompressedPage);
+ lastPageData = uncompressedPage;
+ lastPageData.flip();
+ }
+
+ ValuePageReader valuePageReader =
+ new ValuePageReader(
+ lastPageHeader,
+ lastPageData,
+ TSDataType.BLOB,
+ Decoder.getDecoderByType(chunk.getHeader().getEncodingType(),
TSDataType.BLOB));
+ TsPrimitiveType lastValue = null;
+ for (int i = 0; i < valuePageReader.getSize(); i++) {
+ // the timestamp here is not necessary
+ lastValue = valuePageReader.nextValue(0, i);
+ }
+ return new TimeValuePair(endTime, lastValue);
+ } else {
+ return null;
+ }
+ }
+
+ private Pair<String, TimeValuePair> convertToLastPoint(
+ TimeseriesMetadata seriesMeta, boolean isAligned) throws IOException {
+ if (seriesMeta.getTsDataType() != TSDataType.BLOB) {
+ return new Pair<>(
+ seriesMeta.getMeasurementId(),
+ new TimeValuePair(
+ seriesMeta.getStatistics().getEndTime(),
+ seriesMeta.getTsDataType() == TSDataType.VECTOR
+ ? TsPrimitiveType.getByType(
+ TSDataType.INT64,
seriesMeta.getStatistics().getEndTime())
+ : TsPrimitiveType.getByType(
+ seriesMeta.getTsDataType(),
seriesMeta.getStatistics().getLastValue())));
+ } else {
+ return readLastPoint(seriesMeta, isAligned);
+ }
+ }
+
+ private Pair<String, TimeValuePair> readLastPoint(
+ TimeseriesMetadata seriesMeta, boolean isAligned) throws IOException {
+ if (seriesMeta.getChunkMetadataList() == null) {
+ return new Pair<>(seriesMeta.getMeasurementId(), null);
+ }
+
+ ChunkMetadata lastNonEmptyChunkMetadata = null;
+ for (int i = seriesMeta.getChunkMetadataList().size() - 1; i >= 0; i--) {
+ ChunkMetadata chunkMetadata = (ChunkMetadata)
seriesMeta.getChunkMetadataList().get(i);
+ if (chunkMetadata.getStatistics() == null ||
chunkMetadata.getStatistics().getCount() > 0) {
+ // the chunk of a single chunk series must not be empty
+ lastNonEmptyChunkMetadata = chunkMetadata;
+ break;
+ }
+ }
+
+ if (lastNonEmptyChunkMetadata == null) {
+ return new Pair<>(seriesMeta.getMeasurementId(), null);
+ }
+
+ Chunk chunk = sequenceReader.readMemChunk(lastNonEmptyChunkMetadata);
+
+ if (!isAligned) {
+ return new Pair<>(seriesMeta.getMeasurementId(),
readNonAlignedLastPoint(chunk));
+ } else {
+ return new Pair<>(
+ seriesMeta.getMeasurementId(),
+ readAlignedLastPoint(
+ chunk, lastNonEmptyChunkMetadata,
seriesMeta.getStatistics().getEndTime()));
+ }
+ }
+
+ private void init() throws IOException {
+ timeseriesMetadataIter = sequenceReader.iterAllTimeseriesMetadata(false,
!ignoreBlob);
+ if (asyncIO) {
+ int queueCapacity = 1024;
+ lastValueQueue = new ArrayBlockingQueue<>(queueCapacity);
+ asyncTask =
+ ForkJoinPool.commonPool()
+ .submit(
+ () -> {
+ try {
+ while (timeseriesMetadataIter.hasNext()) {
+ Pair<IDeviceID, List<TimeseriesMetadata>>
deviceSeriesMetadata =
+ timeseriesMetadataIter.next();
+ lastValueQueue.put(
+ new Pair<>(
+ deviceSeriesMetadata.left,
+
convertToLastPoints(deviceSeriesMetadata.right)));
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ } catch (Exception e) {
+ LOGGER.error("Error while reading timeseries metadata",
e);
+ } finally {
+ try {
+ lastValueQueue.put(new Pair<>(null, null));
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ return null;
+ });
+ }
+ }
+
+ @Override
+ public void close() throws Exception {
+ if (asyncIO) {
+ asyncTask.cancel(true);
+ }
+ sequenceReader.close();
+ }
+}
diff --git
a/java/tsfile/src/main/java/org/apache/tsfile/read/reader/page/ValuePageReader.java
b/java/tsfile/src/main/java/org/apache/tsfile/read/reader/page/ValuePageReader.java
index 58e9ff51..54e3e85f 100644
---
a/java/tsfile/src/main/java/org/apache/tsfile/read/reader/page/ValuePageReader.java
+++
b/java/tsfile/src/main/java/org/apache/tsfile/read/reader/page/ValuePageReader.java
@@ -611,4 +611,8 @@ public class ValuePageReader {
uncompressDataIfNecessary();
return Arrays.copyOf(bitmap, bitmap.length);
}
+
+ public int getSize() {
+ return size;
+ }
}
diff --git
a/java/tsfile/src/test/java/org/apache/tsfile/read/reader/TsFileLastReaderTest.java
b/java/tsfile/src/test/java/org/apache/tsfile/read/reader/TsFileLastReaderTest.java
new file mode 100644
index 00000000..b9211a56
--- /dev/null
+++
b/java/tsfile/src/test/java/org/apache/tsfile/read/reader/TsFileLastReaderTest.java
@@ -0,0 +1,371 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tsfile.read.reader;
+
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.exception.write.WriteProcessException;
+import org.apache.tsfile.file.metadata.IDeviceID;
+import org.apache.tsfile.file.metadata.PlainDeviceID;
+import org.apache.tsfile.read.TimeValuePair;
+import org.apache.tsfile.read.common.Path;
+import org.apache.tsfile.utils.Binary;
+import org.apache.tsfile.utils.Pair;
+import org.apache.tsfile.utils.TsPrimitiveType;
+import org.apache.tsfile.write.TsFileWriter;
+import org.apache.tsfile.write.chunk.AlignedChunkWriterImpl;
+import org.apache.tsfile.write.record.Tablet;
+import org.apache.tsfile.write.schema.IMeasurementSchema;
+import org.apache.tsfile.write.schema.MeasurementSchema;
+import org.apache.tsfile.write.writer.TsFileIOWriter;
+
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
@SuppressWarnings({"ResultOfMethodCallIgnored", "SameParameterValue"})
public class TsFileLastReaderTest {

  // Measurement data types alternate by index: even -> INT64, odd -> BLOB.
  private static final List<TSDataType> dataTypes =
      Arrays.asList(TSDataType.INT64, TSDataType.BLOB);

  // Every test writes to, reads from, and finally deletes this file.
  private final String filePath = "target/test.tsfile";
  private final File file = new File(filePath);
+
+ private void createFile(int deviceNum, int measurementNum, int
seriesPointNum)
+ throws IOException, WriteProcessException {
+ try (TsFileWriter writer = new TsFileWriter(file)) {
+ List<MeasurementSchema> measurementSchemaList = new ArrayList<>();
+ for (int j = 0; j < measurementNum; j++) {
+ TSDataType tsDataType = dataTypes.get(j % dataTypes.size());
+ measurementSchemaList.add(new MeasurementSchema("s" + j, tsDataType));
+ }
+ for (int i = 0; i < deviceNum; i++) {
+ writer.registerAlignedTimeseries(new Path("device" + i),
measurementSchemaList);
+ }
+
+ for (int i = 0; i < deviceNum; i++) {
+ Tablet tablet = new Tablet("device" + i, measurementSchemaList,
seriesPointNum);
+ for (int k = 0; k < seriesPointNum; k++) {
+ tablet.addTimestamp(k, k);
+ }
+ tablet.rowSize = seriesPointNum;
+ for (int j = 0; j < measurementNum; j++) {
+ TSDataType tsDataType = dataTypes.get(j % dataTypes.size());
+ for (int k = 0; k < seriesPointNum; k++) {
+ switch (tsDataType) {
+ case INT64:
+ tablet.addValue("s" + j, k, (long) k);
+ break;
+ case BLOB:
+ tablet.addValue(
+ "s" + j, k, new Binary(Long.toBinaryString(k),
StandardCharsets.UTF_8));
+ break;
+ }
+ }
+ }
+ writer.writeAligned(tablet);
+ }
+ }
+ }
+
+ // the second half measurements will have an emtpy last chunk each
+ private void createFileWithLastEmptyChunks(int deviceNum, int
measurementNum, int seriesPointNum)
+ throws IOException, WriteProcessException {
+ try (TsFileWriter writer = new TsFileWriter(file)) {
+ List<MeasurementSchema> measurementSchemaList = new ArrayList<>();
+ for (int j = 0; j < measurementNum; j++) {
+ TSDataType tsDataType = dataTypes.get(j % dataTypes.size());
+ measurementSchemaList.add(new MeasurementSchema("s" + j, tsDataType));
+ }
+ for (int i = 0; i < deviceNum; i++) {
+ writer.registerAlignedTimeseries(new Path("device" + i),
measurementSchemaList);
+ }
+
+ // the first half seriesPointNum points are not null for all series
+ int batchPointNum = seriesPointNum / 2;
+ for (int i = 0; i < deviceNum; i++) {
+ Tablet tablet = new Tablet("device" + i, measurementSchemaList,
batchPointNum);
+ for (int k = 0; k < batchPointNum; k++) {
+ tablet.addTimestamp(k, k);
+ }
+ tablet.rowSize = batchPointNum;
+ for (int j = 0; j < measurementNum; j++) {
+ TSDataType tsDataType = dataTypes.get(j % dataTypes.size());
+ for (int k = 0; k < batchPointNum; k++) {
+ switch (tsDataType) {
+ case INT64:
+ tablet.addValue("s" + j, k, (long) k);
+ break;
+ case BLOB:
+ tablet.addValue(
+ "s" + j, k, new Binary(Long.toBinaryString(k),
StandardCharsets.UTF_8));
+ break;
+ }
+ }
+ }
+ writer.writeAligned(tablet);
+ }
+ writer.flushAllChunkGroups();
+
+ // the second half series have no value for the remaining points
+ batchPointNum = seriesPointNum - batchPointNum;
+ for (int i = 0; i < deviceNum; i++) {
+ Tablet tablet = new Tablet("device" + i, measurementSchemaList,
seriesPointNum);
+ for (int k = 0; k < batchPointNum; k++) {
+ tablet.addTimestamp(k, k + seriesPointNum / 2);
+ }
+ tablet.rowSize = batchPointNum;
+ for (int j = 0; j < measurementNum / 2; j++) {
+ TSDataType tsDataType = dataTypes.get(j % dataTypes.size());
+ for (int k = 0; k < seriesPointNum; k++) {
+ switch (tsDataType) {
+ case INT64:
+ tablet.addValue("s" + j, k, (long) k + seriesPointNum / 2);
+ break;
+ case BLOB:
+ tablet.addValue(
+ "s" + j,
+ k,
+ new Binary(
+ Long.toBinaryString(k + seriesPointNum / 2)
+ .getBytes(StandardCharsets.UTF_8)));
+ break;
+ }
+ }
+ }
+ for (int j = measurementNum / 2; j < measurementNum; j++) {
+ for (int k = 0; k < seriesPointNum; k++) {
+ tablet.addValue("s" + j, k, null);
+ }
+ }
+ writer.writeAligned(tablet);
+ }
+ }
+ }
+
+ private void doReadLastWithEmpty(int deviceNum, int measurementNum, int
seriesPointNum)
+ throws Exception {
+ long startTime = System.currentTimeMillis();
+ Set<IDeviceID> devices = new HashSet<>();
+ try (TsFileLastReader lastReader = new TsFileLastReader(filePath, true,
false)) {
+ while (lastReader.hasNext()) {
+ Set<String> measurements = new HashSet<>();
+ Pair<IDeviceID, List<Pair<String, TimeValuePair>>> next =
lastReader.next();
+ assertFalse(devices.contains(next.left));
+ devices.add(next.left);
+
+ // time column included
+ assertEquals(measurementNum + 1, next.getRight().size());
+ next.right.forEach(
+ pair -> {
+ measurements.add(pair.getLeft());
+ // the time column is regarded as the first half
+ int measurementIndex =
+ pair.left.isEmpty() ? -1 :
Integer.parseInt(pair.getLeft().substring(1));
+
+ if (measurementIndex < measurementNum / 2) {
+ assertEquals(seriesPointNum - 1,
pair.getRight().getTimestamp());
+ TsPrimitiveType value = pair.getRight().getValue();
+ if (value.getDataType() == TSDataType.INT64) {
+ assertEquals(seriesPointNum - 1, value.getLong());
+ } else {
+ assertEquals(
+ new Binary(Long.toBinaryString(seriesPointNum - 1),
StandardCharsets.UTF_8),
+ value.getBinary());
+ }
+ } else {
+ assertEquals(seriesPointNum / 2 - 1,
pair.getRight().getTimestamp());
+ TsPrimitiveType value = pair.getRight().getValue();
+ if (value.getDataType() == TSDataType.INT64) {
+ assertEquals(seriesPointNum / 2 - 1, value.getLong());
+ } else {
+ assertEquals(
+ new Binary(
+ Long.toBinaryString(seriesPointNum / 2 - 1),
StandardCharsets.UTF_8),
+ value.getBinary());
+ }
+ }
+ });
+ assertEquals(measurementNum + 1, measurements.size());
+ }
+ }
+ assertEquals(deviceNum, devices.size());
+ System.out.printf("Last point iteration takes %dms%n",
System.currentTimeMillis() - startTime);
+ }
+
+ private void doReadLast(int deviceNum, int measurementNum, int
seriesPointNum, boolean ignoreBlob)
+ throws Exception {
+ long startTime = System.currentTimeMillis();
+ Set<IDeviceID> devices = new HashSet<>();
+ try (TsFileLastReader lastReader = new TsFileLastReader(filePath, true,
ignoreBlob)) {
+ while (lastReader.hasNext()) {
+ Set<String> measurements = new HashSet<>();
+ Pair<IDeviceID, List<Pair<String, TimeValuePair>>> next =
lastReader.next();
+ assertFalse(devices.contains(next.left));
+ devices.add(next.left);
+
+ // time column included
+ assertEquals(measurementNum + 1, next.getRight().size());
+ next.right.forEach(
+ pair -> {
+ measurements.add(pair.getLeft());
+ // the time column is regarded as the first half
+ int measurementIndex =
+ pair.left.isEmpty() ? -1 :
Integer.parseInt(pair.getLeft().substring(1));
+ TSDataType tsDataType =
+ measurementIndex == -1
+ ? TSDataType.INT64
+ : dataTypes.get(measurementIndex % dataTypes.size());
+
+ if (tsDataType == TSDataType.BLOB && ignoreBlob) {
+ assertNull(pair.getRight());
+ return;
+ }
+
+ assertEquals(seriesPointNum - 1, pair.getRight().getTimestamp());
+ if (pair.getRight() == null) {
+ assertTrue(ignoreBlob);
+ } else {
+ TsPrimitiveType value = pair.getRight().getValue();
+ if (value.getDataType() == TSDataType.INT64) {
+ assertEquals(seriesPointNum - 1, value.getLong());
+ } else {
+ assertEquals(
+ new Binary(Long.toBinaryString(seriesPointNum - 1),
StandardCharsets.UTF_8),
+ value.getBinary());
+ }
+ }
+ });
+ assertEquals(measurementNum + 1, measurements.size());
+ }
+ }
+ assertEquals(deviceNum, devices.size());
+ System.out.printf("Last point iteration takes %dms%n",
System.currentTimeMillis() - startTime);
+ }
+
  // End-to-end round trip: write a file, verify its last points, then remove it.
  private void testReadLast(int deviceNum, int measurementNum, int seriesPointNum)
      throws Exception {
    createFile(deviceNum, measurementNum, seriesPointNum);
    doReadLast(deviceNum, measurementNum, seriesPointNum, false);
    file.delete();
  }
+
  // Baseline: a handful of devices, measurements, and points.
  @Test
  public void testSmall() throws Exception {
    testReadLast(10, 10, 10);
  }
+
  // Scale along the device dimension.
  @Test
  public void testManyDevices() throws Exception {
    testReadLast(10000, 10, 10);
  }
+
  // Scale along the measurement dimension.
  @Test
  public void testManyMeasurement() throws Exception {
    testReadLast(10, 10000, 10);
  }
+
  // Scale along the points-per-series dimension.
  @Test
  public void testManyPoints() throws Exception {
    testReadLast(100, 10, 10000);
  }
+
  // Moderate scale on all three dimensions at once.
  @Test
  public void testManyMany() throws Exception {
    testReadLast(100, 100, 100);
  }
+
  // Series whose final chunk is empty must fall back to the previous chunk's last point.
  @Test
  public void testLastEmptyChunks() throws Exception {
    createFileWithLastEmptyChunks(100, 100, 100);
    doReadLastWithEmpty(100, 100, 100);
  }
+
  @Test
  public void testLastEmptyPage() throws Exception {
    // Build a two-page aligned chunk by hand: the sealed first page holds values for both
    // series; the second page holds a value only for s1 (s2 is null there), so s2's last
    // page is empty and its last point must come from the earlier page.
    try (TsFileIOWriter ioWriter = new TsFileIOWriter(file)) {
      ioWriter.startChunkGroup(new PlainDeviceID("root.db1.d1"));
      List<IMeasurementSchema> measurementSchemaList =
          Arrays.asList(
              new MeasurementSchema("s1", TSDataType.INT64),
              new MeasurementSchema("s2", TSDataType.BLOB));
      AlignedChunkWriterImpl alignedChunkWriter = new AlignedChunkWriterImpl(measurementSchemaList);
      alignedChunkWriter.write(
          0,
          new TsPrimitiveType[] {
            TsPrimitiveType.getByType(TSDataType.INT64, 0L),
            TsPrimitiveType.getByType(
                TSDATATYPE_PLACEHOLDER_REMOVED, new Binary("0".getBytes(StandardCharsets.UTF_8)))
          });
      alignedChunkWriter.sealCurrentPage();
      alignedChunkWriter.write(
          1, new TsPrimitiveType[] {TsPrimitiveType.getByType(TSDataType.INT64, 1L), null});
      alignedChunkWriter.writeToFileWriter(ioWriter);
      ioWriter.endChunkGroup();

      ioWriter.endFile();
    }

    try (TsFileLastReader lastReader = new TsFileLastReader(filePath)) {
      Pair<IDeviceID, List<Pair<String, TimeValuePair>>> next = lastReader.next();
      assertEquals(new PlainDeviceID("root.db1.d1"), next.getLeft());
      // Time column + s1 + s2.
      assertEquals(3, next.getRight().size());
      assertEquals("s1", next.getRight().get(1).left);
      assertEquals("s2", next.getRight().get(2).left);
      // s1's last point comes from the second (unsealed) page...
      assertEquals(1, next.getRight().get(1).right.getTimestamp());
      assertEquals(1, next.getRight().get(1).right.getValue().getLong());
      // ...while s2's comes from the sealed first page.
      assertEquals(0, next.getRight().get(2).right.getTimestamp());
      assertEquals("0", next.getRight().get(2).right.getValue().getStringValue());
    }
  }
+
  // With ignoreBlob set, BLOB series must be reported with no last value.
  @Test
  public void testIgnoreBlob() throws Exception {
    createFile(10, 10, 10);
    doReadLast(10, 10, 10, true);
    file.delete();
  }
+
  // Performance harness: repeated last-point scans over a wide, shallow file; excluded
  // from regular CI runs.
  @Ignore("Performance")
  @Test
  public void testManyRead() throws Exception {
    int deviceNum = 10000;
    int measurementNum = 1000;
    int seriesPointNum = 1;
    createFile(deviceNum, measurementNum, seriesPointNum);
    for (int i = 0; i < 10; i++) {
      doReadLast(deviceNum, measurementNum, seriesPointNum, false);
    }
    file.delete();
  }
+}