This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch tsFile_v4
in repository https://gitbox.apache.org/repos/asf/tsfile.git


The following commit(s) were added to refs/heads/tsFile_v4 by this push:
     new 56c7ea61 Add LogicalTableSchema
56c7ea61 is described below

commit 56c7ea612ddee5aca15594f6aba61624fac54628
Author: jt2594838 <[email protected]>
AuthorDate: Sun Apr 7 14:51:30 2024 +0800

    Add LogicalTableSchema
---
 .../apache/tsfile/file/metadata/ChunkMetadata.java | 16 +++++
 .../tsfile/file/metadata/LogicalTableSchema.java   | 71 ++++++++++++++++++++++
 .../apache/tsfile/file/metadata/TableSchema.java   | 58 ++++++++++++++++++
 .../apache/tsfile/write/writer/TsFileIOWriter.java | 26 ++++++--
 4 files changed, 167 insertions(+), 4 deletions(-)

diff --git a/tsfile/src/main/java/org/apache/tsfile/file/metadata/ChunkMetadata.java b/tsfile/src/main/java/org/apache/tsfile/file/metadata/ChunkMetadata.java
index e491d7bb..62d6eede 100644
--- a/tsfile/src/main/java/org/apache/tsfile/file/metadata/ChunkMetadata.java
+++ b/tsfile/src/main/java/org/apache/tsfile/file/metadata/ChunkMetadata.java
@@ -20,11 +20,14 @@
 package org.apache.tsfile.file.metadata;
 
 import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.file.metadata.enums.CompressionType;
+import org.apache.tsfile.file.metadata.enums.TSEncoding;
 import org.apache.tsfile.file.metadata.statistics.Statistics;
 import org.apache.tsfile.read.common.TimeRange;
 import org.apache.tsfile.read.controller.IChunkLoader;
 import org.apache.tsfile.utils.RamUsageEstimator;
 import org.apache.tsfile.utils.ReadWriteIOUtils;
+import org.apache.tsfile.write.schema.MeasurementSchema;
 
 import java.io.IOException;
 import java.io.InputStream;
