This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 9d6b811cca0 Add some tsfile-related tools (#14766)
9d6b811cca0 is described below
commit 9d6b811cca0cc4d4c398ead281b36924a1f1fa58
Author: Jiang Tian <[email protected]>
AuthorDate: Fri Feb 7 15:34:25 2025 +0800
Add some tsfile-related tools (#14766)
* add some tsfile-related tools
* Add license
* spotless
* end the last chunk group
* use TableChunkReader to retain nulls
---
.../utils/TsFileRewriteOverPrecisedI64Scan.java | 223 +++++++++++++++++
.../utils/TsFileRewriteSmallRangeI64Scan.java | 230 ++++++++++++++++++
.../iotdb/db/tools/utils/TsFileSequenceScan.java | 34 ++-
.../iotdb/db/tools/utils/TsFileStatisticScan.java | 266 +++++++++++++++++++++
4 files changed, 745 insertions(+), 8 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/tools/utils/TsFileRewriteOverPrecisedI64Scan.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/tools/utils/TsFileRewriteOverPrecisedI64Scan.java
new file mode 100644
index 00000000000..a9357090534
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/tools/utils/TsFileRewriteOverPrecisedI64Scan.java
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.tools.utils;
+
+import org.apache.tsfile.encoding.encoder.TSEncodingBuilder;
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.file.header.ChunkHeader;
+import org.apache.tsfile.file.header.PageHeader;
+import org.apache.tsfile.file.metadata.statistics.Statistics;
+import org.apache.tsfile.read.common.BatchData;
+import org.apache.tsfile.read.common.Chunk;
+import org.apache.tsfile.read.reader.IPageReader;
+import org.apache.tsfile.read.reader.chunk.ChunkReader;
+import org.apache.tsfile.read.reader.chunk.TableChunkReader;
+import org.apache.tsfile.utils.Pair;
+import org.apache.tsfile.utils.TsPrimitiveType;
+import org.apache.tsfile.write.chunk.ChunkWriterImpl;
+import org.apache.tsfile.write.chunk.ValueChunkWriter;
+import org.apache.tsfile.write.schema.MeasurementSchema;
+import org.apache.tsfile.write.writer.TsFileIOWriter;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.List;
+
+public class TsFileRewriteOverPrecisedI64Scan extends TsFileSequenceScan {
+
+ private final File target;
+ private TsFileIOWriter writer;
+ private Chunk currTimeChunk;
+
+ public TsFileRewriteOverPrecisedI64Scan(File target) {
+ this.target = target;
+ }
+
+ public static void main(String[] args) {
+ File sourceFile = new File(args[0]);
+ File targetFile = new File(args[1]);
+ TsFileRewriteOverPrecisedI64Scan scan = new
TsFileRewriteOverPrecisedI64Scan(targetFile);
+ scan.scanTsFile(sourceFile);
+
+ long sourceLength = sourceFile.length();
+ long targetLength = targetFile.length();
+ System.out.printf(
+ "Before rewrite %d, after rewrite %d, ratio %f%n",
+ sourceLength, targetLength, sourceLength * 1.0 / targetLength);
+ }
+
+ @Override
+ protected boolean onFileOpen(File file) throws IOException {
+ boolean shouldScan = super.onFileOpen(file);
+ if (shouldScan) {
+ writer = new TsFileIOWriter(target);
+ }
+ return shouldScan;
+ }
+
+ @Override
+ protected void onFileEnd() throws IOException {
+ writer.endChunkGroup();
+ writer.endFile();
+ }
+
+ @Override
+ protected void onChunkGroup() throws IOException {
+ if (currDeviceID != null) {
+ writer.endChunkGroup();
+ }
+ super.onChunkGroup();
+ writer.startChunkGroup(currDeviceID);
+ }
+
+ @Override
+ protected void onChunk(PageVisitor pageVisitor) throws IOException {
+ reader.position(reader.position() - 1);
+ Chunk chunk = reader.readMemChunk(reader.position());
+ chunk =
+ new Chunk(
+ chunk.getHeader(),
+ chunk.getData(),
+ Collections.emptyList(),
+ Statistics.getStatsByType(chunk.getHeader().getDataType()));
+ currMeasurementID = chunk.getHeader().getMeasurementID();
+ currTimeseriesID = new Pair<>(currDeviceID, currMeasurementID);
+ if (!currDeviceAligned) {
+ onNonAlignedChunk(chunk);
+ } else {
+ onAlignedChunk(chunk);
+ }
+ System.out.println("Processed a chunk of " + currDeviceID + "." +
currMeasurementID);
+ reader.position(
+ reader.position()
+ + chunk.getHeader().getSerializedSize()
+ + chunk.getHeader().getDataSize());
+ }
+
+ private void onNonAlignedChunk(Chunk chunk) throws IOException {
+ if (chunk.getHeader().getDataType() != TSDataType.INT64) {
+ writer.writeChunk(chunk);
+ } else {
+ if (!rewriteInt64ChunkNonAligned(chunk)) {
+ writer.writeChunk(chunk);
+ }
+ }
+ }
+
+ private void onAlignedChunk(Chunk chunk) throws IOException {
+ if (isTimeChunk || chunk.getHeader().getDataType() != TSDataType.INT64) {
+ writer.writeChunk(chunk);
+ if (isTimeChunk) {
+ currTimeChunk = chunk;
+ }
+ } else {
+ if (!rewriteInt64ChunkAligned(chunk)) {
+ writer.writeChunk(chunk);
+ }
+ }
+ }
+
+ private boolean rewriteInt64ChunkNonAligned(Chunk chunk) throws IOException {
+ ChunkReader chunkReader = new ChunkReader(chunk);
+ ChunkHeader header = chunk.getHeader();
+ List<IPageReader> pageReaders = chunkReader.loadPageReaderList();
+ ChunkWriterImpl chunkWriter =
+ new ChunkWriterImpl(
+ new MeasurementSchema(
+ header.getMeasurementID(),
+ TSDataType.INT32,
+ header.getEncodingType(),
+ header.getCompressionType()));
+
+ for (IPageReader pageReader : pageReaders) {
+ BatchData batchData = pageReader.getAllSatisfiedPageData();
+ while (batchData.hasCurrent()) {
+ int intVal = (int) batchData.getLong();
+ if (intVal != batchData.getLong()) {
+ // the chunk is not over-precised
+ return false;
+ }
+ chunkWriter.write(batchData.currentTime(), (int) batchData.getLong());
+ }
+ chunkWriter.sealCurrentPage();
+ }
+ chunkWriter.writeToFileWriter(writer);
+ return true;
+ }
+
+ private boolean rewriteInt64ChunkAligned(Chunk chunk) throws IOException {
+ // use TableChunkReader so that nulls will not be skipped
+ TableChunkReader chunkReader =
+ new TableChunkReader(currTimeChunk, Collections.singletonList(chunk),
null);
+ ChunkHeader header = chunk.getHeader();
+ List<IPageReader> pageReaders = chunkReader.loadPageReaderList();
+ ValueChunkWriter valueChunkWriter =
+ new ValueChunkWriter(
+ header.getMeasurementID(),
+ header.getCompressionType(),
+ TSDataType.INT32,
+ header.getEncodingType(),
+ TSEncodingBuilder.getEncodingBuilder(header.getEncodingType())
+ .getEncoder(TSDataType.INT32));
+
+ for (IPageReader pageReader : pageReaders) {
+ BatchData batchData = pageReader.getAllSatisfiedPageData();
+ while (batchData.hasCurrent()) {
+ TsPrimitiveType[] vector = batchData.getVector();
+ boolean isNull = vector[0] == null;
+ if (!isNull) {
+ int intVal = (int) vector[0].getLong();
+ if (intVal != vector[0].getLong()) {
+ // the chunk is not over-precised
+ return false;
+ }
+ valueChunkWriter.write(batchData.currentTime(), intVal, false);
+ } else {
+ valueChunkWriter.write(batchData.currentTime(), 0, true);
+ }
+ }
+ valueChunkWriter.sealCurrentPage();
+ }
+ valueChunkWriter.writeToFileWriter(writer);
+ return true;
+ }
+
+ @Override
+ protected void onTimePage(PageHeader pageHeader, ByteBuffer pageData,
ChunkHeader chunkHeader) {
+ // do nothing
+ }
+
+ @Override
+ protected void onValuePage(PageHeader pageHeader, ByteBuffer pageData,
ChunkHeader chunkHeader) {
+ // do nothing
+ }
+
+ @Override
+ protected void onNonAlignedPage(
+ PageHeader pageHeader, ByteBuffer pageData, ChunkHeader chunkHeader) {
+ // do nothing
+ }
+
+ @Override
+ protected void onException(Throwable t) {
+ throw new RuntimeException(t);
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/tools/utils/TsFileRewriteSmallRangeI64Scan.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/tools/utils/TsFileRewriteSmallRangeI64Scan.java
new file mode 100644
index 00000000000..360fc1c0576
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/tools/utils/TsFileRewriteSmallRangeI64Scan.java
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.tools.utils;
+
+import org.apache.tsfile.encoding.encoder.TSEncodingBuilder;
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.file.header.ChunkHeader;
+import org.apache.tsfile.file.header.PageHeader;
+import org.apache.tsfile.file.metadata.statistics.Statistics;
+import org.apache.tsfile.read.common.BatchData;
+import org.apache.tsfile.read.common.Chunk;
+import org.apache.tsfile.read.reader.IPageReader;
+import org.apache.tsfile.read.reader.chunk.ChunkReader;
+import org.apache.tsfile.read.reader.chunk.TableChunkReader;
+import org.apache.tsfile.utils.Pair;
+import org.apache.tsfile.utils.TsPrimitiveType;
+import org.apache.tsfile.write.chunk.ChunkWriterImpl;
+import org.apache.tsfile.write.chunk.ValueChunkWriter;
+import org.apache.tsfile.write.schema.MeasurementSchema;
+import org.apache.tsfile.write.writer.TsFileIOWriter;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Copies a TsFile to {@code target} chunk by chunk, re-encoding every INT64 chunk whose values,
+ * taken relative to the first non-null value of the chunk, all fit in an {@code int} as an INT32
+ * chunk of those differences. Time chunks, chunks of other data types, and INT64 chunks with a
+ * difference outside the {@code int} range are copied unchanged.
+ *
+ * <p>NOTE(review): the base value of a rewritten chunk is not written anywhere by this class, so
+ * the output looks intended for size comparison rather than as a lossless copy - confirm.
+ */
+public class TsFileRewriteSmallRangeI64Scan extends TsFileSequenceScan {
+
+  private final File target;
+  private TsFileIOWriter writer;
+  // the time chunk most recently copied for the current aligned device; value chunks that
+  // follow it are decoded against it
+  private Chunk currTimeChunk;
+
+  /**
+   * @param target the file the rewritten TsFile is written to
+   */
+  public TsFileRewriteSmallRangeI64Scan(File target) {
+    this.target = target;
+  }
+
+  /**
+   * Rewrites {@code args[0]} into {@code args[1]} and prints both file sizes and their ratio.
+   *
+   * @param args source TsFile path followed by target TsFile path
+   */
+  public static void main(String[] args) throws IOException {
+    if (args.length < 2) {
+      System.err.println("Usage: TsFileRewriteSmallRangeI64Scan <sourceTsFile> <targetTsFile>");
+      return;
+    }
+    File sourceFile = new File(args[0]);
+    File targetFile = new File(args[1]);
+    TsFileRewriteSmallRangeI64Scan scan = new TsFileRewriteSmallRangeI64Scan(targetFile);
+    scan.scanTsFile(sourceFile);
+
+    long sourceLength = sourceFile.length();
+    long targetLength = targetFile.length();
+    System.out.printf(
+        "Before rewrite %d, after rewrite %d, ratio %f%n",
+        sourceLength, targetLength, sourceLength * 1.0 / targetLength);
+  }
+
+  /** Opens the target writer once the base class has agreed to scan {@code file}. */
+  @Override
+  protected boolean onFileOpen(File file) throws IOException {
+    boolean shouldScan = super.onFileOpen(file);
+    if (shouldScan) {
+      writer = new TsFileIOWriter(target);
+    }
+    return shouldScan;
+  }
+
+  /** Ends the last chunk group, which no following chunk group header closes, and the file. */
+  @Override
+  protected void onFileEnd() throws IOException {
+    writer.endChunkGroup();
+    writer.endFile();
+  }
+
+  /** Closes the previous chunk group, if any, and starts one for the new device. */
+  @Override
+  protected void onChunkGroup() throws IOException {
+    // a non-null device means an earlier chunk group is still open in the writer
+    if (currDeviceID != null) {
+      writer.endChunkGroup();
+    }
+    super.onChunkGroup();
+    writer.startChunkGroup(currDeviceID);
+  }
+
+  /**
+   * Handles a whole chunk at once instead of visiting its pages: the chunk is loaded into memory
+   * and then either copied or rewritten, so {@code pageVisitor} is not used.
+   */
+  @Override
+  protected void onChunk(PageVisitor pageVisitor) throws IOException {
+    // step back one byte so the chunk is read from its start
+    // (presumably the marker byte was already consumed by the base scan - TODO confirm)
+    reader.position(reader.position() - 1);
+    Chunk chunk = reader.readMemChunk(reader.position());
+    // re-wrap the chunk with an empty deletion list and fresh statistics of its own type
+    chunk =
+        new Chunk(
+            chunk.getHeader(),
+            chunk.getData(),
+            Collections.emptyList(),
+            Statistics.getStatsByType(chunk.getHeader().getDataType()));
+    currMeasurementID = chunk.getHeader().getMeasurementID();
+    currTimeseriesID = new Pair<>(currDeviceID, currMeasurementID);
+    if (!currDeviceAligned) {
+      onNonAlignedChunk(chunk);
+    } else {
+      onAlignedChunk(chunk);
+    }
+    System.out.println("Processed a chunk of " + currDeviceID + "." + currMeasurementID);
+    // move the reader past the chunk that was just handled
+    reader.position(
+        reader.position()
+            + chunk.getHeader().getSerializedSize()
+            + chunk.getHeader().getDataSize());
+  }
+
+  /** Copies a non-aligned chunk, rewriting it as INT32 differences first when possible. */
+  private void onNonAlignedChunk(Chunk chunk) throws IOException {
+    boolean rewritten =
+        chunk.getHeader().getDataType() == TSDataType.INT64 && rewriteInt64ChunkNonAligned(chunk);
+    if (!rewritten) {
+      writer.writeChunk(chunk);
+    }
+  }
+
+  /**
+   * Copies a chunk of an aligned device. Time chunks are copied and remembered; INT64 value
+   * chunks are rewritten as INT32 differences when possible; everything else is copied unchanged.
+   */
+  private void onAlignedChunk(Chunk chunk) throws IOException {
+    if (isTimeChunk) {
+      writer.writeChunk(chunk);
+      currTimeChunk = chunk;
+      return;
+    }
+    boolean rewritten =
+        chunk.getHeader().getDataType() == TSDataType.INT64 && rewriteInt64ChunkAligned(chunk);
+    if (!rewritten) {
+      writer.writeChunk(chunk);
+    }
+  }
+
+  /**
+   * Tells whether {@code value - base} can be stored in an {@code int} without loss, taking
+   * both the lower bound and overflow of the {@code long} subtraction into account.
+   */
+  private static boolean differenceFitsInInt(long value, long base) {
+    long diff = value - base;
+    // same test as Math.subtractExact: the subtraction overflowed iff the operands have
+    // different signs and the result's sign differs from the minuend's
+    boolean overflowed = ((value ^ base) & (value ^ diff)) < 0;
+    return !overflowed && diff >= Integer.MIN_VALUE && diff <= Integer.MAX_VALUE;
+  }
+
+  /**
+   * Tries to write a non-aligned INT64 chunk as an INT32 chunk holding each value's difference
+   * from the chunk's first value.
+   *
+   * @return {@code true} if the INT32 chunk was written; {@code false} if some difference does
+   *     not fit in an {@code int}, in which case nothing has been written to the target
+   */
+  private boolean rewriteInt64ChunkNonAligned(Chunk chunk) throws IOException {
+    ChunkReader chunkReader = new ChunkReader(chunk);
+    ChunkHeader header = chunk.getHeader();
+    List<IPageReader> pageReaders = chunkReader.loadPageReaderList();
+    ChunkWriterImpl chunkWriter =
+        new ChunkWriterImpl(
+            new MeasurementSchema(
+                header.getMeasurementID(),
+                TSDataType.INT32,
+                header.getEncodingType(),
+                header.getCompressionType()));
+
+    boolean baseKnown = false;
+    long base = 0;
+    for (IPageReader pageReader : pageReaders) {
+      BatchData batchData = pageReader.getAllSatisfiedPageData();
+      while (batchData.hasCurrent()) {
+        long value = batchData.getLong();
+        if (!baseKnown) {
+          base = value;
+          baseKnown = true;
+        }
+        if (!differenceFitsInInt(value, base)) {
+          // the chunk's range is too large
+          return false;
+        }
+        chunkWriter.write(batchData.currentTime(), (int) (value - base));
+        batchData.next();
+      }
+      // keep the page boundaries of the source chunk
+      chunkWriter.sealCurrentPage();
+    }
+    chunkWriter.writeToFileWriter(writer);
+    return true;
+  }
+
+  /**
+   * Tries to write an aligned INT64 value chunk as an INT32 value chunk holding each non-null
+   * value's difference from the chunk's first non-null value. Rows are read against
+   * {@link #currTimeChunk}.
+   *
+   * @return {@code true} if the INT32 chunk was written; {@code false} if some difference does
+   *     not fit in an {@code int}, in which case nothing has been written to the target
+   */
+  private boolean rewriteInt64ChunkAligned(Chunk chunk) throws IOException {
+    // use TableChunkReader so that nulls will not be skipped
+    TableChunkReader chunkReader =
+        new TableChunkReader(currTimeChunk, Collections.singletonList(chunk), null);
+    ChunkHeader header = chunk.getHeader();
+    List<IPageReader> pageReaders = chunkReader.loadPageReaderList();
+    ValueChunkWriter valueChunkWriter =
+        new ValueChunkWriter(
+            header.getMeasurementID(),
+            header.getCompressionType(),
+            TSDataType.INT32,
+            header.getEncodingType(),
+            TSEncodingBuilder.getEncodingBuilder(header.getEncodingType())
+                .getEncoder(TSDataType.INT32));
+
+    boolean baseKnown = false;
+    long base = 0;
+    for (IPageReader pageReader : pageReaders) {
+      BatchData batchData = pageReader.getAllSatisfiedPageData();
+      while (batchData.hasCurrent()) {
+        // only one value column was handed to the reader, so it is at index 0
+        TsPrimitiveType cell = batchData.getVector()[0];
+        if (cell == null) {
+          valueChunkWriter.write(batchData.currentTime(), 0, true);
+        } else {
+          long value = cell.getLong();
+          if (!baseKnown) {
+            base = value;
+            baseKnown = true;
+          }
+          if (!differenceFitsInInt(value, base)) {
+            // the chunk's range is too large
+            return false;
+          }
+          valueChunkWriter.write(batchData.currentTime(), (int) (value - base), false);
+        }
+        // advance the cursor; without this the loop never terminates
+        batchData.next();
+      }
+      // keep the page boundaries of the source chunk
+      valueChunkWriter.sealCurrentPage();
+    }
+    valueChunkWriter.writeToFileWriter(writer);
+    return true;
+  }
+
+  @Override
+  protected void onTimePage(PageHeader pageHeader, ByteBuffer pageData, ChunkHeader chunkHeader) {
+    // do nothing: chunks are handled as a whole in onChunk
+  }
+
+  @Override
+  protected void onValuePage(PageHeader pageHeader, ByteBuffer pageData, ChunkHeader chunkHeader) {
+    // do nothing: chunks are handled as a whole in onChunk
+  }
+
+  @Override
+  protected void onNonAlignedPage(
+      PageHeader pageHeader, ByteBuffer pageData, ChunkHeader chunkHeader) {
+    // do nothing: chunks are handled as a whole in onChunk
+  }
+
+  /** Rethrows any failure, keeping it as the cause, instead of continuing the scan. */
+  @Override
+  protected void onException(Throwable t) {
+    throw new RuntimeException(t);
+  }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/tools/utils/TsFileSequenceScan.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/tools/utils/TsFileSequenceScan.java
index d9ea636948f..eddb7a0e80c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/tools/utils/TsFileSequenceScan.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/tools/utils/TsFileSequenceScan.java
@@ -44,6 +44,9 @@ public abstract class TsFileSequenceScan {
protected String currMeasurementID;
protected Pair<IDeviceID, String> currTimeseriesID;
protected boolean currChunkOnePage;
+ protected ChunkHeader currChunkHeader;
+ protected boolean currDeviceAligned;
+ protected boolean isTimeChunk;
public TsFileSequenceScan() {}
@@ -60,23 +63,26 @@ public abstract class TsFileSequenceScan {
protected void onFileEnd() throws IOException {}
protected void onChunk(PageVisitor pageVisitor) throws IOException {
- ChunkHeader chunkHeader = reader.readChunkHeader(marker);
- if (chunkHeader.getDataSize() == 0) {
+ currChunkHeader = reader.readChunkHeader(marker);
+ if (currChunkHeader.getDataSize() == 0) {
// empty value chunk
return;
}
- currMeasurementID = chunkHeader.getMeasurementID();
+ currMeasurementID = currChunkHeader.getMeasurementID();
currTimeseriesID = new Pair<>(currDeviceID, currMeasurementID);
- int dataSize = chunkHeader.getDataSize();
+ int dataSize = currChunkHeader.getDataSize();
+ onChunkData(pageVisitor, dataSize);
+ }
+ protected void onChunkData(PageVisitor pageVisitor, int dataSize) throws
IOException {
while (dataSize > 0) {
PageHeader pageHeader =
reader.readPageHeader(
- chunkHeader.getDataType(),
- (chunkHeader.getChunkType() & 0x3F) == MetaMarker.CHUNK_HEADER);
- ByteBuffer pageData = reader.readPage(pageHeader,
chunkHeader.getCompressionType());
- pageVisitor.onPage(pageHeader, pageData, chunkHeader);
+ currChunkHeader.getDataType(),
+ (currChunkHeader.getChunkType() & 0x3F) ==
MetaMarker.CHUNK_HEADER);
+ ByteBuffer pageData = reader.readPage(pageHeader,
currChunkHeader.getCompressionType());
+ pageVisitor.onPage(pageHeader, pageData, currChunkHeader);
dataSize -= pageHeader.getSerializedPageSize();
}
}
@@ -123,26 +129,38 @@ public abstract class TsFileSequenceScan {
switch (marker) {
case MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER:
currChunkOnePage = true;
+ currDeviceAligned = false;
+ isTimeChunk = false;
onChunk(this::onNonAlignedPage);
break;
case MetaMarker.CHUNK_HEADER:
currChunkOnePage = false;
+ currDeviceAligned = false;
+ isTimeChunk = false;
onChunk(this::onNonAlignedPage);
break;
case MetaMarker.ONLY_ONE_PAGE_TIME_CHUNK_HEADER:
currChunkOnePage = true;
+ currDeviceAligned = true;
+ isTimeChunk = true;
onChunk(this::onTimePage);
break;
case MetaMarker.TIME_CHUNK_HEADER:
currChunkOnePage = false;
+ currDeviceAligned = true;
+ isTimeChunk = true;
onChunk(this::onTimePage);
break;
case MetaMarker.ONLY_ONE_PAGE_VALUE_CHUNK_HEADER:
currChunkOnePage = true;
+ currDeviceAligned = true;
+ isTimeChunk = false;
onChunk(this::onValuePage);
break;
case MetaMarker.VALUE_CHUNK_HEADER:
currChunkOnePage = false;
+ currDeviceAligned = true;
+ isTimeChunk = false;
onChunk(this::onValuePage);
break;
case MetaMarker.CHUNK_GROUP_HEADER:
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/tools/utils/TsFileStatisticScan.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/tools/utils/TsFileStatisticScan.java
new file mode 100644
index 00000000000..094f12dcc84
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/tools/utils/TsFileStatisticScan.java
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.tools.utils;
+
+import org.apache.tsfile.common.conf.TSFileDescriptor;
+import org.apache.tsfile.encoding.decoder.Decoder;
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.file.header.ChunkHeader;
+import org.apache.tsfile.file.header.PageHeader;
+import org.apache.tsfile.file.metadata.IDeviceID;
+import org.apache.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.tsfile.file.metadata.statistics.Statistics;
+import org.apache.tsfile.read.common.BatchData;
+import org.apache.tsfile.read.reader.page.AlignedPageReader;
+import org.apache.tsfile.read.reader.page.PageReader;
+import org.apache.tsfile.utils.Binary;
+import org.apache.tsfile.utils.Pair;
+import org.apache.tsfile.utils.TsPrimitiveType;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.EnumMap;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class TsFileStatisticScan extends TsFileSequenceScan {
+
+ // (deviceId, measurementId) -> data type
+ private final Map<Pair<IDeviceID, String>, TSDataType> seriesDataTypeMap =
new HashMap<>();
+ private final Map<TSDataType, Long> dataTypeSizeMap = new
EnumMap<>(TSDataType.class);
+ private final Map<TSDataType, Long> dataTypePointMap = new
EnumMap<>(TSDataType.class);
+ private final Map<TSDataType, Long> dataTypeChunkMap = new
EnumMap<>(TSDataType.class);
+ private final List<Integer> distinctBinaryValueNumInChunks = new
LinkedList<>();
+ // the number of Int64 chunks that can be represented by Int32
+ private int overPrecisedInt64ChunkNum;
+ // the number of Int64 chunks that cannot be represented by Int32
+ private int justPrecisedInt64ChunkNum;
+ // the number of Int64 chunks whose range can be represented by Int32
+ private int smallRangeInt64ChunkNum;
+ // the number of Int64 chunks whose range cannot be represented by Int32
+ private int largeRangeInt64ChunkNum;
+ private boolean currChunkJustPrecised;
+ private boolean currChunkLargeRange;
+ private Set<Binary> distinctBinarySet = new HashSet<>();
+ private PageHeader currTimePageHeader;
+ private ByteBuffer currTimePageBuffer;
+
+ public static void main(String[] args) {
+ TsFileStatisticScan t = new TsFileStatisticScan();
+ t.scanTsFile(new File(args[0]));
+ }
+
+ @Override
+ protected void onFileEnd() throws IOException {
+ super.onFileEnd();
+ Map<TSDataType, Integer> seriesTypeCountMap = new
EnumMap<>(TSDataType.class);
+ for (TSDataType type : seriesDataTypeMap.values()) {
+ seriesTypeCountMap.compute(type, (t, v) -> v == null ? 1 : v + 1);
+ }
+ System.out.println("Series data type count: " + seriesTypeCountMap);
+ System.out.println(
+ "Int64 series statistics: overPrecised "
+ + overPrecisedInt64ChunkNum
+ + ", justPrecised "
+ + justPrecisedInt64ChunkNum
+ + ", smallRange "
+ + smallRangeInt64ChunkNum
+ + ", largeRange "
+ + largeRangeInt64ChunkNum);
+ System.out.println("data type -> size: " + dataTypeSizeMap);
+ System.out.println("data type -> point count: " + dataTypePointMap);
+ System.out.println("data type -> chunk count: " + dataTypeChunkMap);
+ System.out.println(
+ "average distinct binary value num: "
+ + distinctBinaryValueNumInChunks.stream().mapToInt(i ->
i).average().orElse(0.0));
+ }
+
+ @Override
+ protected void onChunk(PageVisitor pageVisitor) throws IOException {
+ currChunkJustPrecised = false;
+ currChunkLargeRange = false;
+
+ super.onChunk(pageVisitor);
+ if (!isTimeChunk) {
+ seriesDataTypeMap.computeIfAbsent(currTimeseriesID, cid ->
currChunkHeader.getDataType());
+ dataTypeSizeMap.compute(
+ currChunkHeader.getDataType(),
+ (type, size) ->
+ size == null
+ ? currChunkHeader.getSerializedSize() +
currChunkHeader.getDataSize()
+ : size + currChunkHeader.getSerializedSize() +
currChunkHeader.getDataSize());
+ dataTypeChunkMap.compute(
+ currChunkHeader.getDataType(), (type, size) -> size == null ? 1 :
size + 1);
+ }
+ if (currChunkHeader.getDataType() == TSDataType.INT64) {
+ if (currChunkJustPrecised) {
+ justPrecisedInt64ChunkNum++;
+ } else {
+ overPrecisedInt64ChunkNum++;
+ }
+
+ if (currChunkLargeRange) {
+ largeRangeInt64ChunkNum++;
+ } else {
+ smallRangeInt64ChunkNum++;
+ }
+ } else if (currChunkHeader.getDataType() == TSDataType.TEXT
+ || currChunkHeader.getDataType() == TSDataType.STRING) {
+ distinctBinaryValueNumInChunks.add(distinctBinarySet.size());
+ distinctBinarySet.clear();
+ }
+ }
+
+ @Override
+ protected void onTimePage(PageHeader pageHeader, ByteBuffer pageData,
ChunkHeader chunkHeader)
+ throws IOException {
+ currTimePageHeader = pageHeader;
+ currTimePageBuffer = pageData;
+ }
+
+ @Override
+ protected void onValuePage(PageHeader pageHeader, ByteBuffer pageData,
ChunkHeader chunkHeader)
+ throws IOException {
+ TSDataType dataType = chunkHeader.getDataType();
+ if (dataType == TSDataType.INT64) {
+ Statistics<Long> statistics = (Statistics<Long>)
pageHeader.getStatistics();
+ Long minValue = statistics.getMinValue();
+ Long maxValue = statistics.getMaxValue();
+ if (minValue < Integer.MIN_VALUE || maxValue > Integer.MAX_VALUE) {
+ currChunkJustPrecised = true;
+ }
+ if (maxValue - minValue > Integer.MAX_VALUE) {
+ currChunkLargeRange = true;
+ }
+ } else if (dataType == TSDataType.TEXT || dataType == TSDataType.STRING) {
+ AlignedPageReader pageReader =
+ new AlignedPageReader(
+ currTimePageHeader,
+ currTimePageBuffer,
+ Decoder.getDecoderByType(
+
TSEncoding.valueOf(TSFileDescriptor.getInstance().getConfig().getTimeEncoder()),
+ TSDataType.INT64),
+ Collections.singletonList(pageHeader),
+ Collections.singletonList(pageData),
+ Collections.singletonList(dataType),
+ Collections.singletonList(
+ Decoder.getDecoderByType(chunkHeader.getEncodingType(),
dataType)),
+ null);
+ BatchData batchData = pageReader.getAllSatisfiedPageData();
+ while (batchData.hasCurrent()) {
+ TsPrimitiveType[] vector = batchData.getVector();
+ if (vector[0] != null) {
+ distinctBinarySet.add(vector[0].getBinary());
+ }
+ batchData.next();
+ }
+ }
+ dataTypePointMap.compute(
+ currChunkHeader.getDataType(),
+ (type, size) ->
+ size == null
+ ? pageHeader.getStatistics().getCount()
+ : size + pageHeader.getStatistics().getCount());
+ }
+
+ @Override
+ protected void onNonAlignedPage(
+ PageHeader pageHeader, ByteBuffer pageData, ChunkHeader chunkHeader)
throws IOException {
+ TSDataType dataType = chunkHeader.getDataType();
+ if (pageHeader.getStatistics() != null) {
+ if (dataType == TSDataType.INT64) {
+ Statistics<Long> statistics = (Statistics<Long>)
pageHeader.getStatistics();
+ Long minValue = statistics.getMinValue();
+ Long maxValue = statistics.getMaxValue();
+ if (minValue < Integer.MIN_VALUE || maxValue > Integer.MAX_VALUE) {
+ currChunkJustPrecised = true;
+ }
+ if (maxValue - minValue > Integer.MAX_VALUE) {
+ currChunkLargeRange = true;
+ }
+ } else if (dataType == TSDataType.TEXT || dataType == TSDataType.STRING)
{
+ PageReader pageReader =
+ new PageReader(
+ pageHeader,
+ pageData,
+ chunkHeader.getDataType(),
+ Decoder.getDecoderByType(chunkHeader.getEncodingType(),
chunkHeader.getDataType()),
+ Decoder.getDecoderByType(
+
TSEncoding.valueOf(TSFileDescriptor.getInstance().getConfig().getTimeEncoder()),
+ TSDataType.INT64));
+ BatchData allSatisfiedPageData =
pageReader.getAllSatisfiedPageData(true);
+ while (allSatisfiedPageData.hasCurrent()) {
+ distinctBinarySet.add(allSatisfiedPageData.getBinary());
+ allSatisfiedPageData.next();
+ }
+ }
+ dataTypePointMap.compute(
+ currChunkHeader.getDataType(),
+ (type, size) ->
+ size == null
+ ? pageHeader.getStatistics().getCount()
+ : size + pageHeader.getStatistics().getCount());
+ } else {
+ PageReader pageReader =
+ new PageReader(
+ pageHeader,
+ pageData,
+ chunkHeader.getDataType(),
+ Decoder.getDecoderByType(chunkHeader.getEncodingType(),
chunkHeader.getDataType()),
+ Decoder.getDecoderByType(
+
TSEncoding.valueOf(TSFileDescriptor.getInstance().getConfig().getTimeEncoder()),
+ TSDataType.INT64));
+ BatchData allSatisfiedPageData =
pageReader.getAllSatisfiedPageData(true);
+ long minLongValue = Long.MAX_VALUE;
+ long maxLongValue = Long.MIN_VALUE;
+ int cnt = 0;
+ while (allSatisfiedPageData.hasCurrent()) {
+ if (dataType == TSDataType.INT64) {
+ long val = (long) allSatisfiedPageData.currentValue();
+ if (val < Integer.MIN_VALUE || val > Integer.MAX_VALUE) {
+ currChunkJustPrecised = true;
+ }
+ minLongValue = Math.min(minLongValue, val);
+ maxLongValue = Math.max(maxLongValue, val);
+ } else if (dataType == TSDataType.TEXT || dataType ==
TSDataType.STRING) {
+ distinctBinarySet.add(allSatisfiedPageData.getBinary());
+ }
+ cnt++;
+ allSatisfiedPageData.next();
+ }
+ int finalCnt = cnt;
+ dataTypePointMap.compute(
+ currChunkHeader.getDataType(), (type, size) -> size == null ?
finalCnt : size + finalCnt);
+ if (maxLongValue - minLongValue > Integer.MAX_VALUE) {
+ currChunkLargeRange = true;
+ }
+ }
+ }
+
+ @Override
+ protected void onException(Throwable t) {
+ t.printStackTrace();
+ }
+}