This is an automated email from the ASF dual-hosted git repository. sunzesong pushed a commit to branch jira_2228 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 605bc4b9978f10fcd19ba33fb7d3970dbf9bc863 Author: Zesong Sun <[email protected]> AuthorDate: Thu Dec 30 15:07:38 2021 +0800 Add iterator to return device list in dictionary order --- .../iotdb/tsfile/read/TsFileSequenceReader.java | 82 ++++++++++++++++++---- .../iotdb/tsfile/read/GetAllDevicesTest.java | 2 +- .../apache/iotdb/tsfile/utils/FileGenerator.java | 32 ++++++++- .../tsfile/write/MetadataIndexConstructorTest.java | 25 +++---- 4 files changed, 107 insertions(+), 34 deletions(-) diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java index a8fd34a..48a04c5 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java @@ -591,8 +591,6 @@ public class TsFileSequenceReader implements AutoCloseable { private List<String> getAllDevices(MetadataIndexNode metadataIndexNode) throws IOException { List<String> deviceList = new ArrayList<>(); - int metadataIndexListSize = metadataIndexNode.getChildren().size(); - // if metadataIndexNode is LEAF_DEVICE, put all devices in node entry into the list if (metadataIndexNode.getNodeType().equals(MetadataIndexNodeType.LEAF_DEVICE)) { deviceList.addAll( @@ -602,6 +600,7 @@ public class TsFileSequenceReader implements AutoCloseable { return deviceList; } + int metadataIndexListSize = metadataIndexNode.getChildren().size(); for (int i = 0; i < metadataIndexListSize; i++) { long endOffset = metadataIndexNode.getEndOffset(); if (i != metadataIndexListSize - 1) { @@ -609,21 +608,77 @@ public class TsFileSequenceReader implements AutoCloseable { } ByteBuffer buffer = readData(metadataIndexNode.getChildren().get(i).getOffset(), endOffset); MetadataIndexNode node = MetadataIndexNode.deserializeFrom(buffer); - if (node.getNodeType().equals(MetadataIndexNodeType.LEAF_DEVICE)) { - // if node in next level is LEAF_DEVICE, put all devices in node entry into the list - deviceList.addAll( - node.getChildren().stream() - .map(MetadataIndexEntry::getName) - .collect(Collectors.toList())); - } else { - // keep traversing - deviceList.addAll(getAllDevices(node)); - } + deviceList.addAll(getAllDevices(node)); } return deviceList; } /** + * @return an iterator of "device, isAligned" list, in which names of devices are ordered in + * dictionary order, and isAligned represents whether the device is aligned + */ + public Iterator<Pair<String, Boolean>> getAllDevicesIteratorWithIsAligned() throws IOException { + readFileMetadata(); + + MetadataIndexNode metadataIndexNode = tsFileMetaData.getMetadataIndex(); + Queue<Pair<String, Pair<Long, Long>>> queue = new LinkedList<>(); + getAllDevicesWithIsAligned(metadataIndexNode, queue); + + return new Iterator<Pair<String, Boolean>>() { + @Override + public boolean hasNext() { + return !queue.isEmpty(); + } + + @Override + public Pair<String, Boolean> next() { + if (!hasNext()) { + throw new NoSuchElementException(); + } + Pair<String, Pair<Long, Long>> startEndPair = queue.remove(); + List<Pair<String, Boolean>> devices = new ArrayList<>(); + try { + MetadataIndexNode measurementNode = + MetadataIndexNode.deserializeFrom( + readData(startEndPair.right.left, startEndPair.right.right)); + // if tryToGetFirstTimeseriesMetadata(node) returns null, the device is not aligned + boolean isAligned = tryToGetFirstTimeseriesMetadata(measurementNode) != null; + return new Pair<>(startEndPair.left, isAligned); + } catch (IOException e) { + throw new TsFileRuntimeException( + "Error occurred while reading a time series metadata block."); + } + } + }; + } + + private void getAllDevicesWithIsAligned( + MetadataIndexNode metadataIndexNode, Queue<Pair<String, Pair<Long, Long>>> queue) + throws IOException { + try { + int metadataIndexListSize = metadataIndexNode.getChildren().size(); + + for (int i = 0; i < metadataIndexListSize; i++) { + MetadataIndexEntry entry = metadataIndexNode.getChildren().get(i); + long startOffset = entry.getOffset(); + long endOffset = metadataIndexNode.getEndOffset(); + if (i != metadataIndexListSize - 1) { + endOffset = metadataIndexNode.getChildren().get(i + 1).getOffset(); + } + if (metadataIndexNode.getNodeType().equals(MetadataIndexNodeType.LEAF_DEVICE)) { + queue.add(new Pair<>(entry.getName(), new Pair<>(startOffset, endOffset))); + continue; + } + ByteBuffer nextBuffer = readData(startOffset, endOffset); + getAllDevicesWithIsAligned(MetadataIndexNode.deserializeFrom(nextBuffer), queue); + } + } catch (BufferOverflowException e) { + logger.error("Something error happened while getting all devices of file {}", file); + throw e; + } + } + + /** * read all ChunkMetaDatas of given device * * @param device name @@ -773,8 +828,7 @@ public class TsFileSequenceReader implements AutoCloseable { readData( measurementNode.getChildren().get(0).getOffset(), measurementNode.getEndOffset()); } - TimeseriesMetadata firstTimeseriesMetadata = TimeseriesMetadata.deserializeFrom(buffer, true); - return firstTimeseriesMetadata; + return TimeseriesMetadata.deserializeFrom(buffer, true); } else if (measurementNode.getNodeType().equals(MetadataIndexNodeType.INTERNAL_MEASUREMENT)) { ByteBuffer buffer = readData( diff --git a/tsfile/src/test/java/org/apache/iotdb/tsfile/read/GetAllDevicesTest.java b/tsfile/src/test/java/org/apache/iotdb/tsfile/read/GetAllDevicesTest.java index 54fceae..a7a3cea 100644 --- a/tsfile/src/test/java/org/apache/iotdb/tsfile/read/GetAllDevicesTest.java +++ b/tsfile/src/test/java/org/apache/iotdb/tsfile/read/GetAllDevicesTest.java @@ -76,7 +76,7 @@ public class GetAllDevicesTest { List<String> devices = fileReader.getAllDevices(); Assert.assertEquals(deviceNum, devices.size()); for (int i = 0; i < deviceNum; i++) { - Assert.assertTrue(devices.contains("d" + i)); + Assert.assertEquals("d" + FileGenerator.generateIndexString(i, deviceNum), devices.get(i)); } FileGenerator.after(); diff --git a/tsfile/src/test/java/org/apache/iotdb/tsfile/utils/FileGenerator.java b/tsfile/src/test/java/org/apache/iotdb/tsfile/utils/FileGenerator.java index 517f252..c46dae9 100755 --- a/tsfile/src/test/java/org/apache/iotdb/tsfile/utils/FileGenerator.java +++ b/tsfile/src/test/java/org/apache/iotdb/tsfile/utils/FileGenerator.java @@ -223,7 +223,15 @@ public class FileGenerator { long startTime = 1480562618000L; for (int i = 0; i < deviceNum; i++) { for (int j = 0; j < measurementNum; j++) { - String d = "d" + i + "," + startTime + ",s" + j + "," + 1; + String d = + "d" + + generateIndexString(i, deviceNum) + + "," + + startTime + + ",s" + + generateIndexString(j, measurementNum) + + "," + + 1; fw.write(d + "\r\n"); } } @@ -294,9 +302,11 @@ public class FileGenerator { for (int i = 0; i < deviceNum; i++) { for (int j = 0; j < measurementNum; j++) { schema.registerTimeseries( - new Path("d" + i), + new Path("d" + generateIndexString(i, deviceNum)), new UnaryMeasurementSchema( - "s" + j, TSDataType.INT32, TSEncoding.valueOf(config.getValueEncoder()))); + "s" + generateIndexString(j, measurementNum), + TSDataType.INT32, + TSEncoding.valueOf(config.getValueEncoder()))); } } } @@ -331,4 +341,20 @@ public class FileGenerator { return null; } } + + /** + * generate curIndex string, use "0" on left to make sure align + * + * @param curIndex current index + * @param maxIndex max index + * @return curIndex's string + */ + public static String generateIndexString(int curIndex, int maxIndex) { + StringBuilder res = new StringBuilder(String.valueOf(curIndex)); + String target = String.valueOf(maxIndex); + while (res.length() < target.length()) { + res.insert(0, "0"); + } + return res.toString(); + } } diff --git a/tsfile/src/test/java/org/apache/iotdb/tsfile/write/MetadataIndexConstructorTest.java b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/MetadataIndexConstructorTest.java index cebf730..deca37d 100644 --- a/tsfile/src/test/java/org/apache/iotdb/tsfile/write/MetadataIndexConstructorTest.java +++ b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/MetadataIndexConstructorTest.java @@ -33,6 +33,7 @@ import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer; import org.apache.iotdb.tsfile.read.TsFileSequenceReader; import org.apache.iotdb.tsfile.read.common.Path; import org.apache.iotdb.tsfile.utils.MeasurementGroup; +import org.apache.iotdb.tsfile.utils.Pair; import org.apache.iotdb.tsfile.write.record.TSRecord; import org.apache.iotdb.tsfile.write.record.Tablet; import org.apache.iotdb.tsfile.write.record.datapoint.DataPoint; @@ -56,6 +57,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; +import static org.apache.iotdb.tsfile.utils.FileGenerator.generateIndexString; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -223,6 +225,13 @@ public class MetadataIndexConstructorTest { } // 4.2 make sure timeseries in order try (TsFileSequenceReader reader = new TsFileSequenceReader(FILE_PATH)) { + Iterator<Pair<String, Boolean>> iterator = reader.getAllDevicesIteratorWithIsAligned(); + while (iterator.hasNext()) { + for (String correctDevice : correctDevices) { + assertEquals(correctDevice, iterator.next().left); + } + } + Map<String, List<TimeseriesMetadata>> allTimeseriesMetadata = reader.getAllTimeseriesMetadata(); for (int j = 0; j < actualDevices.size(); j++) { @@ -481,20 +490,4 @@ public class MetadataIndexConstructorTest { fail(e.getMessage()); } } - - /** - * generate curIndex string, use "0" on left to make sure align - * - * @param curIndex current index - * @param maxIndex max index - * @return curIndex's string - */ - private String generateIndexString(int curIndex, int maxIndex) { - StringBuilder res = new StringBuilder(String.valueOf(curIndex)); - String target = String.valueOf(maxIndex); - while (res.length() < target.length()) { - res.insert(0, "0"); - } - return res.toString(); - } }