@@ -55,6 +58,10 @@ public class ChunkMetadata implements IChunkMetadata {
 
   private TSDataType tsDataType;
 
+  // the following two are not serialized and only used during write
+  private TSEncoding encoding;
+  private CompressionType compressionType;
+
   /**
    * version is used to define the order of operations(insertion, deletion, update). version is set
    * according to its belonging ChunkGroup only when being queried, so it is not persisted.
@@ -93,6 +100,8 @@ public class ChunkMetadata implements IChunkMetadata {
   public ChunkMetadata(
       String measurementUid,
       TSDataType tsDataType,
+      TSEncoding encoding,
+      CompressionType compressionType,
       long fileOffset,
       Statistics<? extends Serializable> statistics) {
     this.measurementUid = measurementUid;
@@ -111,6 +120,8 @@ public class ChunkMetadata implements IChunkMetadata {
     this.isSeq = other.isSeq;
     this.isClosed = other.isClosed;
     this.mask = other.mask;
+    this.encoding = other.encoding;
+    this.compressionType = other.compressionType;
   }
 
   @Override
@@ -384,4 +395,9 @@ public class ChunkMetadata implements IChunkMetadata {
   public boolean hasNullValue(int measurementIndex) {
     return false;
   }
+
+  // TODO: replaces fields in this class with MeasurementSchema
+  public MeasurementSchema toMeasurementSchema() {
+    return new MeasurementSchema(measurementUid, tsDataType, encoding, compressionType);
+  }
 }
diff --git a/tsfile/src/main/java/org/apache/tsfile/file/metadata/LogicalTableSchema.java b/tsfile/src/main/java/org/apache/tsfile/file/metadata/LogicalTableSchema.java
new file mode 100644
index 00000000..e2ccb9fe
--- /dev/null
+++ b/tsfile/src/main/java/org/apache/tsfile/file/metadata/LogicalTableSchema.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tsfile.file.metadata;
+
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.file.metadata.enums.CompressionType;
+import org.apache.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.tsfile.write.schema.MeasurementSchema;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * TableSchema for devices with path-based DeviceIds. It generates the Id columns based on the max
+ * level of paths.
+ */
+public class LogicalTableSchema extends TableSchema {
+
+  private int maxLevel;
+
+  public LogicalTableSchema(String tableName) {
+    super(tableName);
+  }
+
+  @Override
+  public void update(ChunkGroupMetadata chunkGroupMetadata) {
+    super.update(chunkGroupMetadata);
+    this.maxLevel = Math.max(this.maxLevel, chunkGroupMetadata.getDevice().segmentNum());
+  }
+
+  private List<MeasurementSchema> generateIdColumns() {
+    List<MeasurementSchema> generatedIdColumns = new ArrayList<>();
+    for (int i = 0; i < maxLevel; i++) {
+      generatedIdColumns.add(
+          new MeasurementSchema(
+              "__level" + i, TSDataType.TEXT, TSEncoding.PLAIN, CompressionType.UNCOMPRESSED));
+    }
+    return generatedIdColumns;
+  }
+
+  /** Once called, the schema is no longer updatable. */
+  @Override
+  public List<MeasurementSchema> getColumnSchemas() {
+    if (!updatable) {
+      return columnSchemas;
+    }
+
+    List<MeasurementSchema> allColumns = new ArrayList<>(generateIdColumns());
+    allColumns.addAll(columnSchemas);
+    columnSchemas = allColumns;
+    updatable = false;
+    return allColumns;
+  }
+}
diff --git a/tsfile/src/main/java/org/apache/tsfile/file/metadata/TableSchema.java b/tsfile/src/main/java/org/apache/tsfile/file/metadata/TableSchema.java
index 3ad644fe..0741fd55 100644
--- a/tsfile/src/main/java/org/apache/tsfile/file/metadata/TableSchema.java
+++ b/tsfile/src/main/java/org/apache/tsfile/file/metadata/TableSchema.java
@@ -21,9 +21,67 @@ package org.apache.tsfile.file.metadata;
 
 import org.apache.tsfile.write.schema.MeasurementSchema;
 
+import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 public class TableSchema {
   protected String tableName;
   protected List<MeasurementSchema> columnSchemas;
+  protected boolean updatable = false;
+
+  // columnName -> pos in columnSchemas;
+  private Map<String, Integer> columnPosIndex;
+
+  public TableSchema(String tableName) {
+    this.tableName = tableName;
+    this.columnSchemas = new ArrayList<>();
+    this.updatable = true;
+  }
+
+  public TableSchema(String tableName, List<MeasurementSchema> columnSchemas) {
+    this.tableName = tableName;
+    this.columnSchemas = columnSchemas;
+  }
+
+  public Map<String, Integer> getColumnPosIndex() {
+    if (columnPosIndex == null) {
+      columnPosIndex = new HashMap<>();
+    }
+    return columnPosIndex;
+  }
+
+  public int findColumnIndex(String columnName) {
+    return getColumnPosIndex()
+        .computeIfAbsent(
+            columnName,
+            colName -> {
+              for (int i = 0; i < columnSchemas.size(); i++) {
+                if (columnSchemas.get(i).getMeasurementId().equals(columnName)) {
+                  return i;
+                }
+              }
+              return -1;
+            });
+  }
+
+  public void update(ChunkGroupMetadata chunkGroupMetadata) {
+    if (!updatable) {
+      return;
+    }
+
+    for (ChunkMetadata chunkMetadata : chunkGroupMetadata.getChunkMetadataList()) {
+      int columnIndex = findColumnIndex(chunkMetadata.getMeasurementUid());
+      // if the measurement is not found in the column list, add it
+      if (columnIndex == -1) {
+        columnSchemas.add(chunkMetadata.toMeasurementSchema());
+        getColumnPosIndex().put(chunkMetadata.getMeasurementUid(), columnSchemas.size() - 1);
+      }
+    }
+  }
+
+  public List<MeasurementSchema> getColumnSchemas() {
+    return columnSchemas;
+  }
 }
diff --git a/tsfile/src/main/java/org/apache/tsfile/write/writer/TsFileIOWriter.java b/tsfile/src/main/java/org/apache/tsfile/write/writer/TsFileIOWriter.java
index 23b3d5ac..08453387 100644
--- a/tsfile/src/main/java/org/apache/tsfile/write/writer/TsFileIOWriter.java
+++ b/tsfile/src/main/java/org/apache/tsfile/write/writer/TsFileIOWriter.java
@@ -193,8 +193,10 @@ public class TsFileIOWriter implements AutoCloseable {
     if (currentChunkGroupDeviceId == null || chunkMetadataList.isEmpty()) {
       return;
     }
-    chunkGroupMetadataList.add(
-        new ChunkGroupMetadata(currentChunkGroupDeviceId, chunkMetadataList));
+
+    ChunkGroupMetadata chunkGroupMetadata =
+        new ChunkGroupMetadata(currentChunkGroupDeviceId, chunkMetadataList);
+    chunkGroupMetadataList.add(chunkGroupMetadata);
     currentChunkGroupDeviceId = null;
     chunkMetadataList = null;
     out.flush();
@@ -233,7 +235,13 @@ public class TsFileIOWriter implements AutoCloseable {
       throws IOException {
 
     currentChunkMetadata =
-        new ChunkMetadata(measurementId, tsDataType, out.getPosition(), statistics);
+        new ChunkMetadata(
+            measurementId,
+            tsDataType,
+            encodingType,
+            compressionCodecName,
+            out.getPosition(),
+            statistics);
     currentChunkMetadata.setMask((byte) mask);
 
     ChunkHeader header =
@@ -255,6 +263,8 @@ public class TsFileIOWriter implements AutoCloseable {
         new ChunkMetadata(
             chunkHeader.getMeasurementID(),
             chunkHeader.getDataType(),
+            chunkHeader.getEncodingType(),
+            chunkHeader.getCompressionType(),
             out.getPosition(),
             chunkMetadata.getStatistics());
     chunkHeader.serializeTo(out.wrapAsStream());
@@ -277,7 +287,13 @@ public class TsFileIOWriter implements AutoCloseable {
       Statistics<? extends Serializable> statistics)
       throws IOException {
     currentChunkMetadata =
-        new ChunkMetadata(measurementId, tsDataType, out.getPosition(), statistics);
+        new ChunkMetadata(
+            measurementId,
+            tsDataType,
+            encodingType,
+            compressionType,
+            out.getPosition(),
+            statistics);
     currentChunkMetadata.setMask(TsFileConstant.VALUE_COLUMN_MASK);
     ChunkHeader emptyChunkHeader =
         new ChunkHeader(
@@ -298,6 +314,8 @@ public class TsFileIOWriter implements AutoCloseable {
         new ChunkMetadata(
             chunkHeader.getMeasurementID(),
             chunkHeader.getDataType(),
+            chunkHeader.getEncodingType(),
+            chunkHeader.getCompressionType(),
             out.getPosition(),
             chunk.getChunkStatistic());
     chunkHeader.serializeTo(out.wrapAsStream());

Reply via email to