This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch tsFile_v4
in repository https://gitbox.apache.org/repos/asf/tsfile.git
The following commit(s) were added to refs/heads/tsFile_v4 by this push:
new 56c7ea61 Add LogicalTableSchema
56c7ea61 is described below
commit 56c7ea612ddee5aca15594f6aba61624fac54628
Author: jt2594838 <[email protected]>
AuthorDate: Sun Apr 7 14:51:30 2024 +0800
Add LogicalTableSchema
---
.../apache/tsfile/file/metadata/ChunkMetadata.java | 16 +++++
.../tsfile/file/metadata/LogicalTableSchema.java | 71 ++++++++++++++++++++++
.../apache/tsfile/file/metadata/TableSchema.java | 58 ++++++++++++++++++
.../apache/tsfile/write/writer/TsFileIOWriter.java | 26 ++++++--
4 files changed, 167 insertions(+), 4 deletions(-)
diff --git
a/tsfile/src/main/java/org/apache/tsfile/file/metadata/ChunkMetadata.java
b/tsfile/src/main/java/org/apache/tsfile/file/metadata/ChunkMetadata.java
index e491d7bb..62d6eede 100644
--- a/tsfile/src/main/java/org/apache/tsfile/file/metadata/ChunkMetadata.java
+++ b/tsfile/src/main/java/org/apache/tsfile/file/metadata/ChunkMetadata.java
@@ -20,11 +20,14 @@
package org.apache.tsfile.file.metadata;
import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.file.metadata.enums.CompressionType;
+import org.apache.tsfile.file.metadata.enums.TSEncoding;
import org.apache.tsfile.file.metadata.statistics.Statistics;
import org.apache.tsfile.read.common.TimeRange;
import org.apache.tsfile.read.controller.IChunkLoader;
import org.apache.tsfile.utils.RamUsageEstimator;
import org.apache.tsfile.utils.ReadWriteIOUtils;
+import org.apache.tsfile.write.schema.MeasurementSchema;
import java.io.IOException;
import java.io.InputStream;
@@ -55,6 +58,10 @@ public class ChunkMetadata implements IChunkMetadata {
private TSDataType tsDataType;
+ // the following two are not serialized and only used during write
+ private TSEncoding encoding;
+ private CompressionType compressionType;
+
/**
* version is used to define the order of operations(insertion, deletion,
update). version is set
* according to its belonging ChunkGroup only when being queried, so it is
not persisted.
@@ -93,6 +100,8 @@ public class ChunkMetadata implements IChunkMetadata {
public ChunkMetadata(
String measurementUid,
TSDataType tsDataType,
+ TSEncoding encoding,
+ CompressionType compressionType,
long fileOffset,
Statistics<? extends Serializable> statistics) {
this.measurementUid = measurementUid;
@@ -111,6 +120,8 @@ public class ChunkMetadata implements IChunkMetadata {
this.isSeq = other.isSeq;
this.isClosed = other.isClosed;
this.mask = other.mask;
+ this.encoding = other.encoding;
+ this.compressionType = other.compressionType;
}
@Override
@@ -384,4 +395,9 @@ public class ChunkMetadata implements IChunkMetadata {
public boolean hasNullValue(int measurementIndex) {
return false;
}
+
+  /**
+   * Builds a {@link MeasurementSchema} from this chunk's measurement id, data type, encoding and
+   * compression type.
+   *
+   * <p>NOTE(review): encoding and compressionType are described in this class as not serialized
+   * and only used during write - confirm they are populated before calling this on metadata that
+   * was read back from a file.
+   */
+  // TODO: replace the fields in this class with MeasurementSchema
+  public MeasurementSchema toMeasurementSchema() {
+    final MeasurementSchema schema =
+        new MeasurementSchema(measurementUid, tsDataType, encoding, compressionType);
+    return schema;
+  }
}
diff --git
a/tsfile/src/main/java/org/apache/tsfile/file/metadata/LogicalTableSchema.java
b/tsfile/src/main/java/org/apache/tsfile/file/metadata/LogicalTableSchema.java
new file mode 100644
index 00000000..e2ccb9fe
--- /dev/null
+++
b/tsfile/src/main/java/org/apache/tsfile/file/metadata/LogicalTableSchema.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tsfile.file.metadata;
+
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.file.metadata.enums.CompressionType;
+import org.apache.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.tsfile.write.schema.MeasurementSchema;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * TableSchema for devices with path-based DeviceIds. It generates the Id columns based on the max
+ * level of paths.
+ *
+ * <p>The generated Id columns are named {@code __level0}, {@code __level1}, ... (one per level up
+ * to the largest segment count seen in {@link #update(ChunkGroupMetadata)}) and are placed in
+ * front of the measurement columns the first time {@link #getColumnSchemas()} is called.
+ */
+public class LogicalTableSchema extends TableSchema {
+
+  // largest segmentNum() among the devices passed to update(); one Id column is generated per level
+  private int maxLevel;
+
+  public LogicalTableSchema(String tableName) {
+    super(tableName);
+  }
+
+  /**
+   * Merges the measurements of the chunk group into the column list and records the segment count
+   * of its device. Does nothing once the schema has been frozen by {@link #getColumnSchemas()}.
+   *
+   * @param chunkGroupMetadata metadata of the chunk group whose device and chunks are inspected
+   */
+  @Override
+  public void update(ChunkGroupMetadata chunkGroupMetadata) {
+    if (!updatable) {
+      // frozen: the Id columns were already generated, so maxLevel must not drift afterwards
+      return;
+    }
+    super.update(chunkGroupMetadata);
+    this.maxLevel = Math.max(this.maxLevel, chunkGroupMetadata.getDevice().segmentNum());
+  }
+
+  /** Creates one TEXT / PLAIN / UNCOMPRESSED column named {@code __level<i>} per path level. */
+  private List<MeasurementSchema> generateIdColumns() {
+    List<MeasurementSchema> generatedIdColumns = new ArrayList<>(maxLevel);
+    for (int i = 0; i < maxLevel; i++) {
+      generatedIdColumns.add(
+          new MeasurementSchema(
+              "__level" + i, TSDataType.TEXT, TSEncoding.PLAIN, CompressionType.UNCOMPRESSED));
+    }
+    return generatedIdColumns;
+  }
+
+  /**
+   * Returns the generated Id columns followed by the measurement columns. Once called, the schema
+   * is no longer updatable and later calls return the same list.
+   */
+  @Override
+  public List<MeasurementSchema> getColumnSchemas() {
+    if (!updatable) {
+      return columnSchemas;
+    }
+
+    List<MeasurementSchema> allColumns = new ArrayList<>(maxLevel + columnSchemas.size());
+    allColumns.addAll(generateIdColumns());
+    allColumns.addAll(columnSchemas);
+    columnSchemas = allColumns;
+    // Prepending the Id columns shifts every measurement column by maxLevel, so the cached
+    // name -> position entries are stale; drop them and let findColumnIndex() rebuild lazily.
+    getColumnPosIndex().clear();
+    updatable = false;
+    return allColumns;
+  }
+}
diff --git
a/tsfile/src/main/java/org/apache/tsfile/file/metadata/TableSchema.java
b/tsfile/src/main/java/org/apache/tsfile/file/metadata/TableSchema.java
index 3ad644fe..0741fd55 100644
--- a/tsfile/src/main/java/org/apache/tsfile/file/metadata/TableSchema.java
+++ b/tsfile/src/main/java/org/apache/tsfile/file/metadata/TableSchema.java
@@ -21,9 +21,67 @@ package org.apache.tsfile.file.metadata;
import org.apache.tsfile.write.schema.MeasurementSchema;
+import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
public class TableSchema {
protected String tableName;
protected List<MeasurementSchema> columnSchemas;
+  // true only for schemas created without a column list, whose columns are collected via update()
+  protected boolean updatable = false;
+
+  // columnName -> pos in columnSchemas; created lazily and holds only columns that were found
+  private Map<String, Integer> columnPosIndex;
+
+  /** Creates an empty, updatable schema whose columns are collected through {@code update}. */
+  public TableSchema(String tableName) {
+    this.tableName = tableName;
+    this.columnSchemas = new ArrayList<>();
+    this.updatable = true;
+  }
+
+  /** Creates a schema over the given column list; such a schema is not updatable. */
+  public TableSchema(String tableName, List<MeasurementSchema> columnSchemas) {
+    this.tableName = tableName;
+    this.columnSchemas = columnSchemas;
+  }
+
+  /** Returns the lazily created, mutable columnName -> position cache. */
+  public Map<String, Integer> getColumnPosIndex() {
+    if (columnPosIndex == null) {
+      columnPosIndex = new HashMap<>();
+    }
+    return columnPosIndex;
+  }
+
+  /**
+   * Looks up the position of a column by name, scanning {@code columnSchemas} on a cache miss.
+   *
+   * @param columnName measurement id of the column to look for
+   * @return the position of the column in {@code columnSchemas}, or -1 if there is no such column
+   */
+  public int findColumnIndex(String columnName) {
+    Integer position =
+        getColumnPosIndex()
+            .computeIfAbsent(
+                columnName,
+                colName -> {
+                  for (int i = 0; i < columnSchemas.size(); i++) {
+                    if (columnSchemas.get(i).getMeasurementId().equals(colName)) {
+                      return i;
+                    }
+                  }
+                  // null records no mapping: a miss is not cached, so a column that is added
+                  // later is still found by the next lookup
+                  return null;
+                });
+    return position == null ? -1 : position;
+  }
+
+  /**
+   * Appends every measurement of the chunk group that is not yet a column. Does nothing when the
+   * schema is not updatable.
+   *
+   * @param chunkGroupMetadata metadata of the chunk group whose chunks are inspected
+   */
+  public void update(ChunkGroupMetadata chunkGroupMetadata) {
+    if (!updatable) {
+      return;
+    }
+
+    for (ChunkMetadata chunkMetadata : chunkGroupMetadata.getChunkMetadataList()) {
+      String measurementUid = chunkMetadata.getMeasurementUid();
+      // if the measurement is not found in the column list, add it
+      if (findColumnIndex(measurementUid) == -1) {
+        columnSchemas.add(chunkMetadata.toMeasurementSchema());
+        getColumnPosIndex().put(measurementUid, columnSchemas.size() - 1);
+      }
+    }
+  }
+
+  /** Returns the column list of this schema. */
+  public List<MeasurementSchema> getColumnSchemas() {
+    return columnSchemas;
+  }
}
diff --git
a/tsfile/src/main/java/org/apache/tsfile/write/writer/TsFileIOWriter.java
b/tsfile/src/main/java/org/apache/tsfile/write/writer/TsFileIOWriter.java
index 23b3d5ac..08453387 100644
--- a/tsfile/src/main/java/org/apache/tsfile/write/writer/TsFileIOWriter.java
+++ b/tsfile/src/main/java/org/apache/tsfile/write/writer/TsFileIOWriter.java
@@ -193,8 +193,10 @@ public class TsFileIOWriter implements AutoCloseable {
if (currentChunkGroupDeviceId == null || chunkMetadataList.isEmpty()) {
return;
}
- chunkGroupMetadataList.add(
- new ChunkGroupMetadata(currentChunkGroupDeviceId, chunkMetadataList));
+
+ ChunkGroupMetadata chunkGroupMetadata =
+ new ChunkGroupMetadata(currentChunkGroupDeviceId, chunkMetadataList);
+ chunkGroupMetadataList.add(chunkGroupMetadata);
currentChunkGroupDeviceId = null;
chunkMetadataList = null;
out.flush();
@@ -233,7 +235,13 @@ public class TsFileIOWriter implements AutoCloseable {
throws IOException {
currentChunkMetadata =
- new ChunkMetadata(measurementId, tsDataType, out.getPosition(),
statistics);
+ new ChunkMetadata(
+ measurementId,
+ tsDataType,
+ encodingType,
+ compressionCodecName,
+ out.getPosition(),
+ statistics);
currentChunkMetadata.setMask((byte) mask);
ChunkHeader header =
@@ -255,6 +263,8 @@ public class TsFileIOWriter implements AutoCloseable {
new ChunkMetadata(
chunkHeader.getMeasurementID(),
chunkHeader.getDataType(),
+ chunkHeader.getEncodingType(),
+ chunkHeader.getCompressionType(),
out.getPosition(),
chunkMetadata.getStatistics());
chunkHeader.serializeTo(out.wrapAsStream());
@@ -277,7 +287,13 @@ public class TsFileIOWriter implements AutoCloseable {
Statistics<? extends Serializable> statistics)
throws IOException {
currentChunkMetadata =
- new ChunkMetadata(measurementId, tsDataType, out.getPosition(),
statistics);
+ new ChunkMetadata(
+ measurementId,
+ tsDataType,
+ encodingType,
+ compressionType,
+ out.getPosition(),
+ statistics);
currentChunkMetadata.setMask(TsFileConstant.VALUE_COLUMN_MASK);
ChunkHeader emptyChunkHeader =
new ChunkHeader(
@@ -298,6 +314,8 @@ public class TsFileIOWriter implements AutoCloseable {
new ChunkMetadata(
chunkHeader.getMeasurementID(),
chunkHeader.getDataType(),
+ chunkHeader.getEncodingType(),
+ chunkHeader.getCompressionType(),
out.getPosition(),
chunk.getChunkStatistic());
chunkHeader.serializeTo(out.wrapAsStream());