This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch tsFile_v4
in repository https://gitbox.apache.org/repos/asf/tsfile.git

commit b612ec0686d5c84c958e309f0bceef51b767d773
Author: jt2594838 <[email protected]>
AuthorDate: Sun Apr 7 11:58:28 2024 +0800

    add TableSchema
    push Schema in TsFileWriter down to TsFileIOWriter
---
 .../org/apache/tsfile/file/metadata/IDeviceID.java |  2 +
 .../apache/tsfile/file/metadata/TableSchema.java   | 29 +++++++++++++
 .../tsfile/file/metadata/TsFileMetadata.java       |  3 ++
 .../java/org/apache/tsfile/write/TsFileWriter.java | 48 ++++++++++++----------
 .../org/apache/tsfile/write/schema/Schema.java     | 13 +++++-
 .../apache/tsfile/write/writer/TsFileIOWriter.java | 12 ++++++
 6 files changed, 84 insertions(+), 23 deletions(-)

diff --git a/tsfile/src/main/java/org/apache/tsfile/file/metadata/IDeviceID.java b/tsfile/src/main/java/org/apache/tsfile/file/metadata/IDeviceID.java
index 04d73c19..9008cd8c 100644
--- a/tsfile/src/main/java/org/apache/tsfile/file/metadata/IDeviceID.java
+++ b/tsfile/src/main/java/org/apache/tsfile/file/metadata/IDeviceID.java
@@ -38,6 +38,8 @@ public interface IDeviceID extends Comparable<IDeviceID>, Accountable {
 
   boolean isEmpty();
 
+  String getTableName();
+
   static IDeviceID deserializeFrom(ByteBuffer byteBuffer) {
     return new PlainDeviceID(ReadWriteIOUtils.readVarIntString(byteBuffer));
   }
diff --git a/tsfile/src/main/java/org/apache/tsfile/file/metadata/TableSchema.java b/tsfile/src/main/java/org/apache/tsfile/file/metadata/TableSchema.java
new file mode 100644
index 00000000..3ad644fe
--- /dev/null
+++ b/tsfile/src/main/java/org/apache/tsfile/file/metadata/TableSchema.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tsfile.file.metadata;
+
+import org.apache.tsfile.write.schema.MeasurementSchema;
+
+import java.util.List;
+
+public class TableSchema {
+  protected String tableName;
+  protected List<MeasurementSchema> columnSchemas;
+}
diff --git a/tsfile/src/main/java/org/apache/tsfile/file/metadata/TsFileMetadata.java b/tsfile/src/main/java/org/apache/tsfile/file/metadata/TsFileMetadata.java
index 68bd6dd6..726a1148 100644
--- a/tsfile/src/main/java/org/apache/tsfile/file/metadata/TsFileMetadata.java
+++ b/tsfile/src/main/java/org/apache/tsfile/file/metadata/TsFileMetadata.java
@@ -26,6 +26,7 @@ import org.apache.tsfile.utils.ReadWriteIOUtils;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
+import java.util.Map;
 
 /** TSFileMetaData collects all metadata info and saves in its data structure. */
 public class TsFileMetadata {
@@ -35,6 +36,8 @@ public class TsFileMetadata {
 
   // List of <name, offset, childMetadataIndexType>
   private MetadataIndexNode metadataIndex;
+  private Map<String, MetadataIndexNode> tableMetadataIndexNodeMap;
+  private Map<String, TableSchema> tableSchemaMap;
 
   // offset of MetaMarker.SEPARATOR
   private long metaOffset;
diff --git a/tsfile/src/main/java/org/apache/tsfile/write/TsFileWriter.java b/tsfile/src/main/java/org/apache/tsfile/write/TsFileWriter.java
index cdbb8b32..8749f588 100644
--- a/tsfile/src/main/java/org/apache/tsfile/write/TsFileWriter.java
+++ b/tsfile/src/main/java/org/apache/tsfile/write/TsFileWriter.java
@@ -61,8 +61,6 @@ public class TsFileWriter implements AutoCloseable {
 
   protected static final TSFileConfig config = TSFileDescriptor.getInstance().getConfig();
   private static final Logger LOG = LoggerFactory.getLogger(TsFileWriter.class);
-  /** schema of this TsFile. */
-  protected final Schema schema;
   /** IO writer of this TsFile. */
   private final TsFileIOWriter fileWriter;
 
@@ -188,9 +186,9 @@ public class TsFileWriter implements AutoCloseable {
           measurementGroupMap.put(new Path(entry.getKey().getDevice()), group);
         }
       }
-      this.schema = new Schema(measurementGroupMap);
+      getSchema().setRegisteredTimeseries(measurementGroupMap);
     } else {
-      this.schema = schema;
+      fileWriter.setSchema(schema);
     }
     this.pageSize = conf.getPageSizeInByte();
     this.chunkGroupSizeThreshold = conf.getGroupSizeInByte();
@@ -206,7 +204,7 @@ public class TsFileWriter implements AutoCloseable {
 
   public void registerSchemaTemplate(
       String templateName, Map<String, MeasurementSchema> template, boolean isAligned) {
-    schema.registerSchemaTemplate(templateName, new MeasurementGroup(isAligned, template));
+    getSchema().registerSchemaTemplate(templateName, new MeasurementGroup(isAligned, template));
   }
 
   /**
@@ -218,16 +216,16 @@ public class TsFileWriter implements AutoCloseable {
    * @throws WriteProcessException
    */
   public void registerDevice(String deviceId, String templateName) throws WriteProcessException {
-    if (!schema.getSchemaTemplates().containsKey(templateName)) {
+    if (!getSchema().getSchemaTemplates().containsKey(templateName)) {
       throw new WriteProcessException("given template is not existed! " + templateName);
     }
-    if (schema.getRegisteredTimeseriesMap().containsKey(new Path(deviceId))) {
+    if (getSchema().getRegisteredTimeseriesMap().containsKey(new Path(deviceId))) {
       throw new WriteProcessException(
           "this device "
               + deviceId
               + " has been registered, you can only use registerDevice method to register empty device.");
     }
-    schema.registerDevice(deviceId, templateName);
+    getSchema().registerDevice(deviceId, templateName);
   }
 
   /**
@@ -240,8 +238,8 @@ public class TsFileWriter implements AutoCloseable {
   public void registerTimeseries(Path devicePath, MeasurementSchema measurementSchema)
       throws WriteProcessException {
     MeasurementGroup measurementGroup;
-    if (schema.containsDevice(devicePath)) {
-      measurementGroup = schema.getSeriesSchema(devicePath);
+    if (getSchema().containsDevice(devicePath)) {
+      measurementGroup = getSchema().getSeriesSchema(devicePath);
       if (measurementGroup.isAligned()) {
         throw new WriteProcessException(
             "given device " + devicePath + " has been registered for aligned timeseries.");
@@ -259,7 +257,7 @@ public class TsFileWriter implements AutoCloseable {
     measurementGroup
         .getMeasurementSchemaMap()
         .put(measurementSchema.getMeasurementId(), measurementSchema);
-    schema.registerMeasurementGroup(devicePath, measurementGroup);
+    getSchema().registerMeasurementGroup(devicePath, measurementGroup);
   }
 
   /**
@@ -288,8 +286,8 @@ public class TsFileWriter implements AutoCloseable {
    */
   public void registerAlignedTimeseries(Path devicePath, List<MeasurementSchema> measurementSchemas)
       throws WriteProcessException {
-    if (schema.containsDevice(devicePath)) {
-      if (schema.getSeriesSchema(devicePath).isAligned()) {
+    if (getSchema().containsDevice(devicePath)) {
+      if (getSchema().getSeriesSchema(devicePath).isAligned()) {
         throw new WriteProcessException(
             "given device "
                 + devicePath
@@ -306,7 +304,7 @@ public class TsFileWriter implements AutoCloseable {
               .getMeasurementSchemaMap()
               .put(measurementSchema.getMeasurementId(), measurementSchema);
         });
-    schema.registerMeasurementGroup(devicePath, measurementGroup);
+    getSchema().registerMeasurementGroup(devicePath, measurementGroup);
   }
 
   private boolean checkIsTimeseriesExist(TSRecord record, boolean isAligned)
@@ -318,10 +316,10 @@ public class TsFileWriter implements AutoCloseable {
     // initial all SeriesWriters of measurements in this TSRecord
     Path devicePath = new Path(record.deviceId);
     List<MeasurementSchema> measurementSchemas;
-    if (schema.containsDevice(devicePath)) {
+    if (getSchema().containsDevice(devicePath)) {
       measurementSchemas =
           checkIsAllMeasurementsInGroup(
-              record.dataPointList, schema.getSeriesSchema(devicePath), isAligned);
+              record.dataPointList, getSchema().getSeriesSchema(devicePath), isAligned);
       if (isAligned) {
         for (IMeasurementSchema s : measurementSchemas) {
           if (flushedMeasurementsInDeviceMap.containsKey(
@@ -338,10 +336,11 @@ public class TsFileWriter implements AutoCloseable {
         }
       }
       groupWriter.tryToAddSeriesWriter(measurementSchemas);
-    } else if (schema.getSchemaTemplates() != null && schema.getSchemaTemplates().size() == 1) {
+    } else if (getSchema().getSchemaTemplates() != null
+        && getSchema().getSchemaTemplates().size() == 1) {
       // use the default template without needing to register device
       MeasurementGroup measurementGroup =
-          schema.getSchemaTemplates().entrySet().iterator().next().getValue();
+          getSchema().getSchemaTemplates().entrySet().iterator().next().getValue();
       measurementSchemas =
           checkIsAllMeasurementsInGroup(record.dataPointList, measurementGroup, isAligned);
       groupWriter.tryToAddSeriesWriter(measurementSchemas);
@@ -358,8 +357,8 @@ public class TsFileWriter implements AutoCloseable {
 
     Path devicePath = new Path(tablet.deviceId);
     List<MeasurementSchema> schemas = tablet.getSchemas();
-    if (schema.containsDevice(devicePath)) {
-      checkIsAllMeasurementsInGroup(schema.getSeriesSchema(devicePath), schemas, isAligned);
+    if (getSchema().containsDevice(devicePath)) {
+      checkIsAllMeasurementsInGroup(getSchema().getSeriesSchema(devicePath), schemas, isAligned);
       if (isAligned) {
         for (IMeasurementSchema s : schemas) {
           if (flushedMeasurementsInDeviceMap.containsKey(
@@ -376,9 +375,10 @@ public class TsFileWriter implements AutoCloseable {
         }
       }
       groupWriter.tryToAddSeriesWriter(schemas);
-    } else if (schema.getSchemaTemplates() != null && schema.getSchemaTemplates().size() == 1) {
+    } else if (getSchema().getSchemaTemplates() != null
+        && getSchema().getSchemaTemplates().size() == 1) {
       MeasurementGroup measurementGroup =
-          schema.getSchemaTemplates().entrySet().iterator().next().getValue();
+          getSchema().getSchemaTemplates().entrySet().iterator().next().getValue();
       checkIsAllMeasurementsInGroup(measurementGroup, schemas, isAligned);
       groupWriter.tryToAddSeriesWriter(schemas);
     } else {
@@ -648,4 +648,8 @@ public class TsFileWriter implements AutoCloseable {
   public TsFileIOWriter getIOWriter() {
     return this.fileWriter;
   }
+
+  public Schema getSchema() {
+    return fileWriter.getSchema();
+  }
 }
diff --git a/tsfile/src/main/java/org/apache/tsfile/write/schema/Schema.java b/tsfile/src/main/java/org/apache/tsfile/write/schema/Schema.java
index 54ab6663..a82ab71d 100644
--- a/tsfile/src/main/java/org/apache/tsfile/write/schema/Schema.java
+++ b/tsfile/src/main/java/org/apache/tsfile/write/schema/Schema.java
@@ -18,6 +18,7 @@
  */
 package org.apache.tsfile.write.schema;
 
+import org.apache.tsfile.file.metadata.TableSchema;
 import org.apache.tsfile.read.common.Path;
 import org.apache.tsfile.utils.MeasurementGroup;
 
@@ -36,11 +37,13 @@ public class Schema implements Serializable {
    * Path (devicePath) -> measurementSchema By default, use the LinkedHashMap to store the order of
    * insertion
    */
-  private final Map<Path, MeasurementGroup> registeredTimeseries;
+  private Map<Path, MeasurementGroup> registeredTimeseries;
 
   /** template name -> (measurement -> MeasurementSchema) */
   private Map<String, MeasurementGroup> schemaTemplates;
 
+  private Map<String, TableSchema> tableSchemaMap = new HashMap<>();
+
   public Schema() {
     this.registeredTimeseries = new LinkedHashMap<>();
   }
@@ -68,6 +71,10 @@ public class Schema implements Serializable {
     this.schemaTemplates.put(templateName, measurementGroup);
   }
 
+  public void registerTableSchema(String tableName, TableSchema tableSchema) {
+    tableSchemaMap.put(tableName, tableSchema);
+  }
+
   /** If template does not exist, an nonAligned timeseries is created by default */
   public void extendTemplate(String templateName, MeasurementSchema descriptor) {
     if (schemaTemplates == null) {
@@ -103,6 +110,10 @@ public class Schema implements Serializable {
     return registeredTimeseries.containsKey(devicePath);
   }
 
+  public void setRegisteredTimeseries(Map<Path, MeasurementGroup> registeredTimeseries) {
+    this.registeredTimeseries = registeredTimeseries;
+  }
+
   // for test
   public Map<Path, MeasurementGroup> getRegisteredTimeseriesMap() {
     return registeredTimeseries;
diff --git a/tsfile/src/main/java/org/apache/tsfile/write/writer/TsFileIOWriter.java b/tsfile/src/main/java/org/apache/tsfile/write/writer/TsFileIOWriter.java
index ef9515c5..23b3d5ac 100644
--- a/tsfile/src/main/java/org/apache/tsfile/write/writer/TsFileIOWriter.java
+++ b/tsfile/src/main/java/org/apache/tsfile/write/writer/TsFileIOWriter.java
@@ -45,6 +45,7 @@ import org.apache.tsfile.utils.BytesUtils;
 import org.apache.tsfile.utils.Pair;
 import org.apache.tsfile.utils.PublicBAOS;
 import org.apache.tsfile.utils.ReadWriteIOUtils;
+import org.apache.tsfile.write.schema.Schema;
 import org.apache.tsfile.write.writer.tsmiterator.TSMIterator;
 
 import org.apache.commons.io.FileUtils;
@@ -84,6 +85,9 @@ public class TsFileIOWriter implements AutoCloseable {
     VERSION_NUMBER_BYTE = TSFileConfig.VERSION_NUMBER;
   }
 
+  /** schema of this TsFile. */
+  protected Schema schema = new Schema();
+
   protected TsFileOutput out;
   protected boolean canWrite = true;
   protected File file;
@@ -691,4 +695,12 @@ public class TsFileIOWriter implements AutoCloseable {
   public TsFileOutput getTsFileOutput() {
     return this.out;
   }
+
+  public Schema getSchema() {
+    return schema;
+  }
+
+  public void setSchema(Schema schema) {
+    this.schema = schema;
+  }
 }

Reply via email to