This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/tsfile.git
The following commit(s) were added to refs/heads/develop by this push:
new 4927dd86 Add some methods for compaction
4927dd86 is described below
commit 4927dd86e8380f3c5dfaee2d2bae78ea5bb5cea6
Author: shuwenwei <[email protected]>
AuthorDate: Tue May 28 10:39:18 2024 +0800
Add some methods for compaction
---
.../apache/tsfile/read/TsFileSequenceReader.java | 42 +++++++++++++++++-----
1 file changed, 34 insertions(+), 8 deletions(-)
diff --git
a/java/tsfile/src/main/java/org/apache/tsfile/read/TsFileSequenceReader.java
b/java/tsfile/src/main/java/org/apache/tsfile/read/TsFileSequenceReader.java
index 4c8ce31d..7107242e 100644
--- a/java/tsfile/src/main/java/org/apache/tsfile/read/TsFileSequenceReader.java
+++ b/java/tsfile/src/main/java/org/apache/tsfile/read/TsFileSequenceReader.java
@@ -1335,6 +1335,8 @@ public class TsFileSequenceReader implements
AutoCloseable {
} else {
return metadataIndex.getChildIndexEntry(deviceID, exactSearch);
}
+ } catch (StopReadTsFileByInterruptException e) {
+ throw e;
} catch (Exception e) {
logger.error("Something error happened while deserializing MetadataIndex
of file {}", file);
throw e;
@@ -2177,7 +2179,6 @@ public class TsFileSequenceReader implements
AutoCloseable {
}
ByteBuffer buffer = readData(metadataIndexPair.left.getOffset(),
metadataIndexPair.right);
MetadataIndexNode metadataIndexNode;
- TimeseriesMetadata firstTimeseriesMetadata;
try {
// next layer MeasurementNode of the specific DeviceNode
metadataIndexNode = MetadataIndexNode.deserializeFrom(buffer, false);
@@ -2185,7 +2186,19 @@ public class TsFileSequenceReader implements
AutoCloseable {
logger.error(METADATA_INDEX_NODE_DESERIALIZE_ERROR, file);
throw e;
}
- firstTimeseriesMetadata =
tryToGetFirstTimeseriesMetadata(metadataIndexNode);
+ return getAlignedChunkMetadataByMetadataIndexNode(device,
metadataIndexNode);
+ }
+
+ /**
+ * Get AlignedChunkMetadata of sensors under one device. Notice: if all the
value chunks are empty
+ * chunks, then return an empty list.
+ *
+ * @param device device name
+ * @param metadataIndexNode the first measurement metadata index node of the
device
+ */
+ public List<AlignedChunkMetadata> getAlignedChunkMetadataByMetadataIndexNode(
+ IDeviceID device, MetadataIndexNode metadataIndexNode) throws
IOException {
+ TimeseriesMetadata firstTimeseriesMetadata =
tryToGetFirstTimeseriesMetadata(metadataIndexNode);
if (firstTimeseriesMetadata == null) {
throw new IOException("Timeseries of device {" + device + "} are not
aligned");
}
@@ -2199,7 +2212,7 @@ public class TsFileSequenceReader implements
AutoCloseable {
if (i != metadataIndexEntryList.size() - 1) {
endOffset = metadataIndexEntryList.get(i + 1).getOffset();
}
- buffer = readData(metadataIndexEntry.getOffset(), endOffset);
+ ByteBuffer buffer = readData(metadataIndexEntry.getOffset(), endOffset);
if
(metadataIndexNode.getNodeType().equals(MetadataIndexNodeType.LEAF_MEASUREMENT))
{
List<TimeseriesMetadata> timeseriesMetadataList = new ArrayList<>();
while (buffer.hasRemaining()) {
@@ -2419,9 +2432,22 @@ public class TsFileSequenceReader implements
AutoCloseable {
};
}
- Queue<Pair<Long, Long>> queue = new LinkedList<>();
ByteBuffer buffer = readData(metadataIndexPair.left.getOffset(),
metadataIndexPair.right);
- collectEachLeafMeasurementNodeOffsetRange(buffer, queue);
+ MetadataIndexNode firstMeasurementNode =
MetadataIndexNode.deserializeFrom(buffer, false);
+ return getMeasurementChunkMetadataListMapIterator(firstMeasurementNode);
+ }
+
+ /**
+ * @return An iterator of linked hashmaps ( measurement -> chunk metadata
list ). When traversing
+ * the linked hashmap, you will get chunk metadata lists according to
the lexicographic order
+ * of the measurements. The first measurement of the linked hashmap of
each iteration is
+ * always larger than the last measurement of the linked hashmap of the
previous iteration in
+ * lexicographic order.
+ */
+ public Iterator<Map<String, List<ChunkMetadata>>>
getMeasurementChunkMetadataListMapIterator(
+ MetadataIndexNode firstMeasurementMetadataIndexNodeOfDevice) throws
IOException {
+ Queue<Pair<Long, Long>> queue = new LinkedList<>();
+
collectEachLeafMeasurementNodeOffsetRange(firstMeasurementMetadataIndexNodeOfDevice,
queue);
return new Iterator<Map<String, List<ChunkMetadata>>>() {
@@ -2462,9 +2488,8 @@ public class TsFileSequenceReader implements
AutoCloseable {
}
private void collectEachLeafMeasurementNodeOffsetRange(
- ByteBuffer buffer, Queue<Pair<Long, Long>> queue) throws IOException {
+ MetadataIndexNode metadataIndexNode, Queue<Pair<Long, Long>> queue)
throws IOException {
try {
- final MetadataIndexNode metadataIndexNode =
MetadataIndexNode.deserializeFrom(buffer, false);
final MetadataIndexNodeType metadataIndexNodeType =
metadataIndexNode.getNodeType();
final int metadataIndexListSize = metadataIndexNode.getChildren().size();
for (int i = 0; i < metadataIndexListSize; ++i) {
@@ -2477,7 +2502,8 @@ public class TsFileSequenceReader implements
AutoCloseable {
queue.add(new Pair<>(startOffset, endOffset));
continue;
}
- collectEachLeafMeasurementNodeOffsetRange(readData(startOffset,
endOffset), queue);
+ collectEachLeafMeasurementNodeOffsetRange(
+ MetadataIndexNode.deserializeFrom(readData(startOffset,
endOffset), false), queue);
}
} catch (StopReadTsFileByInterruptException e) {
throw e;