This is an automated email from the ASF dual-hosted git repository. sunzesong pushed a commit to branch experimental/index in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit d991bfdf254baad9be13632339d48cbbd36f23af Author: samperson1997 <[email protected]> AuthorDate: Tue Nov 2 10:35:26 2021 +0800 Experiment 1831 --- example/tsfile/pom.xml | 32 ++ .../java/org/apache/iotdb/tsfile/Constant.java | 5 +- .../apache/iotdb/tsfile/TsFileSequenceRead.java | 16 +- .../iotdb/tsfile/test1831/TsFileAggregation.java | 78 ++++ .../iotdb/tsfile/test1831/TsFileAggregationV2.java | 84 ++++ .../iotdb/tsfile/test1831/TsFileRawRead.java | 90 ++++ .../apache/iotdb/tsfile/test1831/TsFileReadV2.java | 89 ++++ .../apache/iotdb/tsfile/test1831/TsFileWrite.java | 106 +++++ .../iotdb/tsfile/test1831/TsFileWriteV2.java | 112 +++++ .../iotdb/tsfile/test1832/TsFileSketchTool.java | 510 +++++++++++++++++++++ .../apache/iotdb/tsfile/test1832/TsFileWrite.java | 72 +++ .../iotdb/tsfile/file/metadata/ChunkMetadata.java | 19 + .../file/metadata/MetadataIndexConstructorV2.java | 163 +++++++ .../tsfile/file/metadata/TimeseriesMetadataV2.java | 272 +++++++++++ .../iotdb/tsfile/read/TsFileSequenceReader.java | 147 ++++++ .../tsfile/read/controller/IMetadataQuerier.java | 2 + .../read/controller/MetadataQuerierByFileImpl.java | 66 +++ .../tsfile/read/query/executor/TsFileExecutor.java | 3 +- .../apache/iotdb/tsfile/write/TsFileWriter.java | 8 +- .../iotdb/tsfile/write/writer/TsFileIOWriter.java | 144 ++++++ 20 files changed, 2008 insertions(+), 10 deletions(-) diff --git a/example/tsfile/pom.xml b/example/tsfile/pom.xml index 13a53a9..ce21451 100644 --- a/example/tsfile/pom.xml +++ b/example/tsfile/pom.xml @@ -35,5 +35,37 @@ <artifactId>tsfile</artifactId> <version>${project.version}</version> </dependency> + <dependency> + <groupId>commons-cli</groupId> + <artifactId>commons-cli</artifactId> + </dependency> </dependencies> + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-assembly-plugin</artifactId> + <version>2.5.5</version> + <executions> + <execution> + <configuration> + <archive> + <manifest> + 
<mainClass>org.apache.iotdb.tsfile.test1831.TsFileAggregation</mainClass> + </manifest> + </archive> + <descriptorRefs> + <descriptorRef>jar-with-dependencies</descriptorRef> + </descriptorRefs> + </configuration> + <id>make-assembly</id> + <phase>package</phase> + <goals> + <goal>single</goal> + </goals> + </execution> + </executions> + </plugin> + </plugins> + </build> </project> diff --git a/example/tsfile/src/main/java/org/apache/iotdb/tsfile/Constant.java b/example/tsfile/src/main/java/org/apache/iotdb/tsfile/Constant.java index 09f4213..91e40d0 100644 --- a/example/tsfile/src/main/java/org/apache/iotdb/tsfile/Constant.java +++ b/example/tsfile/src/main/java/org/apache/iotdb/tsfile/Constant.java @@ -23,9 +23,10 @@ public class Constant { private Constant() {} - static final String SENSOR_1 = "sensor_1"; + public static final String SENSOR_ = "sensor_"; + public static final String SENSOR_1 = "sensor_1"; static final String SENSOR_2 = "sensor_2"; static final String SENSOR_3 = "sensor_3"; - static final String DEVICE_PREFIX = "device_"; + public static final String DEVICE_PREFIX = "device_"; } diff --git a/example/tsfile/src/main/java/org/apache/iotdb/tsfile/TsFileSequenceRead.java b/example/tsfile/src/main/java/org/apache/iotdb/tsfile/TsFileSequenceRead.java index a51953b..788100b 100644 --- a/example/tsfile/src/main/java/org/apache/iotdb/tsfile/TsFileSequenceRead.java +++ b/example/tsfile/src/main/java/org/apache/iotdb/tsfile/TsFileSequenceRead.java @@ -46,7 +46,8 @@ public class TsFileSequenceRead { "squid:S106" }) // Suppress high Cognitive Complexity and Standard outputs warning public static void main(String[] args) throws IOException { - String filename = "test.tsfile"; + String filename = + "/Users/samperson1997/git/iotdb/data/data/sequence/root.sg/0/1832/test5.tsfile"; if (args.length >= 1) { filename = args[0]; } @@ -65,6 +66,7 @@ public class TsFileSequenceRead { System.out.println("[Chunk Group]"); System.out.println("position: " + 
reader.position()); byte marker; + int pageNum = 0; while ((marker = reader.readMarker()) != MetaMarker.SEPARATOR) { switch (marker) { case MetaMarker.CHUNK_HEADER: @@ -88,6 +90,7 @@ public class TsFileSequenceRead { while (dataSize > 0) { valueDecoder.reset(); System.out.println("\t\t[Page]\n \t\tPage head position: " + reader.position()); + pageNum++; PageHeader pageHeader = reader.readPageHeader( header.getDataType(), header.getChunkType() == MetaMarker.CHUNK_HEADER); @@ -105,11 +108,11 @@ public class TsFileSequenceRead { System.out.println("\t\tpoints in the page: " + batchData.length()); } while (batchData.hasCurrent()) { - System.out.println( - "\t\t\ttime, value: " - + batchData.currentTime() - + ", " - + batchData.currentValue()); + // System.out.println( + // "\t\t\ttime, value: " + // + batchData.currentTime() + // + ", " + // + batchData.currentValue()); batchData.next(); } dataSize -= pageHeader.getSerializedPageSize(); @@ -129,6 +132,7 @@ public class TsFileSequenceRead { MetaMarker.handleUnexpectedMarker(marker); } } + System.out.println("[Page Num]: " + pageNum); System.out.println("[Metadata]"); for (String device : reader.getAllDevices()) { Map<String, List<ChunkMetadata>> seriesMetaData = reader.readChunkMetadataInDevice(device); diff --git a/example/tsfile/src/main/java/org/apache/iotdb/tsfile/test1831/TsFileAggregation.java b/example/tsfile/src/main/java/org/apache/iotdb/tsfile/test1831/TsFileAggregation.java new file mode 100644 index 0000000..8e6bb4f --- /dev/null +++ b/example/tsfile/src/main/java/org/apache/iotdb/tsfile/test1831/TsFileAggregation.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.tsfile.test1831;
+
+import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata;
+import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
+import org.apache.iotdb.tsfile.read.common.Path;
+
+import org.apache.commons.cli.BasicParser;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+
+import java.io.IOException;
+
+/**
+ * Benchmark for experiment 1831: for every test TsFile, reads the TimeseriesMetadata of one series
+ * and fetches the point count from its statistics, then prints the elapsed times.
+ *
+ * <p>Usage: {@code -c <chunkNum>}. The value selects the sub-directory that holds the test files.
+ */
+public class TsFileAggregation {
+
+  private static final String DEVICE1 = "device_";
+  private static final String DATA_DIR = "/home/fit/szs/data/data/sequence/root.sg/0/";
+  private static final long NANOS_PER_MILLI = 1_000_000L;
+
+  public static int chunkNum;
+  public static int fileNum = 500;
+
+  /**
+   * Runs the benchmark over {@code fileNum} files.
+   *
+   * @param args command line, must contain {@code -c <chunkNum>}
+   * @throws IOException if a TsFile cannot be opened or read
+   * @throws IllegalArgumentException if {@code -c} is missing or not an integer
+   */
+  public static void main(String[] args) throws IOException {
+    Options opts = new Options();
+    Option chunkNumOption =
+        OptionBuilder.withArgName("args").withLongOpt("chunkNum").hasArg().create("c");
+    opts.addOption(chunkNumOption);
+
+    try {
+      CommandLine cl = new BasicParser().parse(opts, args);
+      chunkNum = Integer.parseInt(cl.getOptionValue("c"));
+    } catch (ParseException | NumberFormatException e) {
+      // Every file path below depends on chunkNum; stop instead of silently
+      // benchmarking directory "0" with the default value.
+      throw new IllegalArgumentException("Missing or invalid option -c <chunkNum>", e);
+    }
+
+    long costTime = 0L;
+    long totalStartTime = System.nanoTime();
+    for (int fileIndex = 0; fileIndex < fileNum; fileIndex++) {
+      String path = DATA_DIR + chunkNum + "/test" + fileIndex + ".tsfile";
+
+      // aggregation query
+      try (TsFileSequenceReader reader = new TsFileSequenceReader(path)) {
+        Path seriesPath = new Path(DEVICE1, "sensor_1");
+        long startTime = System.nanoTime();
+        TimeseriesMetadata timeseriesMetadata = reader.readTimeseriesMetadata(seriesPath, false);
+        // The value is not used; the call only keeps the statistics access inside the timed section.
+        timeseriesMetadata.getStatistics().getCount();
+        costTime += System.nanoTime() - startTime;
+      }
+    }
+    System.out.println(
+        "Total aggregation cost time: "
+            + (System.nanoTime() - totalStartTime) / NANOS_PER_MILLI
+            + "ms");
+    System.out.println("Index area cost time: " + costTime / NANOS_PER_MILLI + "ms");
+  }
+}
diff --git a/example/tsfile/src/main/java/org/apache/iotdb/tsfile/test1831/TsFileAggregationV2.java b/example/tsfile/src/main/java/org/apache/iotdb/tsfile/test1831/TsFileAggregationV2.java
new file mode 100644
index 0000000..e2bf123
--- /dev/null
+++ b/example/tsfile/src/main/java/org/apache/iotdb/tsfile/test1831/TsFileAggregationV2.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.tsfile.test1831;
+
+import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
+import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
+import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
+import org.apache.iotdb.tsfile.read.common.Path;
+
+import org.apache.commons.cli.BasicParser;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Benchmark for experiment 1831: for every test TsFile, loads the ChunkMetadata list of one series
+ * and merges the per-chunk statistics into a single Statistics object, then prints the elapsed
+ * times.
+ *
+ * <p>Usage: {@code -c <chunkNum>}. The value selects the sub-directory that holds the test files.
+ */
+public class TsFileAggregationV2 {
+
+  private static final String DEVICE1 = "device_";
+  private static final String DATA_DIR = "/home/fit/szs/data/data/sequence/root.sg/1/";
+  private static final long NANOS_PER_MILLI = 1_000_000L;
+
+  public static int chunkNum;
+  public static int fileNum = 500;
+
+  /**
+   * Runs the benchmark over {@code fileNum} files.
+   *
+   * @param args command line, must contain {@code -c <chunkNum>}
+   * @throws IOException if a TsFile cannot be opened or read
+   * @throws IllegalArgumentException if {@code -c} is missing or not an integer
+   */
+  public static void main(String[] args) throws IOException {
+    Options opts = new Options();
+    Option chunkNumOption =
+        OptionBuilder.withArgName("args").withLongOpt("chunkNum").hasArg().create("c");
+    opts.addOption(chunkNumOption);
+
+    try {
+      CommandLine cl = new BasicParser().parse(opts, args);
+      chunkNum = Integer.parseInt(cl.getOptionValue("c"));
+    } catch (ParseException | NumberFormatException e) {
+      // Every file path below depends on chunkNum; stop instead of silently
+      // benchmarking directory "0" with the default value.
+      throw new IllegalArgumentException("Missing or invalid option -c <chunkNum>", e);
+    }
+
+    long costTime = 0L;
+    long totalStartTime = System.nanoTime();
+    for (int fileIndex = 0; fileIndex < fileNum; fileIndex++) {
+      String path = DATA_DIR + chunkNum + "/test" + fileIndex + ".tsfile";
+
+      // aggregation query with chunkMetadata
+      try (TsFileSequenceReader reader = new TsFileSequenceReader(path)) {
+        Path seriesPath = new Path(DEVICE1, "sensor_1");
+        long startTime = System.nanoTime();
+
+        List<ChunkMetadata> chunkMetadatas = reader.getChunkMetadataList(seriesPath, false);
+        // The data type is taken from the first chunk, so an empty list has nothing to merge
+        // (and get(0) would throw).
+        if (!chunkMetadatas.isEmpty()) {
+          Statistics statistics = Statistics.getStatsByType(chunkMetadatas.get(0).getDataType());
+          for (ChunkMetadata chunkMetadata : chunkMetadatas) {
+            statistics.mergeStatistics(chunkMetadata.getStatistics());
+          }
+        }
+        costTime += System.nanoTime() - startTime;
+      }
+    }
+    System.out.println(
+        "Total aggregation cost time: "
+            + (System.nanoTime() - totalStartTime) / NANOS_PER_MILLI
+            + "ms");
+    System.out.println("Index area cost time: " + costTime / NANOS_PER_MILLI + "ms");
+  }
+}
diff --git a/example/tsfile/src/main/java/org/apache/iotdb/tsfile/test1831/TsFileRawRead.java b/example/tsfile/src/main/java/org/apache/iotdb/tsfile/test1831/TsFileRawRead.java
new file mode 100644
index 0000000..945f535
--- /dev/null
+++ b/example/tsfile/src/main/java/org/apache/iotdb/tsfile/test1831/TsFileRawRead.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.tsfile.test1831;
+
+import org.apache.iotdb.tsfile.read.ReadOnlyTsFile;
+import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
+import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.read.expression.QueryExpression;
+import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
+
+import org.apache.commons.cli.BasicParser;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Benchmark for experiment 1831: for every test TsFile, runs a raw-data query for one series with
+ * a null filter expression, drains the result set, and prints the elapsed times.
+ *
+ * <p>Usage: {@code -c <chunkNum>}. The value selects the sub-directory that holds the test files.
+ */
+public class TsFileRawRead {
+
+  private static final String DEVICE1 = "device_";
+  private static final String DATA_DIR = "/home/fit/szs/data/data/sequence/root.sg/0/";
+  private static final long NANOS_PER_MILLI = 1_000_000L;
+
+  public static int chunkNum;
+  public static int fileNum = 500;
+
+  /**
+   * Runs the benchmark over {@code fileNum} files.
+   *
+   * @param args command line, must contain {@code -c <chunkNum>}
+   * @throws IOException if a TsFile cannot be opened or read
+   * @throws IllegalArgumentException if {@code -c} is missing or not an integer
+   */
+  public static void main(String[] args) throws IOException {
+    Options opts = new Options();
+    Option chunkNumOption =
+        OptionBuilder.withArgName("args").withLongOpt("chunkNum").hasArg().create("c");
+    opts.addOption(chunkNumOption);
+
+    try {
+      CommandLine cl = new BasicParser().parse(opts, args);
+      chunkNum = Integer.parseInt(cl.getOptionValue("c"));
+    } catch (ParseException | NumberFormatException e) {
+      // Every file path below depends on chunkNum; stop instead of silently
+      // benchmarking directory "0" with the default value.
+      throw new IllegalArgumentException("Missing or invalid option -c <chunkNum>", e);
+    }
+
+    long costTime = 0L;
+    // Started after argument parsing so the total covers only the file loop,
+    // as in the other test1831 benchmarks.
+    long totalStartTime = System.nanoTime();
+    for (int fileIndex = 0; fileIndex < fileNum; fileIndex++) {
+      String path = DATA_DIR + chunkNum + "/test" + fileIndex + ".tsfile";
+
+      // raw data query
+      try (TsFileSequenceReader reader = new TsFileSequenceReader(path);
+          ReadOnlyTsFile readTsFile = new ReadOnlyTsFile(reader)) {
+
+        List<Path> paths = new ArrayList<>();
+        paths.add(new Path(DEVICE1, "sensor_1"));
+
+        QueryExpression queryExpression = QueryExpression.create(paths, null);
+
+        long startTime = System.nanoTime();
+        QueryDataSet queryDataSet = readTsFile.query(queryExpression);
+        // Drain the result set; the rows themselves are discarded.
+        while (queryDataSet.hasNext()) {
+          queryDataSet.next();
+        }
+
+        costTime += System.nanoTime() - startTime;
+      }
+    }
+    System.out.println(
+        "Total raw read cost time: "
+            + (System.nanoTime() - totalStartTime) / NANOS_PER_MILLI
+            + "ms");
+    System.out.println("Index area cost time: " + costTime / NANOS_PER_MILLI + "ms");
+  }
+}
diff --git a/example/tsfile/src/main/java/org/apache/iotdb/tsfile/test1831/TsFileReadV2.java b/example/tsfile/src/main/java/org/apache/iotdb/tsfile/test1831/TsFileReadV2.java
new file mode 100644
index 0000000..7870153
--- /dev/null
+++ b/example/tsfile/src/main/java/org/apache/iotdb/tsfile/test1831/TsFileReadV2.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.tsfile.test1831;
+
+import org.apache.iotdb.tsfile.read.ReadOnlyTsFile;
+import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
+import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.read.expression.QueryExpression;
+import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
+
+import org.apache.commons.cli.BasicParser;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Benchmark for experiment 1831: for every test TsFile under the "1" directory, runs a raw-data
+ * query for one series with a null filter expression, drains the result set, and prints the
+ * elapsed times.
+ *
+ * <p>Usage: {@code -c <chunkNum>}. The value selects the sub-directory that holds the test files.
+ */
+public class TsFileReadV2 {
+
+  private static final String DEVICE1 = "device_";
+  private static final String DATA_DIR = "/home/fit/szs/data/data/sequence/root.sg/1/";
+  private static final long NANOS_PER_MILLI = 1_000_000L;
+
+  public static int chunkNum;
+  public static int fileNum = 500;
+
+  /**
+   * Runs the benchmark over {@code fileNum} files.
+   *
+   * @param args command line, must contain {@code -c <chunkNum>}
+   * @throws IOException if a TsFile cannot be opened or read
+   * @throws IllegalArgumentException if {@code -c} is missing or not an integer
+   */
+  public static void main(String[] args) throws IOException {
+    Options opts = new Options();
+    Option chunkNumOption =
+        OptionBuilder.withArgName("args").withLongOpt("chunkNum").hasArg().create("c");
+    opts.addOption(chunkNumOption);
+
+    try {
+      CommandLine cl = new BasicParser().parse(opts, args);
+      chunkNum = Integer.parseInt(cl.getOptionValue("c"));
+    } catch (ParseException | NumberFormatException e) {
+      // Every file path below depends on chunkNum; stop instead of silently
+      // benchmarking directory "0" with the default value.
+      throw new IllegalArgumentException("Missing or invalid option -c <chunkNum>", e);
+    }
+
+    long costTime = 0L;
+    long totalStartTime = System.nanoTime();
+    for (int fileIndex = 0; fileIndex < fileNum; fileIndex++) {
+      String path = DATA_DIR + chunkNum + "/test" + fileIndex + ".tsfile";
+
+      // raw data query
+      try (TsFileSequenceReader reader = new TsFileSequenceReader(path);
+          ReadOnlyTsFile readTsFile = new ReadOnlyTsFile(reader)) {
+
+        List<Path> paths = new ArrayList<>();
+        paths.add(new Path(DEVICE1, "sensor_1"));
+
+        QueryExpression queryExpression = QueryExpression.create(paths, null);
+        long startTime = System.nanoTime();
+        QueryDataSet queryDataSet = readTsFile.query(queryExpression);
+        // Drain the result set; the rows themselves are discarded.
+        while (queryDataSet.hasNext()) {
+          queryDataSet.next();
+        }
+        costTime += System.nanoTime() - startTime;
+      }
+    }
+    System.out.println(
+        "Total raw read cost time: "
+            + (System.nanoTime() - totalStartTime) / NANOS_PER_MILLI
+            + "ms");
+    System.out.println("Index area cost time: " + costTime / NANOS_PER_MILLI + "ms");
+  }
+}
diff --git a/example/tsfile/src/main/java/org/apache/iotdb/tsfile/test1831/TsFileWrite.java b/example/tsfile/src/main/java/org/apache/iotdb/tsfile/test1831/TsFileWrite.java
new file mode 100644
index 0000000..4a5d0e6
--- /dev/null
+++ b/example/tsfile/src/main/java/org/apache/iotdb/tsfile/test1831/TsFileWrite.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.tsfile.test1831;
+
+import org.apache.iotdb.tsfile.Constant;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
+import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.write.TsFileWriter;
+import org.apache.iotdb.tsfile.write.record.TSRecord;
+import org.apache.iotdb.tsfile.write.record.datapoint.DataPoint;
+import org.apache.iotdb.tsfile.write.record.datapoint.LongDataPoint;
+import org.apache.iotdb.tsfile.write.schema.UnaryMeasurementSchema;
+
+import org.apache.commons.cli.BasicParser;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Random;
+
+/**
+ * Generates the test TsFiles for experiment 1831 by writing TSRecords through TsFileWriter. For
+ * each file, 1000 INT64/RLE series of one device are registered with registerTimeseries, {@code
+ * chunkNum * 100} rows of random longs are written, and all chunk groups are flushed after every
+ * 100 rows.
+ *
+ * <p>Usage: {@code -c <chunkNum>}. The value selects the output sub-directory and the row count.
+ */
+public class TsFileWrite {
+
+  private static final String DATA_DIR = "/home/fit/szs/data/data/sequence/root.sg/0/";
+  private static final int SENSOR_NUM = 1000;
+  private static final int ROWS_PER_FLUSH = 100;
+
+  public static int chunkNum;
+  public static int fileNum = 500;
+
+  /**
+   * Writes {@code fileNum} files. A failure on one file is reported and the next file is tried.
+   *
+   * @param args command line, must contain {@code -c <chunkNum>}
+   * @throws IllegalArgumentException if {@code -c} is missing or not an integer
+   */
+  public static void main(String[] args) {
+    Options opts = new Options();
+    Option chunkNumOption =
+        OptionBuilder.withArgName("args").withLongOpt("chunkNum").hasArg().create("c");
+    opts.addOption(chunkNumOption);
+
+    try {
+      CommandLine cl = new BasicParser().parse(opts, args);
+      chunkNum = Integer.parseInt(cl.getOptionValue("c"));
+    } catch (ParseException | NumberFormatException e) {
+      // With the default chunkNum of 0 the loop below would create 500 empty files
+      // in directory "0"; stop instead.
+      throw new IllegalArgumentException("Missing or invalid option -c <chunkNum>", e);
+    }
+
+    // One generator for the whole run instead of one per data point.
+    Random random = new Random();
+    for (int fileIndex = 0; fileIndex < fileNum; fileIndex++) {
+      String path = DATA_DIR + chunkNum + "/test" + fileIndex + ".tsfile";
+      try {
+        File f = FSFactoryProducer.getFSFactory().getFile(path);
+        if (f.exists() && !f.delete()) {
+          throw new IOException("Cannot delete existing file " + path);
+        }
+
+        try (TsFileWriter tsFileWriter = new TsFileWriter(f)) {
+          // 1000 timeseries
+          for (int i = 1; i <= SENSOR_NUM; i++) {
+            tsFileWriter.registerTimeseries(
+                new Path(Constant.DEVICE_PREFIX, Constant.SENSOR_ + i),
+                new UnaryMeasurementSchema(Constant.SENSOR_ + i, TSDataType.INT64, TSEncoding.RLE));
+          }
+          // construct TSRecord
+          for (int i = 1; i <= chunkNum * ROWS_PER_FLUSH; i++) {
+            TSRecord tsRecord = new TSRecord(i, Constant.DEVICE_PREFIX);
+            for (int t = 1; t <= SENSOR_NUM; t++) {
+              DataPoint dPoint1 = new LongDataPoint(Constant.SENSOR_ + t, random.nextLong());
+              tsRecord.addTuple(dPoint1);
+            }
+            // write TSRecord
+            tsFileWriter.write(tsRecord);
+            if (i % ROWS_PER_FLUSH == 0) {
+              tsFileWriter.flushAllChunkGroups();
+            }
+          }
+        }
+      } catch (Exception e) {
+        // Best effort: report the failure and continue with the next file.
+        e.printStackTrace();
+        System.out.println(e.getMessage());
+      }
+    }
+  }
+}
diff --git a/example/tsfile/src/main/java/org/apache/iotdb/tsfile/test1831/TsFileWriteV2.java b/example/tsfile/src/main/java/org/apache/iotdb/tsfile/test1831/TsFileWriteV2.java
new file mode 100644
index 0000000..89152ef
--- /dev/null
+++ b/example/tsfile/src/main/java/org/apache/iotdb/tsfile/test1831/TsFileWriteV2.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.tsfile.test1831;
+
+import org.apache.iotdb.tsfile.Constant;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
+import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.write.TsFileWriter;
+import org.apache.iotdb.tsfile.write.record.TSRecord;
+import org.apache.iotdb.tsfile.write.record.datapoint.DataPoint;
+import org.apache.iotdb.tsfile.write.record.datapoint.LongDataPoint;
+import org.apache.iotdb.tsfile.write.schema.UnaryMeasurementSchema;
+
+import org.apache.commons.cli.BasicParser;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Random;
+
+/**
+ * Generates the test TsFiles for experiment 1831 under the "1" directory by writing TSRecords
+ * through TsFileWriter. For each file, 1000 INT64/RLE series of one device are registered with
+ * registerTimeseries, {@code chunkNum * 100} rows of random longs are written, and all chunk
+ * groups are flushed after every 100 rows.
+ *
+ * <p>Usage: {@code -c <chunkNum>}. The value selects the output sub-directory and the row count.
+ */
+public class TsFileWriteV2 {
+
+  private static final String DATA_DIR = "/home/fit/szs/data/data/sequence/root.sg/1/";
+  private static final int SENSOR_NUM = 1000;
+  private static final int ROWS_PER_FLUSH = 100;
+
+  public static int chunkNum;
+  public static int fileNum = 500;
+
+  /**
+   * Writes {@code fileNum} files. A failure on one file is reported and the next file is tried.
+   *
+   * @param args command line, must contain {@code -c <chunkNum>}
+   * @throws IllegalArgumentException if {@code -c} is missing or not an integer
+   */
+  public static void main(String[] args) {
+    Options opts = new Options();
+    Option chunkNumOption =
+        OptionBuilder.withArgName("args").withLongOpt("chunkNum").hasArg().create("c");
+    opts.addOption(chunkNumOption);
+
+    try {
+      CommandLine cl = new BasicParser().parse(opts, args);
+      chunkNum = Integer.parseInt(cl.getOptionValue("c"));
+    } catch (ParseException | NumberFormatException e) {
+      // With the default chunkNum of 0 the loop below would create 500 empty files
+      // in directory "0"; stop instead.
+      throw new IllegalArgumentException("Missing or invalid option -c <chunkNum>", e);
+    }
+
+    // One generator for the whole run instead of one per data point.
+    Random random = new Random();
+    for (int fileIndex = 0; fileIndex < fileNum; fileIndex++) {
+      String path = DATA_DIR + chunkNum + "/test" + fileIndex + ".tsfile";
+      try {
+        File f = FSFactoryProducer.getFSFactory().getFile(path);
+        if (f.exists() && !f.delete()) {
+          throw new IOException("Cannot delete existing file " + path);
+        }
+
+        // try-with-resources closes the writer even when registration or a write fails.
+        try (TsFileWriter tsFileWriter = new TsFileWriter(f)) {
+          // 1000 timeseries
+          for (int i = 1; i <= SENSOR_NUM; i++) {
+            tsFileWriter.registerTimeseries(
+                new Path(Constant.DEVICE_PREFIX, Constant.SENSOR_ + i),
+                new UnaryMeasurementSchema(Constant.SENSOR_ + i, TSDataType.INT64, TSEncoding.RLE));
+          }
+          // construct TSRecord
+          for (int i = 1; i <= chunkNum * ROWS_PER_FLUSH; i++) {
+            TSRecord tsRecord = new TSRecord(i, Constant.DEVICE_PREFIX);
+            for (int t = 1; t <= SENSOR_NUM; t++) {
+              DataPoint dPoint1 = new LongDataPoint(Constant.SENSOR_ + t, random.nextLong());
+              tsRecord.addTuple(dPoint1);
+            }
+            // write TSRecord
+            tsFileWriter.write(tsRecord);
+            if (i % ROWS_PER_FLUSH == 0) {
+              tsFileWriter.flushAllChunkGroups();
+            }
+          }
+        }
+      } catch (Exception e) {
+        // Best effort: report the failure and continue with the next file.
+        e.printStackTrace();
+        System.out.println(e.getMessage());
+      }
+    }
+  }
+}
diff --git a/example/tsfile/src/main/java/org/apache/iotdb/tsfile/test1832/TsFileSketchTool.java b/example/tsfile/src/main/java/org/apache/iotdb/tsfile/test1832/TsFileSketchTool.java
new file mode 100644
index 0000000..ead2726
--- /dev/null
+++ b/example/tsfile/src/main/java/org/apache/iotdb/tsfile/test1832/TsFileSketchTool.java
@@ -0,0 +1,510 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.tsfile.test1832; + +import org.apache.iotdb.tsfile.common.conf.TSFileConfig; +import org.apache.iotdb.tsfile.file.MetaMarker; +import org.apache.iotdb.tsfile.file.header.ChunkGroupHeader; +import org.apache.iotdb.tsfile.file.header.PageHeader; +import org.apache.iotdb.tsfile.file.metadata.ChunkGroupMetadata; +import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata; +import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata; +import org.apache.iotdb.tsfile.file.metadata.MetadataIndexEntry; +import org.apache.iotdb.tsfile.file.metadata.MetadataIndexNode; +import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata; +import org.apache.iotdb.tsfile.file.metadata.TsFileMetadata; +import org.apache.iotdb.tsfile.file.metadata.enums.MetadataIndexNodeType; +import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer; +import org.apache.iotdb.tsfile.read.TsFileSequenceReader; +import org.apache.iotdb.tsfile.read.common.Chunk; +import org.apache.iotdb.tsfile.read.common.Path; +import org.apache.iotdb.tsfile.utils.BloomFilter; +import org.apache.iotdb.tsfile.utils.Pair; + +import java.io.FileWriter; +import java.io.IOException; +import java.io.PrintWriter; +import java.nio.BufferOverflowException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; + +public class TsFileSketchTool { + + private String filename; + private PrintWriter pw; + private TsFileSketchToolReader reader; + private String splitStr; // for split different part of TsFile + + public static void main(String[] args) throws IOException { + Pair<String, String> fileNames = checkArgs(args); + String filename = + "/Users/samperson1997/git/iotdb/data/data/sequence/root.sg/0/1832/test5.tsfile"; + String outFile = "/Users/samperson1997/git/iotdb/data/data/sequence/root.sg/0/1832/1.txt"; + new 
TsFileSketchTool(filename, outFile).run(); + } + + /** + * construct TsFileSketchTool + * + * @param filename input file path + * @param outFile output file path + */ + public TsFileSketchTool(String filename, String outFile) { + try { + this.filename = filename; + pw = new PrintWriter(new FileWriter(outFile)); + reader = new TsFileSketchToolReader(filename); + StringBuilder str1 = new StringBuilder(); + for (int i = 0; i < 21; i++) { + str1.append("|"); + } + splitStr = str1.toString(); + } catch (IOException e) { + e.printStackTrace(); + } + } + + /** entry of tool */ + public void run() throws IOException { + long length = FSFactoryProducer.getFSFactory().getFile(filename).length(); + printlnBoth( + pw, "-------------------------------- TsFile Sketch --------------------------------"); + printlnBoth(pw, "file path: " + filename); + printlnBoth(pw, "file length: " + length); + + // get metadata information + TsFileMetadata tsFileMetaData = reader.readFileMetadata(); + List<ChunkGroupMetadata> allChunkGroupMetadata = new ArrayList<>(); + reader.selfCheck(null, allChunkGroupMetadata, false); + + // print file information + printFileInfo(); + + // print chunk + printChunk(allChunkGroupMetadata); + + // metadata begins + if (tsFileMetaData.getMetadataIndex().getChildren().isEmpty()) { + printlnBoth(pw, String.format("%20s", reader.getFileMetadataPos() - 1) + "|\t[marker] 2"); + } else { + printlnBoth( + pw, String.format("%20s", reader.readFileMetadata().getMetaOffset()) + "|\t[marker] 2"); + } + // get all timeseries index + Map<Long, Pair<Path, TimeseriesMetadata>> timeseriesMetadataMap = + reader.getAllTimeseriesMetadataWithOffset(); + + // print timeseries index + printTimeseriesIndex(timeseriesMetadataMap); + + MetadataIndexNode metadataIndexNode = tsFileMetaData.getMetadataIndex(); + TreeMap<Long, MetadataIndexNode> metadataIndexNodeMap = new TreeMap<>(); + List<String> treeOutputStringBuffer = new ArrayList<>(); + loadIndexTree(metadataIndexNode, 
metadataIndexNodeMap, treeOutputStringBuffer, 0); + + // print IndexOfTimerseriesIndex + printIndexOfTimerseriesIndex(metadataIndexNodeMap); + + // print TsFile Metadata + printTsFileMetadata(tsFileMetaData); + + printlnBoth(pw, String.format("%20s", length) + "|\tEND of TsFile"); + printlnBoth( + pw, + "---------------------------- IndexOfTimerseriesIndex Tree -----------------------------"); + // print index tree + for (String str : treeOutputStringBuffer) { + printlnBoth(pw, str); + } + printlnBoth( + pw, + "---------------------------------- TsFile Sketch End ----------------------------------"); + pw.close(); + } + + private void printTsFileMetadata(TsFileMetadata tsFileMetaData) { + try { + printlnBoth(pw, String.format("%20s", reader.getFileMetadataPos()) + "|\t[TsFileMetadata]"); + printlnBoth( + pw, String.format("%20s", "") + "|\t\t[meta offset] " + tsFileMetaData.getMetaOffset()); + printlnBoth( + pw, + String.format("%20s", "") + + "|\t\t[num of devices] " + + tsFileMetaData.getMetadataIndex().getChildren().size()); + printlnBoth( + pw, + String.format("%20s", "") + + "|\t\t" + + tsFileMetaData.getMetadataIndex().getChildren().size() + + " key&TsMetadataIndex"); + // bloom filter + BloomFilter bloomFilter = tsFileMetaData.getBloomFilter(); + printlnBoth( + pw, + String.format("%20s", "") + + "|\t\t[bloom filter bit vector byte array length] " + + bloomFilter.serialize().length); + printlnBoth(pw, String.format("%20s", "") + "|\t\t[bloom filter bit vector byte array] "); + printlnBoth( + pw, + String.format("%20s", "") + + "|\t\t[bloom filter number of bits] " + + bloomFilter.getSize()); + printlnBoth( + pw, + String.format("%20s", "") + + "|\t\t[bloom filter number of hash functions] " + + bloomFilter.getHashFunctionSize()); + + printlnBoth( + pw, + String.format("%20s", (reader.getFileMetadataPos() + reader.getFileMetadataSize())) + + "|\t[TsFileMetadataSize] " + + reader.getFileMetadataSize()); + + printlnBoth( + pw, + String.format("%20s", 
reader.getFileMetadataPos() + reader.getFileMetadataSize() + 4) + + "|\t[magic tail] " + + reader.readTailMagic()); + } catch (IOException e) { + e.printStackTrace(); + } + } + + private void printIndexOfTimerseriesIndex(TreeMap<Long, MetadataIndexNode> metadataIndexNodeMap) { + for (Map.Entry<Long, MetadataIndexNode> entry : metadataIndexNodeMap.entrySet()) { + printlnBoth( + pw, + String.format("%20s", entry.getKey()) + + "|\t[IndexOfTimerseriesIndex Node] type=" + + entry.getValue().getNodeType()); + for (MetadataIndexEntry metadataIndexEntry : entry.getValue().getChildren()) { + printlnBoth( + pw, + String.format("%20s", "") + + "|\t\t<" + + metadataIndexEntry.getName() + + ", " + + metadataIndexEntry.getOffset() + + ">"); + } + printlnBoth( + pw, + String.format("%20s", "") + "|\t\t<endOffset, " + entry.getValue().getEndOffset() + ">"); + } + } + + private void printFileInfo() { + try { + printlnBoth(pw, ""); + printlnBoth(pw, String.format("%20s", "POSITION") + "|\tCONTENT"); + printlnBoth(pw, String.format("%20s", "--------") + " \t-------"); + printlnBoth(pw, String.format("%20d", 0) + "|\t[magic head] " + reader.readHeadMagic()); + printlnBoth( + pw, + String.format("%20d", TSFileConfig.MAGIC_STRING.getBytes().length) + + "|\t[version number] " + + reader.readVersionNumber()); + } catch (IOException e) { + e.printStackTrace(); + } + } + + private void printChunk(List<ChunkGroupMetadata> allChunkGroupMetadata) { + try { + long nextChunkGroupHeaderPos = + (long) TSFileConfig.MAGIC_STRING.getBytes().length + Byte.BYTES; + // ChunkGroup begins + for (ChunkGroupMetadata chunkGroupMetadata : allChunkGroupMetadata) { + printlnBoth( + pw, + splitStr + + "\t[Chunk Group] of " + + chunkGroupMetadata.getDevice() + + ", num of Chunks:" + + chunkGroupMetadata.getChunkMetadataList().size()); + // chunkGroupHeader begins + printlnBoth(pw, String.format("%20s", nextChunkGroupHeaderPos) + "|\t[Chunk Group Header]"); + ChunkGroupHeader chunkGroupHeader = + 
reader.readChunkGroupHeader(nextChunkGroupHeaderPos, false); + printlnBoth(pw, String.format("%20s", "") + "|\t\t[marker] 0"); + printlnBoth( + pw, String.format("%20s", "") + "|\t\t[deviceID] " + chunkGroupHeader.getDeviceID()); + // chunk begins + for (ChunkMetadata chunkMetadata : chunkGroupMetadata.getChunkMetadataList()) { + Chunk chunk = reader.readMemChunk(chunkMetadata); + printlnBoth( + pw, + String.format("%20d", chunkMetadata.getOffsetOfChunkHeader()) + + "|\t[Chunk] of " + + chunkMetadata.getMeasurementUid() + + ", numOfPoints:" + + chunkMetadata.getNumOfPoints() + + ", time range:[" + + chunkMetadata.getStartTime() + + "," + + chunkMetadata.getEndTime() + + "], tsDataType:" + + chunkMetadata.getDataType() + + ", \n" + + String.format("%20s", "") + + " \t" + + chunkMetadata.getStatistics()); + printlnBoth( + pw, + String.format("%20s", "") + + "|\t\t[chunk header] " + + "marker=" + + chunk.getHeader().getChunkType() + + ", measurementId=" + + chunk.getHeader().getMeasurementID() + + ", dataSize=" + + chunk.getHeader().getDataSize() + + ", serializedSize=" + + chunk.getHeader().getSerializedSize()); + + printlnBoth(pw, String.format("%20s", "") + "|\t\t[chunk] " + chunk.getData()); + PageHeader pageHeader; + if (((byte) (chunk.getHeader().getChunkType() & 0x3F)) + == MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER) { + pageHeader = PageHeader.deserializeFrom(chunk.getData(), chunkMetadata.getStatistics()); + } else { + pageHeader = + PageHeader.deserializeFrom(chunk.getData(), chunk.getHeader().getDataType()); + } + printlnBoth( + pw, + String.format("%20s", "") + + "|\t\t[page] " + + " CompressedSize:" + + pageHeader.getCompressedSize() + + ", UncompressedSize:" + + pageHeader.getUncompressedSize()); + nextChunkGroupHeaderPos = + chunkMetadata.getOffsetOfChunkHeader() + + chunk.getHeader().getSerializedSize() + + chunk.getHeader().getDataSize(); + } + reader.position(nextChunkGroupHeaderPos); + byte marker = reader.readMarker(); + switch (marker) { + case 
MetaMarker.CHUNK_GROUP_HEADER: + // do nothing + break; + case MetaMarker.OPERATION_INDEX_RANGE: + // skip the PlanIndex + nextChunkGroupHeaderPos += 16; + break; + } + + printlnBoth( + pw, splitStr + "\t[Chunk Group] of " + chunkGroupMetadata.getDevice() + " ends"); + } + } catch (IOException e) { + e.printStackTrace(); + } + } + + private void printTimeseriesIndex( + Map<Long, Pair<Path, TimeseriesMetadata>> timeseriesMetadataMap) { + try { + for (Map.Entry<Long, Pair<Path, TimeseriesMetadata>> entry : + timeseriesMetadataMap.entrySet()) { + printlnBoth( + pw, + String.format("%20s", entry.getKey()) + + "|\t[TimeseriesIndex] of " + + entry.getValue().left + + ", tsDataType:" + + entry.getValue().right.getTSDataType()); + for (IChunkMetadata chunkMetadata : reader.getChunkMetadataList(entry.getValue().left)) { + printlnBoth( + pw, + String.format("%20s", "") + + "|\t\t[ChunkIndex] " + + chunkMetadata.getMeasurementUid() + + ", offset=" + + chunkMetadata.getOffsetOfChunkHeader()); + } + printlnBoth( + pw, + String.format("%20s", "") + "|\t\t[" + entry.getValue().right.getStatistics() + "] "); + } + printlnBoth(pw, splitStr); + } catch (IOException e) { + e.printStackTrace(); + } + } + + /** + * load by dfs, and sort by TreeMap + * + * @param metadataIndexNode current node + * @param metadataIndexNodeMap result map, key is offset + * @param treeOutputStringBuffer result list, string is index tree + * @param deep current deep + */ + private void loadIndexTree( + MetadataIndexNode metadataIndexNode, + TreeMap<Long, MetadataIndexNode> metadataIndexNodeMap, + List<String> treeOutputStringBuffer, + int deep) + throws IOException { + StringBuilder tableWriter = new StringBuilder("\t"); + for (int i = 0; i < deep; i++) { + tableWriter.append("\t\t"); + } + treeOutputStringBuffer.add( + tableWriter.toString() + "[MetadataIndex:" + metadataIndexNode.getNodeType() + "]"); + for (int i = 0; i < metadataIndexNode.getChildren().size(); i++) { + MetadataIndexEntry 
metadataIndexEntry = metadataIndexNode.getChildren().get(i); + + treeOutputStringBuffer.add( + tableWriter.toString() + + "└──────[" + + metadataIndexEntry.getName() + + "," + + metadataIndexEntry.getOffset() + + "]"); + if (!metadataIndexNode.getNodeType().equals(MetadataIndexNodeType.LEAF_MEASUREMENT)) { + long endOffset = metadataIndexNode.getEndOffset(); + if (i != metadataIndexNode.getChildren().size() - 1) { + endOffset = metadataIndexNode.getChildren().get(i + 1).getOffset(); + } + MetadataIndexNode subNode = + reader.getMetadataIndexNode(metadataIndexEntry.getOffset(), endOffset); + metadataIndexNodeMap.put(metadataIndexEntry.getOffset(), subNode); + loadIndexTree(subNode, metadataIndexNodeMap, treeOutputStringBuffer, deep + 1); + } + } + } + + private void printlnBoth(PrintWriter pw, String str) { + System.out.println(str); + pw.println(str); + } + + private static Pair<String, String> checkArgs(String[] args) { + String filename = "test.tsfile"; + String outFile = "TsFile_sketch_view.txt"; + if (args.length == 1) { + filename = args[0]; + } else if (args.length == 2) { + filename = args[0]; + outFile = args[1]; + } + return new Pair<>(filename, outFile); + } + + private class TsFileSketchToolReader extends TsFileSequenceReader { + public TsFileSketchToolReader(String file) throws IOException { + super(file); + } + /** + * Traverse the metadata index from MetadataIndexEntry to get TimeseriesMetadatas + * + * @param metadataIndex MetadataIndexEntry + * @param buffer byte buffer + * @param deviceId String + * @param timeseriesMetadataMap map: deviceId -> timeseriesMetadata list + * @param needChunkMetadata deserialize chunk metadata list or not + */ + private void generateMetadataIndexWithOffset( + long startOffset, + MetadataIndexEntry metadataIndex, + ByteBuffer buffer, + String deviceId, + MetadataIndexNodeType type, + Map<Long, Pair<Path, TimeseriesMetadata>> timeseriesMetadataMap, + boolean needChunkMetadata) + throws IOException { + try { + if 
(type.equals(MetadataIndexNodeType.LEAF_MEASUREMENT)) { + while (buffer.hasRemaining()) { + long pos = startOffset + buffer.position(); + TimeseriesMetadata timeseriesMetadata = + TimeseriesMetadata.deserializeFrom(buffer, needChunkMetadata); + timeseriesMetadataMap.put( + pos, + new Pair<>( + new Path(deviceId, timeseriesMetadata.getMeasurementId()), timeseriesMetadata)); + } + } else { + // deviceId should be determined by LEAF_DEVICE node + if (type.equals(MetadataIndexNodeType.LEAF_DEVICE)) { + deviceId = metadataIndex.getName(); + } + MetadataIndexNode metadataIndexNode = MetadataIndexNode.deserializeFrom(buffer); + int metadataIndexListSize = metadataIndexNode.getChildren().size(); + for (int i = 0; i < metadataIndexListSize; i++) { + long endOffset = metadataIndexNode.getEndOffset(); + if (i != metadataIndexListSize - 1) { + endOffset = metadataIndexNode.getChildren().get(i + 1).getOffset(); + } + ByteBuffer nextBuffer = + readData(metadataIndexNode.getChildren().get(i).getOffset(), endOffset); + generateMetadataIndexWithOffset( + metadataIndexNode.getChildren().get(i).getOffset(), + metadataIndexNode.getChildren().get(i), + nextBuffer, + deviceId, + metadataIndexNode.getNodeType(), + timeseriesMetadataMap, + needChunkMetadata); + } + } + } catch (BufferOverflowException e) { + throw e; + } + } + + public Map<Long, Pair<Path, TimeseriesMetadata>> getAllTimeseriesMetadataWithOffset() + throws IOException { + if (tsFileMetaData == null) { + readFileMetadata(); + } + MetadataIndexNode metadataIndexNode = tsFileMetaData.getMetadataIndex(); + Map<Long, Pair<Path, TimeseriesMetadata>> timeseriesMetadataMap = new TreeMap<>(); + List<MetadataIndexEntry> metadataIndexEntryList = metadataIndexNode.getChildren(); + for (int i = 0; i < metadataIndexEntryList.size(); i++) { + MetadataIndexEntry metadataIndexEntry = metadataIndexEntryList.get(i); + long endOffset = tsFileMetaData.getMetadataIndex().getEndOffset(); + if (i != metadataIndexEntryList.size() - 1) { + 
endOffset = metadataIndexEntryList.get(i + 1).getOffset(); + } + ByteBuffer buffer = readData(metadataIndexEntry.getOffset(), endOffset); + generateMetadataIndexWithOffset( + metadataIndexEntry.getOffset(), + metadataIndexEntry, + buffer, + null, + metadataIndexNode.getNodeType(), + timeseriesMetadataMap, + false); + } + return timeseriesMetadataMap; + } + } +} diff --git a/example/tsfile/src/main/java/org/apache/iotdb/tsfile/test1832/TsFileWrite.java b/example/tsfile/src/main/java/org/apache/iotdb/tsfile/test1832/TsFileWrite.java new file mode 100644 index 0000000..45f85a0 --- /dev/null +++ b/example/tsfile/src/main/java/org/apache/iotdb/tsfile/test1832/TsFileWrite.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iotdb.tsfile.test1832; + +import org.apache.iotdb.tsfile.Constant; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding; +import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer; +import org.apache.iotdb.tsfile.read.common.Path; +import org.apache.iotdb.tsfile.write.TsFileWriter; +import org.apache.iotdb.tsfile.write.record.TSRecord; +import org.apache.iotdb.tsfile.write.record.datapoint.DataPoint; +import org.apache.iotdb.tsfile.write.record.datapoint.LongDataPoint; +import org.apache.iotdb.tsfile.write.schema.UnaryMeasurementSchema; + +import java.io.File; +import java.util.Random; + +/** + * An example of writing data with TSRecord to TsFile It uses the interface: public void + * addMeasurement(MeasurementSchema measurementSchema) throws WriteProcessException + */ +public class TsFileWrite { + + public static void main(String[] args) { + + try { + Random random = new Random(); + String path = "/Users/samperson1997/git/iotdb/data/data/sequence/root.sg/0/1832/test5.tsfile"; + File f = FSFactoryProducer.getFSFactory().getFile(path); + if (f.exists()) { + f.delete(); + } + + try (TsFileWriter tsFileWriter = new TsFileWriter(f)) { + // only one timeseries + tsFileWriter.registerTimeseries( + new Path(Constant.DEVICE_PREFIX, Constant.SENSOR_1), + new UnaryMeasurementSchema(Constant.SENSOR_1, TSDataType.INT64, TSEncoding.RLE)); + + // construct TSRecord + for (int i = 1; i <= 7977; i++) { + TSRecord tsRecord = new TSRecord(i, Constant.DEVICE_PREFIX); + DataPoint dPoint1 = new LongDataPoint(Constant.SENSOR_1, random.nextLong()); + tsRecord.addTuple(dPoint1); + // write TSRecord + tsFileWriter.write(tsRecord); + } + } + } catch (Throwable e) { + e.printStackTrace(); + System.out.println(e.getMessage()); + } + } +} diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/ChunkMetadata.java 
b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/ChunkMetadata.java index 33908b6..3d38f47 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/ChunkMetadata.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/ChunkMetadata.java @@ -183,6 +183,25 @@ public class ChunkMetadata implements Accountable, IChunkMetadata { return chunkMetaData; } + public static ChunkMetadata deserializeFrom( + ByteBuffer buffer, TimeseriesMetadataV2 timeseriesMetadata) { + ChunkMetadata chunkMetaData = new ChunkMetadata(); + + chunkMetaData.measurementUid = timeseriesMetadata.getMeasurementId(); + chunkMetaData.tsDataType = timeseriesMetadata.getTSDataType(); + chunkMetaData.offsetOfChunkHeader = ReadWriteIOUtils.readLong(buffer); + // if the TimeSeriesMetadataType is not 0, it means it has more than one chunk + // and each chunk's metadata has its own statistics + if ((timeseriesMetadata.getTimeSeriesMetadataType() & 0x3F) != 0) { + chunkMetaData.statistics = Statistics.deserialize(buffer, chunkMetaData.tsDataType); + } else { + // if the TimeSeriesMetadataType is 0, it means it has only one chunk + // and that chunk's metadata has no statistic + chunkMetaData.statistics = timeseriesMetadata.getStatistics(); + } + return chunkMetaData; + } + @Override public long getVersion() { return version; diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/MetadataIndexConstructorV2.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/MetadataIndexConstructorV2.java new file mode 100644 index 0000000..8b9ea6a --- /dev/null +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/MetadataIndexConstructorV2.java @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.tsfile.file.metadata; + +import org.apache.iotdb.tsfile.common.conf.TSFileConfig; +import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor; +import org.apache.iotdb.tsfile.file.metadata.enums.MetadataIndexNodeType; +import org.apache.iotdb.tsfile.write.writer.TsFileOutput; + +import java.io.IOException; +import java.util.ArrayDeque; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Queue; +import java.util.TreeMap; + +public class MetadataIndexConstructorV2 { + + private static final TSFileConfig config = TSFileDescriptor.getInstance().getConfig(); + + private MetadataIndexConstructorV2() { + throw new IllegalStateException("Utility class"); + } + + /** + * Construct metadata index tree + * + * @param deviceTimeseriesMetadataMap device => TimeseriesMetadata list + * @param out tsfile output + */ + @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning + public static MetadataIndexNode constructMetadataIndex( + Map<String, List<TimeseriesMetadataV2>> deviceTimeseriesMetadataMap, TsFileOutput out) + throws IOException { + + Map<String, MetadataIndexNode> deviceMetadataIndexMap = new TreeMap<>(); + + // for timeseriesMetadata of each device + for (Entry<String, List<TimeseriesMetadataV2>> entry : deviceTimeseriesMetadataMap.entrySet()) { + if (entry.getValue().isEmpty()) { + continue; 
+ } + Queue<MetadataIndexNode> measurementMetadataIndexQueue = new ArrayDeque<>(); + TimeseriesMetadataV2 timeseriesMetadata; + MetadataIndexNode currentIndexNode = + new MetadataIndexNode(MetadataIndexNodeType.LEAF_MEASUREMENT); + int serializedTimeseriesMetadataNum = 0; + for (int i = 0; i < entry.getValue().size(); i++) { + timeseriesMetadata = entry.getValue().get(i); + if (serializedTimeseriesMetadataNum == 0 + || serializedTimeseriesMetadataNum >= config.getMaxDegreeOfIndexNode()) { + if (currentIndexNode.isFull()) { + addCurrentIndexNodeToQueue(currentIndexNode, measurementMetadataIndexQueue, out); + currentIndexNode = new MetadataIndexNode(MetadataIndexNodeType.LEAF_MEASUREMENT); + } + currentIndexNode.addEntry( + new MetadataIndexEntry(timeseriesMetadata.getMeasurementId(), out.getPosition())); + serializedTimeseriesMetadataNum = 0; + } + timeseriesMetadata.serializeTo(out.wrapAsStream()); + serializedTimeseriesMetadataNum++; + } + addCurrentIndexNodeToQueue(currentIndexNode, measurementMetadataIndexQueue, out); + deviceMetadataIndexMap.put( + entry.getKey(), + generateRootNode( + measurementMetadataIndexQueue, out, MetadataIndexNodeType.INTERNAL_MEASUREMENT)); + } + + // if not exceed the max child nodes num, ignore the device index and directly point to the + // measurement + if (deviceMetadataIndexMap.size() <= config.getMaxDegreeOfIndexNode()) { + MetadataIndexNode metadataIndexNode = + new MetadataIndexNode(MetadataIndexNodeType.LEAF_DEVICE); + for (Entry<String, MetadataIndexNode> entry : deviceMetadataIndexMap.entrySet()) { + metadataIndexNode.addEntry(new MetadataIndexEntry(entry.getKey(), out.getPosition())); + entry.getValue().serializeTo(out.wrapAsStream()); + } + metadataIndexNode.setEndOffset(out.getPosition()); + return metadataIndexNode; + } + + // else, build level index for devices + Queue<MetadataIndexNode> deviceMetadataIndexQueue = new ArrayDeque<>(); + MetadataIndexNode currentIndexNode = new 
MetadataIndexNode(MetadataIndexNodeType.LEAF_DEVICE); + + for (Entry<String, MetadataIndexNode> entry : deviceMetadataIndexMap.entrySet()) { + // when constructing from internal node, each node is related to an entry + if (currentIndexNode.isFull()) { + addCurrentIndexNodeToQueue(currentIndexNode, deviceMetadataIndexQueue, out); + currentIndexNode = new MetadataIndexNode(MetadataIndexNodeType.LEAF_DEVICE); + } + currentIndexNode.addEntry(new MetadataIndexEntry(entry.getKey(), out.getPosition())); + entry.getValue().serializeTo(out.wrapAsStream()); + } + addCurrentIndexNodeToQueue(currentIndexNode, deviceMetadataIndexQueue, out); + MetadataIndexNode deviceMetadataIndexNode = + generateRootNode(deviceMetadataIndexQueue, out, MetadataIndexNodeType.INTERNAL_DEVICE); + deviceMetadataIndexNode.setEndOffset(out.getPosition()); + return deviceMetadataIndexNode; + } + + /** + * Generate root node, using the nodes in the queue as leaf nodes. The final metadata tree has two + * levels: measurement leaf nodes will generate to measurement root node; device leaf nodes will + * generate to device root node + * + * @param metadataIndexNodeQueue queue of metadataIndexNode + * @param out tsfile output + * @param type MetadataIndexNode type + */ + private static MetadataIndexNode generateRootNode( + Queue<MetadataIndexNode> metadataIndexNodeQueue, TsFileOutput out, MetadataIndexNodeType type) + throws IOException { + int queueSize = metadataIndexNodeQueue.size(); + MetadataIndexNode metadataIndexNode; + MetadataIndexNode currentIndexNode = new MetadataIndexNode(type); + while (queueSize != 1) { + for (int i = 0; i < queueSize; i++) { + metadataIndexNode = metadataIndexNodeQueue.poll(); + // when constructing from internal node, each node is related to an entry + if (currentIndexNode.isFull()) { + addCurrentIndexNodeToQueue(currentIndexNode, metadataIndexNodeQueue, out); + currentIndexNode = new MetadataIndexNode(type); + } + currentIndexNode.addEntry( + new 
MetadataIndexEntry(metadataIndexNode.peek().getName(), out.getPosition())); + metadataIndexNode.serializeTo(out.wrapAsStream()); + } + addCurrentIndexNodeToQueue(currentIndexNode, metadataIndexNodeQueue, out); + currentIndexNode = new MetadataIndexNode(type); + queueSize = metadataIndexNodeQueue.size(); + } + return metadataIndexNodeQueue.poll(); + } + + private static void addCurrentIndexNodeToQueue( + MetadataIndexNode currentIndexNode, + Queue<MetadataIndexNode> metadataIndexNodeQueue, + TsFileOutput out) + throws IOException { + currentIndexNode.setEndOffset(out.getPosition()); + metadataIndexNodeQueue.add(currentIndexNode); + } +} diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/TimeseriesMetadataV2.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/TimeseriesMetadataV2.java new file mode 100644 index 0000000..c11fe40 --- /dev/null +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/TimeseriesMetadataV2.java @@ -0,0 +1,272 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iotdb.tsfile.file.metadata; + +import org.apache.iotdb.tsfile.common.cache.Accountable; +import org.apache.iotdb.tsfile.common.constant.TsFileConstant; +import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics; +import org.apache.iotdb.tsfile.read.controller.IChunkMetadataLoader; +import org.apache.iotdb.tsfile.utils.PublicBAOS; +import org.apache.iotdb.tsfile.utils.ReadWriteForEncodingUtils; +import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils; + +import java.io.IOException; +import java.io.OutputStream; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; + +public class TimeseriesMetadataV2 implements Accountable, ITimeSeriesMetadata { + + /** used for old version tsfile */ + private long startOffsetOfChunkMetaDataList; + /** + * 0 means this time series has only one chunk, no need to save the statistic again in chunk + * metadata; + * + * <p>1 means this time series has more than one chunk, should save the statistic again in chunk + * metadata; + * + * <p>if the 8th bit is 1, it means it is the time column of a vector series; + * + * <p>if the 7th bit is 1, it means it is the value column of a vector series + */ + private byte timeSeriesMetadataType; + + private int chunkMetaDataListDataSize; + + private String measurementId; + private TSDataType dataType; + + // modified is true when there are modifications of the series, or from unseq file + private boolean modified; + + protected IChunkMetadataLoader chunkMetadataLoader; + + private long ramSize; + + // used for SeriesReader to indicate whether it is a seq/unseq timeseries metadata + private boolean isSeq = true; + + // used to save chunk metadata list while serializing + private PublicBAOS chunkMetadataListBuffer; + + private ArrayList<IChunkMetadata> chunkMetadataList; + + public 
TimeseriesMetadataV2() {} + + public TimeseriesMetadataV2( + byte timeSeriesMetadataType, + int chunkMetaDataListDataSize, + String measurementId, + TSDataType dataType, + PublicBAOS chunkMetadataListBuffer) { + this.timeSeriesMetadataType = timeSeriesMetadataType; + this.chunkMetaDataListDataSize = chunkMetaDataListDataSize; + this.measurementId = measurementId; + this.dataType = dataType; + this.chunkMetadataListBuffer = chunkMetadataListBuffer; + } + + public TimeseriesMetadataV2(TimeseriesMetadataV2 timeseriesMetadata) { + this.timeSeriesMetadataType = timeseriesMetadata.timeSeriesMetadataType; + this.chunkMetaDataListDataSize = timeseriesMetadata.chunkMetaDataListDataSize; + this.measurementId = timeseriesMetadata.measurementId; + this.dataType = timeseriesMetadata.dataType; + this.modified = timeseriesMetadata.modified; + this.chunkMetadataList = new ArrayList<>(timeseriesMetadata.chunkMetadataList); + } + + public static TimeseriesMetadataV2 deserializeFrom(ByteBuffer buffer, boolean needChunkMetadata) { + TimeseriesMetadataV2 timeseriesMetaData = new TimeseriesMetadataV2(); + timeseriesMetaData.setTimeSeriesMetadataType(ReadWriteIOUtils.readByte(buffer)); + timeseriesMetaData.setMeasurementId(ReadWriteIOUtils.readVarIntString(buffer)); + timeseriesMetaData.setTSDataType(ReadWriteIOUtils.readDataType(buffer)); + int chunkMetaDataListDataSize = ReadWriteForEncodingUtils.readUnsignedVarInt(buffer); + timeseriesMetaData.setDataSizeOfChunkMetaDataList(chunkMetaDataListDataSize); + if (needChunkMetadata) { + ByteBuffer byteBuffer = buffer.slice(); + byteBuffer.limit(chunkMetaDataListDataSize); + timeseriesMetaData.chunkMetadataList = new ArrayList<>(); + while (byteBuffer.hasRemaining()) { + timeseriesMetaData.chunkMetadataList.add( + ChunkMetadata.deserializeFrom(byteBuffer, timeseriesMetaData)); + } + // minimize the storage of an ArrayList instance. 
+ timeseriesMetaData.chunkMetadataList.trimToSize(); + } + buffer.position(buffer.position() + chunkMetaDataListDataSize); + return timeseriesMetaData; + } + + /** + * serialize to outputStream. + * + * @param outputStream outputStream + * @return byte length + * @throws IOException IOException + */ + public int serializeTo(OutputStream outputStream) throws IOException { + int byteLen = 0; + byteLen += ReadWriteIOUtils.write(timeSeriesMetadataType, outputStream); + byteLen += ReadWriteIOUtils.writeVar(measurementId, outputStream); + byteLen += ReadWriteIOUtils.write(dataType, outputStream); + byteLen += + ReadWriteForEncodingUtils.writeUnsignedVarInt(chunkMetaDataListDataSize, outputStream); + chunkMetadataListBuffer.writeTo(outputStream); + byteLen += chunkMetadataListBuffer.size(); + return byteLen; + } + + public byte getTimeSeriesMetadataType() { + return timeSeriesMetadataType; + } + + public boolean isTimeColumn() { + return timeSeriesMetadataType == TsFileConstant.TIME_COLUMN_MASK; + } + + public boolean isValueColumn() { + return timeSeriesMetadataType == TsFileConstant.VALUE_COLUMN_MASK; + } + + public void setTimeSeriesMetadataType(byte timeSeriesMetadataType) { + this.timeSeriesMetadataType = timeSeriesMetadataType; + } + + public long getOffsetOfChunkMetaDataList() { + return startOffsetOfChunkMetaDataList; + } + + public void setOffsetOfChunkMetaDataList(long position) { + this.startOffsetOfChunkMetaDataList = position; + } + + public String getMeasurementId() { + return measurementId; + } + + public void setMeasurementId(String measurementId) { + this.measurementId = measurementId; + } + + public int getDataSizeOfChunkMetaDataList() { + return chunkMetaDataListDataSize; + } + + public void setDataSizeOfChunkMetaDataList(int size) { + this.chunkMetaDataListDataSize = size; + } + + public TSDataType getTSDataType() { + return dataType; + } + + public void setTSDataType(TSDataType tsDataType) { + this.dataType = tsDataType; + } + + @Override + public 
void setChunkMetadataLoader(IChunkMetadataLoader chunkMetadataLoader) { + this.chunkMetadataLoader = chunkMetadataLoader; + } + + public IChunkMetadataLoader getChunkMetadataLoader() { + return chunkMetadataLoader; + } + + @Override + public List<IChunkMetadata> loadChunkMetadataList() throws IOException { + return chunkMetadataLoader.loadChunkMetadataList(this); + } + + @Override + public List<IChunkMetadata> getChunkMetadataList() { + return chunkMetadataList; + } + + @Override + public Statistics getStatistics() { + throw new UnSupportedDataTypeException("No statistics in TimeseriesMetadataV2"); + } + + public boolean isModified() { + return modified; + } + + public void setModified(boolean modified) { + this.modified = modified; + } + + @Override + public void setRamSize(long size) { + this.ramSize = size; + } + + @Override + public long getRamSize() { + return ramSize; + } + + @Override + public void setSeq(boolean seq) { + isSeq = seq; + } + + @Override + public boolean isSeq() { + return isSeq; + } + + // For Test Only + public void setChunkMetadataListBuffer(PublicBAOS chunkMetadataListBuffer) { + this.chunkMetadataListBuffer = chunkMetadataListBuffer; + } + + // For reading version-2 only + public void setChunkMetadataList(ArrayList<ChunkMetadata> chunkMetadataList) { + this.chunkMetadataList = new ArrayList<>(chunkMetadataList); + } + + @Override + public String toString() { + return "TimeseriesMetadata{" + + "startOffsetOfChunkMetaDataList=" + + startOffsetOfChunkMetaDataList + + ", timeSeriesMetadataType=" + + timeSeriesMetadataType + + ", chunkMetaDataListDataSize=" + + chunkMetaDataListDataSize + + ", measurementId='" + + measurementId + + '\'' + + ", dataType=" + + dataType + + ", modified=" + + modified + + ", isSeq=" + + isSeq + + ", chunkMetadataList=" + + chunkMetadataList + + '}'; + } +} diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java 
b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java index 7588b84..9a24644 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java @@ -33,6 +33,7 @@ import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata; import org.apache.iotdb.tsfile.file.metadata.MetadataIndexEntry; import org.apache.iotdb.tsfile.file.metadata.MetadataIndexNode; import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata; +import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadataV2; import org.apache.iotdb.tsfile.file.metadata.TsFileMetadata; import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType; import org.apache.iotdb.tsfile.file.metadata.enums.MetadataIndexNodeType; @@ -370,6 +371,50 @@ public class TsFileSequenceReader implements AutoCloseable { return searchResult >= 0 ? timeseriesMetadataList.get(searchResult) : null; } + public TimeseriesMetadataV2 readTimeseriesMetadataV2(Path path, boolean ignoreNotExists) + throws IOException { + readFileMetadata(); + MetadataIndexNode deviceMetadataIndexNode = tsFileMetaData.getMetadataIndex(); + Pair<MetadataIndexEntry, Long> metadataIndexPair = + getMetadataAndEndOffset(deviceMetadataIndexNode, path.getDevice(), true, true); + if (metadataIndexPair == null) { + if (ignoreNotExists) { + return null; + } + throw new IOException("Device {" + path.getDevice() + "} is not in tsFileMetaData"); + } + ByteBuffer buffer = readData(metadataIndexPair.left.getOffset(), metadataIndexPair.right); + MetadataIndexNode metadataIndexNode = deviceMetadataIndexNode; + if (!metadataIndexNode.getNodeType().equals(MetadataIndexNodeType.LEAF_MEASUREMENT)) { + try { + metadataIndexNode = MetadataIndexNode.deserializeFrom(buffer); + } catch (BufferOverflowException e) { + logger.error(METADATA_INDEX_NODE_DESERIALIZE_ERROR, file); + throw e; + } + metadataIndexPair = + 
getMetadataAndEndOffset(metadataIndexNode, path.getMeasurement(), false, false); + } + if (metadataIndexPair == null) { + return null; + } + List<TimeseriesMetadataV2> timeseriesMetadataList = new ArrayList<>(); + buffer = readData(metadataIndexPair.left.getOffset(), metadataIndexPair.right); + while (buffer.hasRemaining()) { + try { + timeseriesMetadataList.add(TimeseriesMetadataV2.deserializeFrom(buffer, true)); + } catch (BufferOverflowException e) { + logger.error( + "Something error happened while deserializing TimeseriesMetadata of file {}", file); + throw e; + } + } + // return null if path does not exist in the TsFile + int searchResult = + binarySearchInTimeseriesMetadataV2List(timeseriesMetadataList, path.getMeasurement()); + return searchResult >= 0 ? timeseriesMetadataList.get(searchResult) : null; + } + /** * Find the leaf node that contains this vector, return all the needed subSensor and time column * @@ -517,6 +562,69 @@ public class TsFileSequenceReader implements AutoCloseable { return resultTimeseriesMetadataList; } + public List<TimeseriesMetadataV2> readTimeseriesMetadataV2( + String device, Set<String> measurements) throws IOException { + readFileMetadata(); + MetadataIndexNode deviceMetadataIndexNode = tsFileMetaData.getMetadataIndex(); + Pair<MetadataIndexEntry, Long> metadataIndexPair = + getMetadataAndEndOffset(deviceMetadataIndexNode, device, true, false); + if (metadataIndexPair == null) { + return Collections.emptyList(); + } + List<TimeseriesMetadataV2> resultTimeseriesMetadataList = new ArrayList<>(); + List<String> measurementList = new ArrayList<>(measurements); + Set<String> measurementsHadFound = new HashSet<>(); + for (int i = 0; i < measurementList.size(); i++) { + if (measurementsHadFound.contains(measurementList.get(i))) { + continue; + } + ByteBuffer buffer = readData(metadataIndexPair.left.getOffset(), metadataIndexPair.right); + Pair<MetadataIndexEntry, Long> measurementMetadataIndexPair = metadataIndexPair; + 
List<TimeseriesMetadataV2> timeseriesMetadataList = new ArrayList<>(); + MetadataIndexNode metadataIndexNode = deviceMetadataIndexNode; + if (!metadataIndexNode.getNodeType().equals(MetadataIndexNodeType.LEAF_MEASUREMENT)) { + try { + metadataIndexNode = MetadataIndexNode.deserializeFrom(buffer); + } catch (BufferOverflowException e) { + logger.error(METADATA_INDEX_NODE_DESERIALIZE_ERROR, file); + throw e; + } + measurementMetadataIndexPair = + getMetadataAndEndOffset(metadataIndexNode, measurementList.get(i), false, false); + } + if (measurementMetadataIndexPair == null) { + return Collections.emptyList(); + } + buffer = + readData( + measurementMetadataIndexPair.left.getOffset(), measurementMetadataIndexPair.right); + while (buffer.hasRemaining()) { + try { + timeseriesMetadataList.add(TimeseriesMetadataV2.deserializeFrom(buffer, true)); + } catch (BufferOverflowException e) { + logger.error( + "Something error happened while deserializing TimeseriesMetadata of file {}", file); + throw e; + } + } + for (int j = i; j < measurementList.size(); j++) { + String current = measurementList.get(j); + if (!measurementsHadFound.contains(current)) { + int searchResult = + binarySearchInTimeseriesMetadataV2List(timeseriesMetadataList, current); + if (searchResult >= 0) { + resultTimeseriesMetadataList.add(timeseriesMetadataList.get(searchResult)); + measurementsHadFound.add(current); + } + } + if (measurementsHadFound.size() == measurements.size()) { + return resultTimeseriesMetadataList; + } + } + } + return resultTimeseriesMetadataList; + } + protected int binarySearchInTimeseriesMetadataList( List<TimeseriesMetadata> timeseriesMetadataList, String key) { int low = 0; @@ -538,6 +646,27 @@ public class TsFileSequenceReader implements AutoCloseable { return -1; // key not found } + protected int binarySearchInTimeseriesMetadataV2List( + List<TimeseriesMetadataV2> timeseriesMetadataList, String key) { + int low = 0; + int high = timeseriesMetadataList.size() - 1; + + while 
(low <= high) { + int mid = (low + high) >>> 1; + TimeseriesMetadataV2 midVal = timeseriesMetadataList.get(mid); + int cmp = midVal.getMeasurementId().compareTo(key); + + if (cmp < 0) { + low = mid + 1; + } else if (cmp > 0) { + high = mid - 1; + } else { + return mid; // key found + } + } + return -1; // key not found + } + public List<String> getAllDevices() throws IOException { if (tsFileMetaData == null) { readFileMetadata(); @@ -1212,6 +1341,17 @@ public class TsFileSequenceReader implements AutoCloseable { return chunkMetadataList; } + public List<ChunkMetadata> getChunkMetadataListV2(Path path, boolean ignoreNotExists) + throws IOException { + TimeseriesMetadataV2 timeseriesMetaData = readTimeseriesMetadataV2(path, ignoreNotExists); + if (timeseriesMetaData == null) { + return Collections.emptyList(); + } + List<ChunkMetadata> chunkMetadataList = readChunkMetaDataListV2(timeseriesMetaData); + chunkMetadataList.sort(Comparator.comparingLong(IChunkMetadata::getStartTime)); + return chunkMetadataList; + } + public List<ChunkMetadata> getChunkMetadataList(Path path) throws IOException { return getChunkMetadataList(path, false); } @@ -1228,6 +1368,13 @@ public class TsFileSequenceReader implements AutoCloseable { .collect(Collectors.toList()); } + public List<ChunkMetadata> readChunkMetaDataListV2(TimeseriesMetadataV2 timeseriesMetaData) + throws IOException { + return timeseriesMetaData.getChunkMetadataList().stream() + .map(chunkMetadata -> (ChunkMetadata) chunkMetadata) + .collect(Collectors.toList()); + } + /** * get all measurements in this file * diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/controller/IMetadataQuerier.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/controller/IMetadataQuerier.java index 77eb09d..90a1b2a 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/controller/IMetadataQuerier.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/controller/IMetadataQuerier.java @@ -45,6 +45,8 @@ public 
interface IMetadataQuerier { */ void loadChunkMetaDatas(List<Path> paths) throws IOException; + void loadChunkMetaDatasV2(List<Path> paths) throws IOException; + /** * @return the corresponding data type. * @throws NoMeasurementException if the measurement not exists. diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/controller/MetadataQuerierByFileImpl.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/controller/MetadataQuerierByFileImpl.java index 98d28b9..1b234f9 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/controller/MetadataQuerierByFileImpl.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/controller/MetadataQuerierByFileImpl.java @@ -22,6 +22,7 @@ import org.apache.iotdb.tsfile.common.cache.LRUCache; import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata; import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata; import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata; +import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadataV2; import org.apache.iotdb.tsfile.file.metadata.TsFileMetadata; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.read.TsFileSequenceReader; @@ -154,6 +155,71 @@ public class MetadataQuerierByFileImpl implements IMetadataQuerier { } } + public void loadChunkMetaDatasV2(List<Path> paths) throws IOException { + // group measurements by device + TreeMap<String, Set<String>> deviceMeasurementsMap = new TreeMap<>(); + for (Path path : paths) { + if (!deviceMeasurementsMap.containsKey(path.getDevice())) { + deviceMeasurementsMap.put(path.getDevice(), new HashSet<>()); + } + deviceMeasurementsMap.get(path.getDevice()).add(path.getMeasurement()); + } + + Map<Path, List<ChunkMetadata>> tempChunkMetaDatas = new HashMap<>(); + + int count = 0; + boolean enough = false; + + for (Map.Entry<String, Set<String>> deviceMeasurements : deviceMeasurementsMap.entrySet()) { + if (enough) { + break; + } + String selectedDevice = 
deviceMeasurements.getKey(); + // s1, s2, s3 + Set<String> selectedMeasurements = deviceMeasurements.getValue(); + List<String> devices = this.tsFileReader.getAllDevices(); + String[] deviceNames = devices.toArray(new String[0]); + if (Arrays.binarySearch(deviceNames, selectedDevice) < 0) { + continue; + } + + List<TimeseriesMetadataV2> timeseriesMetaDataList = + tsFileReader.readTimeseriesMetadataV2(selectedDevice, selectedMeasurements); + List<ChunkMetadata> chunkMetadataList = new ArrayList<>(); + for (TimeseriesMetadataV2 timeseriesMetadata : timeseriesMetaDataList) { + chunkMetadataList.addAll(tsFileReader.readChunkMetaDataListV2(timeseriesMetadata)); + } + // d1 + for (ChunkMetadata chunkMetaData : chunkMetadataList) { + String currentMeasurement = chunkMetaData.getMeasurementUid(); + + // s1 + if (selectedMeasurements.contains(currentMeasurement)) { + + // d1.s1 + Path path = new Path(selectedDevice, currentMeasurement); + + // add into tempChunkMetaDatas + if (!tempChunkMetaDatas.containsKey(path)) { + tempChunkMetaDatas.put(path, new ArrayList<>()); + } + tempChunkMetaDatas.get(path).add(chunkMetaData); + + // check cache size, stop when reading enough + count++; + if (count == CACHED_ENTRY_NUMBER) { + enough = true; + break; + } + } + } + } + + for (Map.Entry<Path, List<ChunkMetadata>> entry : tempChunkMetaDatas.entrySet()) { + chunkMetaDataCache.put(entry.getKey(), entry.getValue()); + } + } + @Override public TSDataType getDataType(Path path) throws IOException { if (tsFileReader.getChunkMetadataList(path) == null diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/executor/TsFileExecutor.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/executor/TsFileExecutor.java index a530cb3..0cac002 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/executor/TsFileExecutor.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/executor/TsFileExecutor.java @@ -67,7 +67,8 @@ public class TsFileExecutor 
implements QueryExecutor { queryExpression.setSelectSeries(filteredSeriesPath); } - metadataQuerier.loadChunkMetaDatas(queryExpression.getSelectedSeries()); + // metadataQuerier.loadChunkMetaDatas(queryExpression.getSelectedSeries()); + metadataQuerier.loadChunkMetaDatasV2(queryExpression.getSelectedSeries()); if (queryExpression.hasQueryFilter()) { try { IExpression expression = queryExpression.getExpression(); diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/TsFileWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/TsFileWriter.java index 09b8b66..3ae214e 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/TsFileWriter.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/TsFileWriter.java @@ -350,9 +350,15 @@ public class TsFileWriter implements AutoCloseable { */ @Override public void close() throws IOException { + // LOG.info("start close file"); + // flushAllChunkGroups(); + // fileWriter.endFile(); + // } + // + // public void closeV2() throws IOException { LOG.info("start close file"); flushAllChunkGroups(); - fileWriter.endFile(); + fileWriter.endFileV2(); } /** diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java index c402640..17036ff 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java @@ -27,8 +27,10 @@ import org.apache.iotdb.tsfile.file.metadata.ChunkGroupMetadata; import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata; import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata; import org.apache.iotdb.tsfile.file.metadata.MetadataIndexConstructor; +import org.apache.iotdb.tsfile.file.metadata.MetadataIndexConstructorV2; import org.apache.iotdb.tsfile.file.metadata.MetadataIndexNode; import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata; +import 
org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadataV2; import org.apache.iotdb.tsfile.file.metadata.TsFileMetadata; import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; @@ -87,6 +89,7 @@ public class TsFileIOWriter { // for upgrade tool Map<String, List<TimeseriesMetadata>> deviceTimeseriesMetadataMap; + Map<String, List<TimeseriesMetadataV2>> deviceTimeseriesMetadataV2Map; // the two longs marks the index range of operations in current MemTable // and are serialized after MetaMarker.OPERATION_INDEX_RANGE to recover file-level range @@ -326,6 +329,86 @@ public class TsFileIOWriter { canWrite = false; } + public void endFileV2() throws IOException { + long metaOffset = out.getPosition(); + + // serialize the SEPARATOR of MetaData + ReadWriteIOUtils.write(MetaMarker.SEPARATOR, out.wrapAsStream()); + + // group ChunkMetadata by series + // only contains ordinary path and time column of vector series + Map<Path, List<IChunkMetadata>> chunkMetadataListMap = new TreeMap<>(); + + // time column -> ChunkMetadataList TreeMap of value columns in vector + Map<Path, Map<Path, List<IChunkMetadata>>> vectorToPathsMap = new HashMap<>(); + + for (ChunkGroupMetadata chunkGroupMetadata : chunkGroupMetadataList) { + List<ChunkMetadata> chunkMetadatas = chunkGroupMetadata.getChunkMetadataList(); + int idx = 0; + while (idx < chunkMetadatas.size()) { + IChunkMetadata chunkMetadata = chunkMetadatas.get(idx); + if (chunkMetadata.getMask() == 0) { + Path series = new Path(chunkGroupMetadata.getDevice(), chunkMetadata.getMeasurementUid()); + chunkMetadataListMap.computeIfAbsent(series, k -> new ArrayList<>()).add(chunkMetadata); + idx++; + } else if (chunkMetadata.isTimeColumn()) { + // time column of a vector series + Path series = new Path(chunkGroupMetadata.getDevice(), chunkMetadata.getMeasurementUid()); + chunkMetadataListMap.computeIfAbsent(series, k -> new ArrayList<>()).add(chunkMetadata); + idx++; + 
Map<Path, List<IChunkMetadata>> chunkMetadataListMapInVector = + vectorToPathsMap.computeIfAbsent(series, key -> new TreeMap<>()); + + // value columns of a vector series + while (idx < chunkMetadatas.size() && chunkMetadatas.get(idx).isValueColumn()) { + chunkMetadata = chunkMetadatas.get(idx); + Path vectorSeries = + new Path(chunkGroupMetadata.getDevice(), chunkMetadata.getMeasurementUid()); + chunkMetadataListMapInVector + .computeIfAbsent(vectorSeries, k -> new ArrayList<>()) + .add(chunkMetadata); + idx++; + } + } + } + } + + MetadataIndexNode metadataIndex = flushMetadataIndexV2(chunkMetadataListMap, vectorToPathsMap); + TsFileMetadata tsFileMetaData = new TsFileMetadata(); + tsFileMetaData.setMetadataIndex(metadataIndex); + tsFileMetaData.setMetaOffset(metaOffset); + + long footerIndex = out.getPosition(); + if (logger.isDebugEnabled()) { + logger.debug("start to flush the footer,file pos:{}", footerIndex); + } + + // write TsFileMetaData + int size = tsFileMetaData.serializeTo(out.wrapAsStream()); + if (logger.isDebugEnabled()) { + logger.debug("finish flushing the footer {}, file pos:{}", tsFileMetaData, out.getPosition()); + } + + // write bloom filter + size += tsFileMetaData.serializeBloomFilter(out.wrapAsStream(), chunkMetadataListMap.keySet()); + if (logger.isDebugEnabled()) { + logger.debug("finish flushing the bloom filter file pos:{}", out.getPosition()); + } + + // write TsFileMetaData size + ReadWriteIOUtils.write(size, out.wrapAsStream()); // write the size of the file metadata. 
+ + // write magic string + out.write(MAGIC_STRING_BYTES); + + // close file + out.close(); + if (resourceLogger.isDebugEnabled() && file != null) { + resourceLogger.debug("{} writer is closed.", file.getName()); + } + canWrite = false; + } + /** * Flush TsFileMetadata, including ChunkMetadataList and TimeseriesMetaData * @@ -351,6 +434,23 @@ public class TsFileIOWriter { return MetadataIndexConstructor.constructMetadataIndex(deviceTimeseriesMetadataMap, out); } + private MetadataIndexNode flushMetadataIndexV2( + Map<Path, List<IChunkMetadata>> chunkMetadataListMap, + Map<Path, Map<Path, List<IChunkMetadata>>> vectorToPathsMap) + throws IOException { + + // convert ChunkMetadataList to this field + deviceTimeseriesMetadataV2Map = new LinkedHashMap<>(); + // create device -> TimeseriesMetaDataList Map + for (Map.Entry<Path, List<IChunkMetadata>> entry : chunkMetadataListMap.entrySet()) { + // for ordinary path + flushOneChunkMetadataV2(entry.getKey(), entry.getValue(), vectorToPathsMap); + } + + // construct TsFileMetadata and return + return MetadataIndexConstructorV2.constructMetadataIndex(deviceTimeseriesMetadataV2Map, out); + } + /** * Flush one chunkMetadata * @@ -405,6 +505,50 @@ public class TsFileIOWriter { } } + private void flushOneChunkMetadataV2( + Path path, + List<IChunkMetadata> chunkMetadataList, + Map<Path, Map<Path, List<IChunkMetadata>>> vectorToPathsMap) + throws IOException { + // create TimeseriesMetaData + PublicBAOS publicBAOS = new PublicBAOS(); + TSDataType dataType = chunkMetadataList.get(chunkMetadataList.size() - 1).getDataType(); + + int chunkMetadataListLength = 0; + boolean serializeStatistic = (chunkMetadataList.size() > 1); + // flush chunkMetadataList one by one + for (IChunkMetadata chunkMetadata : chunkMetadataList) { + if (!chunkMetadata.getDataType().equals(dataType)) { + continue; + } + chunkMetadataListLength += chunkMetadata.serializeTo(publicBAOS, serializeStatistic); + } + + TimeseriesMetadataV2 timeseriesMetadata = + new 
TimeseriesMetadataV2( + (byte) + ((serializeStatistic ? (byte) 1 : (byte) 0) | chunkMetadataList.get(0).getMask()), + chunkMetadataListLength, + path.getMeasurement(), + dataType, + publicBAOS); + deviceTimeseriesMetadataV2Map + .computeIfAbsent(path.getDevice(), k -> new ArrayList<>()) + .add(timeseriesMetadata); + + // no VECTOR + // for (IChunkMetadata chunkMetadata : chunkMetadataList) { + // // chunkMetadata is time column of a vector series + // if (chunkMetadata.isTimeColumn()) { + // Map<Path, List<IChunkMetadata>> vectorMap = vectorToPathsMap.get(path); + // + // for (Map.Entry<Path, List<IChunkMetadata>> entry : vectorMap.entrySet()) { + // flushOneChunkMetadataV2(entry.getKey(), entry.getValue(), vectorToPathsMap); + // } + // } + // } + } + /** * get the length of normal OutputStream. *
