This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/tsfile.git
The following commit(s) were added to refs/heads/develop by this push:
new 89666aba Fix TsFileDeviceIterator (#222)
89666aba is described below
commit 89666aba473f9712067c0ef4b0c91e880507efea
Author: shuwenwei <[email protected]>
AuthorDate: Mon Sep 2 10:45:10 2024 +0800
Fix TsFileDeviceIterator (#222)
* fix TsFileDeviceIterator: load each table's device index inside the iterator, one table at a time
* add ut
* address review comments
---
.../apache/tsfile/read/TsFileDeviceIterator.java | 173 ++++++++++++++++++---
.../apache/tsfile/read/TsFileSequenceReader.java | 111 +------------
.../tsfile/read/TsFileDeviceIteratorTest.java | 141 +++++++++++++++++
3 files changed, 291 insertions(+), 134 deletions(-)
diff --git
a/java/tsfile/src/main/java/org/apache/tsfile/read/TsFileDeviceIterator.java
b/java/tsfile/src/main/java/org/apache/tsfile/read/TsFileDeviceIterator.java
index 232f3052..3454f5dd 100644
--- a/java/tsfile/src/main/java/org/apache/tsfile/read/TsFileDeviceIterator.java
+++ b/java/tsfile/src/main/java/org/apache/tsfile/read/TsFileDeviceIterator.java
@@ -19,35 +19,45 @@
package org.apache.tsfile.read;
+import org.apache.tsfile.compatibility.DeserializeConfig;
+import org.apache.tsfile.exception.StopReadTsFileByInterruptException;
import org.apache.tsfile.exception.TsFileRuntimeException;
+import org.apache.tsfile.file.IMetadataIndexEntry;
+import org.apache.tsfile.file.metadata.DeviceMetadataIndexEntry;
import org.apache.tsfile.file.metadata.IDeviceID;
import org.apache.tsfile.file.metadata.MetadataIndexNode;
+import org.apache.tsfile.file.metadata.enums.MetadataIndexNodeType;
import org.apache.tsfile.utils.Pair;
+import org.apache.tsfile.utils.ReadWriteIOUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.Iterator;
+import java.util.LinkedList;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Queue;
public class TsFileDeviceIterator implements Iterator<Pair<IDeviceID,
Boolean>> {
- private final TsFileSequenceReader reader;
- // device -> firstMeasurmentNode offset
- private final Queue<Pair<IDeviceID, long[]>> queue;
+ private final TsFileSequenceReader reader;
+ private final DeserializeConfig deserializeConfig;
+ private final Iterator<MetadataIndexNode> tableMetadataIndexNodeIterator;
+ private final Queue<Pair<IDeviceID, long[]>> queue = new LinkedList<>();
+ private final List<long[]> leafDeviceNodeOffsetList = new LinkedList<>();
private Pair<IDeviceID, Boolean> currentDevice = null;
private MetadataIndexNode measurementNode;
- // <startOffset, endOffset>, device leaf node offset in this file
- private final List<long[]> leafDeviceNodeOffsetList;
+ private static final Logger logger =
LoggerFactory.getLogger(TsFileDeviceIterator.class);
- public TsFileDeviceIterator(
- TsFileSequenceReader reader,
- List<long[]> leafDeviceNodeOffsetList,
- Queue<Pair<IDeviceID, long[]>> queue) {
+ public TsFileDeviceIterator(TsFileSequenceReader reader) throws IOException {
this.reader = reader;
- this.queue = queue;
- this.leafDeviceNodeOffsetList = leafDeviceNodeOffsetList;
+ this.deserializeConfig = reader.getDeserializeContext();
+ this.tableMetadataIndexNodeIterator =
+
reader.readFileMetadata().getTableMetadataIndexNodeMap().values().iterator();
}
public Pair<IDeviceID, Boolean> current() {
@@ -56,21 +66,38 @@ public class TsFileDeviceIterator implements
Iterator<Pair<IDeviceID, Boolean>>
@Override
public boolean hasNext() {
- if (!queue.isEmpty()) {
- return true;
- } else if (leafDeviceNodeOffsetList.isEmpty()) {
- // device queue is empty and all device leaf node has been read
- return false;
- } else {
- // queue is empty but there are still some devices on leaf node not
being read yet
- long[] nextDeviceLeafNodeOffset = leafDeviceNodeOffsetList.remove(0);
- try {
- reader.getDevicesAndEntriesOfOneLeafNode(
+ try {
+ prepareNextTable();
+ if (!queue.isEmpty()) {
+ return true;
+ } else if (leafDeviceNodeOffsetList.isEmpty()) {
+ // nothing queued even after prepareNextTable(): no table left to read
+ return false;
+ } else {
+ // queue is empty but some device leaf nodes of the current table are not yet
loaded; read the next one into the queue
+ long[] nextDeviceLeafNodeOffset = leafDeviceNodeOffsetList.remove(0);
+ getDevicesAndEntriesOfOneLeafNode(
nextDeviceLeafNodeOffset[0], nextDeviceLeafNodeOffset[1], queue);
- } catch (IOException e) {
- throw new RuntimeException(e);
+ return true;
}
- return true;
+ } catch (IOException e) {
+ throw new TsFileRuntimeException(e);
+ }
+ }
+ // Loads at most one more table per call; NOTE(review): a table yielding no devices would end iteration early.
+ private void prepareNextTable() throws IOException {
+ if (!queue.isEmpty() || !leafDeviceNodeOffsetList.isEmpty()) {
+ return;
+ }
+ if (!tableMetadataIndexNodeIterator.hasNext()) {
+ return;
+ }
+ MetadataIndexNode nextTableMetadataIndexNode =
tableMetadataIndexNodeIterator.next();
+ // Root is a device leaf node: enqueue its devices; otherwise record the offsets of its leaf nodes.
+ if
(nextTableMetadataIndexNode.getNodeType().equals(MetadataIndexNodeType.LEAF_DEVICE))
{
+ getDevicesOfLeafNode(nextTableMetadataIndexNode, queue);
+ } else {
+ getAllDeviceLeafNodeOffset(nextTableMetadataIndexNode,
leafDeviceNodeOffsetList);
}
}
@@ -96,4 +123,102 @@ public class TsFileDeviceIterator implements
Iterator<Pair<IDeviceID, Boolean>>
public MetadataIndexNode getFirstMeasurementNodeOfCurrentDevice() {
return measurementNode;
}
+
+ /**
+ * Read one device leaf node and enqueue each of its devices with its first measurement node offsets.
+ *
+ * @param startOffset start offset of the device leaf node in the file
+ * @param endOffset end offset of the device leaf node in the file
+ * @param measurementNodeOffsetQueue receives (device, {start, end} offsets of its first measurement node)
+ */
+ public void getDevicesAndEntriesOfOneLeafNode(
+ Long startOffset, Long endOffset, Queue<Pair<IDeviceID, long[]>>
measurementNodeOffsetQueue)
+ throws IOException {
+ try {
+ ByteBuffer nextBuffer = reader.readData(startOffset, endOffset);
+ MetadataIndexNode deviceLeafNode =
+
deserializeConfig.deviceMetadataIndexNodeBufferDeserializer.deserialize(
+ nextBuffer, deserializeConfig);
+ getDevicesOfLeafNode(deviceLeafNode, measurementNodeOffsetQueue);
+ } catch (StopReadTsFileByInterruptException e) {
+ throw e;
+ } catch (Exception e) {
+ logger.error(
+ "Something error happened while getting all devices of file {}",
reader.getFileName());
+ throw e;
+ }
+ }
+
+ /**
+ * Enqueue every device on the given device leaf node together with the {start, end} offsets
of its child entry (end = next entry's offset, or the node's end offset for the last one).
+ * @param deviceLeafNode must be of type LEAF_DEVICE, otherwise IllegalStateException is thrown
+ * @param measurementNodeOffsetQueue queue that receives one (device, offsets) pair per child entry
+ */
+ private void getDevicesOfLeafNode(
+ MetadataIndexNode deviceLeafNode, Queue<Pair<IDeviceID, long[]>>
measurementNodeOffsetQueue) {
+ if
(!deviceLeafNode.getNodeType().equals(MetadataIndexNodeType.LEAF_DEVICE)) {
+ throw new IllegalStateException("the first param should be device leaf
node.");
+ }
+ List<IMetadataIndexEntry> childrenEntries = deviceLeafNode.getChildren();
+ for (int i = 0; i < childrenEntries.size(); i++) {
+ IMetadataIndexEntry deviceEntry = childrenEntries.get(i);
+ long childStartOffset = deviceEntry.getOffset();
+ long childEndOffset =
+ i == childrenEntries.size() - 1
+ ? deviceLeafNode.getEndOffset()
+ : childrenEntries.get(i + 1).getOffset();
+ long[] offset = {childStartOffset, childEndOffset};
+ measurementNodeOffsetQueue.add(
+ new Pair<>(((DeviceMetadataIndexEntry) deviceEntry).getDeviceID(),
offset));
+ }
+ }
+
+ /**
+ * Recursively collect the {start, end} offsets of every device leaf node under the given internal node.
+ * @param deviceInternalNode must be of type INTERNAL_DEVICE, otherwise IllegalStateException is thrown
+ * @param leafDeviceNodeOffsets list that receives one {start, end} offset pair per device leaf node
+ */
+ private void getAllDeviceLeafNodeOffset(
+ MetadataIndexNode deviceInternalNode, List<long[]>
leafDeviceNodeOffsets) throws IOException {
+ if
(!deviceInternalNode.getNodeType().equals(MetadataIndexNodeType.INTERNAL_DEVICE))
{
+ throw new IllegalStateException("the first param should be device
internal node.");
+ }
+ try {
+ int metadataIndexListSize = deviceInternalNode.getChildren().size();
+ boolean isCurrentLayerLeafNode = false;
+ for (int i = 0; i < metadataIndexListSize; i++) {
+ IMetadataIndexEntry entry = deviceInternalNode.getChildren().get(i);
+ long startOffset = entry.getOffset();
+ long endOffset = deviceInternalNode.getEndOffset();
+ if (i != metadataIndexListSize - 1) {
+ endOffset = deviceInternalNode.getChildren().get(i + 1).getOffset();
+ }
+ if (i == 0) {
+ // Decide whether the children are device leaf nodes or internal nodes from the
type byte stored at the end of the first child's range;
+ // siblings are assumed to share that type, so only the first child is checked
+ MetadataIndexNodeType nodeType =
+ MetadataIndexNodeType.deserialize(
+ ReadWriteIOUtils.readByte(reader.readData(endOffset - 1,
endOffset)));
+ isCurrentLayerLeafNode =
nodeType.equals(MetadataIndexNodeType.LEAF_DEVICE);
+ }
+ if (isCurrentLayerLeafNode) {
+ // children are leaf nodes: only record the range; hasNext() reads it on demand
+ long[] offset = {startOffset, endOffset};
+ leafDeviceNodeOffsets.add(offset);
+ continue;
+ }
+ ByteBuffer nextBuffer = reader.readData(startOffset, endOffset);
+ getAllDeviceLeafNodeOffset(
+
deserializeConfig.deviceMetadataIndexNodeBufferDeserializer.deserialize(
+ nextBuffer, deserializeConfig),
+ leafDeviceNodeOffsets);
+ }
+ } catch (StopReadTsFileByInterruptException e) {
+ throw e;
+ } catch (Exception e) {
+ logger.error(
+ "Something error happened while getting all devices of file {}",
reader.getFileName());
+ throw e;
+ }
+ }
}
diff --git
a/java/tsfile/src/main/java/org/apache/tsfile/read/TsFileSequenceReader.java
b/java/tsfile/src/main/java/org/apache/tsfile/read/TsFileSequenceReader.java
index a69d7872..57c2500d 100644
--- a/java/tsfile/src/main/java/org/apache/tsfile/read/TsFileSequenceReader.java
+++ b/java/tsfile/src/main/java/org/apache/tsfile/read/TsFileSequenceReader.java
@@ -880,116 +880,7 @@ public class TsFileSequenceReader implements
AutoCloseable {
* on one device leaf node each time to save memory.
*/
public TsFileDeviceIterator getAllDevicesIteratorWithIsAligned() throws
IOException {
- readFileMetadata();
- Queue<Pair<IDeviceID, long[]>> queue = new LinkedList<>();
- List<long[]> leafDeviceNodeOffsets = new ArrayList<>();
- for (MetadataIndexNode metadataIndexNode :
- tsFileMetaData.getTableMetadataIndexNodeMap().values()) {
- if
(metadataIndexNode.getNodeType().equals(MetadataIndexNodeType.LEAF_DEVICE)) {
- // the first node of index tree is device leaf node, then get the
devices directly
- getDevicesOfLeafNode(metadataIndexNode, queue);
- } else {
- // get all device leaf node offset
- getAllDeviceLeafNodeOffset(metadataIndexNode, leafDeviceNodeOffsets);
- }
- }
- return new TsFileDeviceIterator(this, leafDeviceNodeOffsets, queue);
- }
-
- /**
- * Get devices and first measurement node offset.
- *
- * @param startOffset start offset of device leaf node
- * @param endOffset end offset of device leaf node
- * @param measurementNodeOffsetQueue device -> first measurement node offset
- */
- public void getDevicesAndEntriesOfOneLeafNode(
- Long startOffset, Long endOffset, Queue<Pair<IDeviceID, long[]>>
measurementNodeOffsetQueue)
- throws IOException {
- try {
- ByteBuffer nextBuffer = readData(startOffset, endOffset);
- MetadataIndexNode deviceLeafNode =
-
deserializeConfig.deviceMetadataIndexNodeBufferDeserializer.deserialize(
- nextBuffer, deserializeConfig);
- getDevicesOfLeafNode(deviceLeafNode, measurementNodeOffsetQueue);
- } catch (StopReadTsFileByInterruptException e) {
- throw e;
- } catch (Exception e) {
- logger.error("Something error happened while getting all devices of file
{}", file);
- throw e;
- }
- }
-
- /**
- * Get all devices and its corresponding entries on the specific device leaf
node.
- *
- * @param deviceLeafNode this node must be device leaf node
- */
- private void getDevicesOfLeafNode(
- MetadataIndexNode deviceLeafNode, Queue<Pair<IDeviceID, long[]>>
measurementNodeOffsetQueue) {
- if
(!deviceLeafNode.getNodeType().equals(MetadataIndexNodeType.LEAF_DEVICE)) {
- throw new IllegalStateException("the first param should be device leaf
node.");
- }
- List<IMetadataIndexEntry> childrenEntries = deviceLeafNode.getChildren();
- for (int i = 0; i < childrenEntries.size(); i++) {
- IMetadataIndexEntry deviceEntry = childrenEntries.get(i);
- long childStartOffset = deviceEntry.getOffset();
- long childEndOffset =
- i == childrenEntries.size() - 1
- ? deviceLeafNode.getEndOffset()
- : childrenEntries.get(i + 1).getOffset();
- long[] offset = {childStartOffset, childEndOffset};
- measurementNodeOffsetQueue.add(
- new Pair<>(((DeviceMetadataIndexEntry) deviceEntry).getDeviceID(),
offset));
- }
- }
-
- /**
- * Get the device leaf node offset under the specific device internal node.
- *
- * @param deviceInternalNode this node must be device internal node
- */
- private void getAllDeviceLeafNodeOffset(
- MetadataIndexNode deviceInternalNode, List<long[]>
leafDeviceNodeOffsets) throws IOException {
- if
(!deviceInternalNode.getNodeType().equals(MetadataIndexNodeType.INTERNAL_DEVICE))
{
- throw new IllegalStateException("the first param should be device
internal node.");
- }
- try {
- int metadataIndexListSize = deviceInternalNode.getChildren().size();
- boolean isCurrentLayerLeafNode = false;
- for (int i = 0; i < metadataIndexListSize; i++) {
- IMetadataIndexEntry entry = deviceInternalNode.getChildren().get(i);
- long startOffset = entry.getOffset();
- long endOffset = deviceInternalNode.getEndOffset();
- if (i != metadataIndexListSize - 1) {
- endOffset = deviceInternalNode.getChildren().get(i + 1).getOffset();
- }
- if (i == 0) {
- // check is current layer device leaf node or device internal node.
Just need to check the
- // first entry, because the rest are the same
- MetadataIndexNodeType nodeType =
- MetadataIndexNodeType.deserialize(
- ReadWriteIOUtils.readByte(readData(endOffset - 1,
endOffset)));
- isCurrentLayerLeafNode =
nodeType.equals(MetadataIndexNodeType.LEAF_DEVICE);
- }
- if (isCurrentLayerLeafNode) {
- // is device leaf node
- long[] offset = {startOffset, endOffset};
- leafDeviceNodeOffsets.add(offset);
- continue;
- }
- ByteBuffer nextBuffer = readData(startOffset, endOffset);
- getAllDeviceLeafNodeOffset(
-
deserializeConfig.deviceMetadataIndexNodeBufferDeserializer.deserialize(
- nextBuffer, deserializeConfig),
- leafDeviceNodeOffsets);
- }
- } catch (StopReadTsFileByInterruptException e) {
- throw e;
- } catch (Exception e) {
- logger.error("Something error happened while getting all devices of file
{}", file);
- throw e;
- }
+ return new TsFileDeviceIterator(this);
}
/**
diff --git
a/java/tsfile/src/test/java/org/apache/tsfile/read/TsFileDeviceIteratorTest.java
b/java/tsfile/src/test/java/org/apache/tsfile/read/TsFileDeviceIteratorTest.java
new file mode 100644
index 00000000..6290d476
--- /dev/null
+++
b/java/tsfile/src/test/java/org/apache/tsfile/read/TsFileDeviceIteratorTest.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tsfile.read;
+
+import org.apache.tsfile.constant.TestConstant;
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.file.metadata.IDeviceID;
+import org.apache.tsfile.file.metadata.TableSchema;
+import org.apache.tsfile.file.metadata.enums.CompressionType;
+import org.apache.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.tsfile.read.common.TimeRange;
+import org.apache.tsfile.utils.Pair;
+import org.apache.tsfile.write.chunk.AlignedChunkWriterImpl;
+import org.apache.tsfile.write.record.Tablet;
+import org.apache.tsfile.write.schema.IMeasurementSchema;
+import org.apache.tsfile.write.schema.MeasurementSchema;
+import org.apache.tsfile.write.writer.TsFileIOWriter;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+public class TsFileDeviceIteratorTest {
+ private static final String FILE_PATH =
+ TestConstant.BASE_OUTPUT_PATH.concat("TsFileDeviceIterator.tsfile");
+
+ @After
+ public void teardown() {
+ new File(FILE_PATH).delete();
+ }
+
+ @Test
+ public void test() throws IOException {
+ int totalDeviceNum = 0;
+ try (TsFileIOWriter writer = new TsFileIOWriter(new File(FILE_PATH))) {
+ for (int i = 1; i <= 10; i++) {
+ String tableName = "table" + i;
+ registerTableSchema(writer, tableName);
+ int deviceNum = i;
+ if (i % 2 == 0) {
+ deviceNum *= 10000;
+ } else {
+ deviceNum *= 10;
+ }
+ totalDeviceNum += deviceNum;
+ generateDevice(writer, tableName, deviceNum);
+ }
+ writer.endFile();
+ }
+ int deviceFromIterator = 0;
+ try (TsFileSequenceReader reader = new TsFileSequenceReader(FILE_PATH)) {
+ TsFileDeviceIterator deviceIterator =
reader.getAllDevicesIteratorWithIsAligned();
+ IDeviceID previous = null;
+ while (deviceIterator.hasNext()) {
+ Pair<IDeviceID, Boolean> next = deviceIterator.next();
+ deviceFromIterator++;
+ if (previous != null) {
+ Assert.assertTrue(previous.compareTo(next.getLeft()) < 0);
+ }
+ previous = next.getLeft();
+ }
+ }
+ Assert.assertEquals(totalDeviceNum, deviceFromIterator);
+ }
+
+ private void registerTableSchema(TsFileIOWriter writer, String tableName) {
+ List<IMeasurementSchema> schemas =
+ Arrays.asList(
+ new MeasurementSchema(
+ "id", TSDataType.TEXT, TSEncoding.PLAIN,
CompressionType.UNCOMPRESSED),
+ new MeasurementSchema("s1", TSDataType.INT64),
+ new MeasurementSchema("s2", TSDataType.INT64),
+ new MeasurementSchema("s3", TSDataType.INT64),
+ new MeasurementSchema("s4", TSDataType.INT64));
+ List<Tablet.ColumnType> columnTypes =
+ Arrays.asList(
+ Tablet.ColumnType.ID,
+ Tablet.ColumnType.MEASUREMENT,
+ Tablet.ColumnType.MEASUREMENT,
+ Tablet.ColumnType.MEASUREMENT,
+ Tablet.ColumnType.MEASUREMENT);
+ TableSchema tableSchema = new TableSchema(tableName, schemas, columnTypes);
+ writer.getSchema().registerTableSchema(tableSchema);
+ }
+
+ private void generateDevice(TsFileIOWriter writer, String tableName, int
deviceNum)
+ throws IOException {
+ for (int i = 0; i < deviceNum; i++) {
+ IDeviceID deviceID =
+ IDeviceID.Factory.DEFAULT_FACTORY.create(new String[] {tableName,
"d" + i});
+ writer.startChunkGroup(deviceID);
+ generateSimpleAlignedSeriesToCurrentDevice(
+ writer, Arrays.asList("s1", "s2", "s3", "s4"), new TimeRange[] {new
TimeRange(10, 20)});
+ writer.endChunkGroup();
+ }
+ }
+
+ public void generateSimpleAlignedSeriesToCurrentDevice(
+ TsFileIOWriter writer, List<String> measurementNames, TimeRange[]
toGenerateChunkTimeRanges)
+ throws IOException {
+ List<IMeasurementSchema> measurementSchemas = new ArrayList<>();
+ for (String measurementName : measurementNames) {
+ measurementSchemas.add(
+ new MeasurementSchema(
+ measurementName, TSDataType.INT64, TSEncoding.RLE,
CompressionType.LZ4));
+ }
+ for (TimeRange toGenerateChunk : toGenerateChunkTimeRanges) {
+ AlignedChunkWriterImpl alignedChunkWriter = new
AlignedChunkWriterImpl(measurementSchemas);
+ for (long time = toGenerateChunk.getMin(); time <=
toGenerateChunk.getMax(); time++) {
+ alignedChunkWriter.getTimeChunkWriter().write(time);
+ for (int i = 0; i < measurementNames.size(); i++) {
+ alignedChunkWriter.getValueChunkWriterByIndex(i).write(time, time,
false);
+ }
+ }
+ alignedChunkWriter.writeToFileWriter(writer);
+ }
+ }
+}