This is an automated email from the ASF dual-hosted git repository.

zyk pushed a commit to branch fast_write_test_0423
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/fast_write_test_0423 by this 
push:
     new 2501b0ba9c add schema fill
2501b0ba9c is described below

commit 2501b0ba9c3dc68f8e388431f3ef4efe9e948b31
Author: MarcosZyk <[email protected]>
AuthorDate: Mon Apr 24 10:55:31 2023 +0800

    add schema fill
---
 .../iotdb/db/metadata/template/Template.java       | 22 ++++++
 .../plan/analyze/schema/ClusterSchemaFetcher.java  | 79 ++++++++++++++++++++++
 .../plan/analyze/schema/ISchemaComputation.java    |  2 +
 .../db/mpp/plan/analyze/schema/ISchemaFetcher.java |  6 ++
 .../mpp/plan/analyze/schema/SchemaValidator.java   | 18 ++++-
 .../planner/plan/node/write/FastInsertRowNode.java | 25 +++++++
 6 files changed, 149 insertions(+), 3 deletions(-)

diff --git 
a/server/src/main/java/org/apache/iotdb/db/metadata/template/Template.java 
b/server/src/main/java/org/apache/iotdb/db/metadata/template/Template.java
index 9cf66897e0..3810d68b61 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/template/Template.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/template/Template.java
@@ -34,6 +34,8 @@ import java.io.IOException;
 import java.io.OutputStream;
 import java.io.Serializable;
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
@@ -45,6 +47,8 @@ public class Template implements Serializable {
   private boolean isDirectAligned;
   private Map<String, IMeasurementSchema> schemaMap;
 
+  private List<String> measurementsInorder;
+
   private transient int rehashCode;
 
   public Template() {
@@ -72,11 +76,13 @@ public class Template implements Serializable {
     this.isDirectAligned = isAligned;
     this.schemaMap = new ConcurrentHashMap<>();
     this.name = name;
+    this.measurementsInorder = Collections.synchronizedList(new ArrayList<>());
     for (int i = 0; i < measurements.size(); i++) {
       IMeasurementSchema schema =
           new MeasurementSchema(
               measurements.get(i), dataTypes.get(i), encodings.get(i), 
compressors.get(i));
       schemaMap.put(schema.getMeasurementId(), schema);
+      measurementsInorder.add(measurements.get(i));
     }
   }
 
@@ -144,6 +150,7 @@ public class Template implements Serializable {
       IMeasurementSchema schema =
           constructSchema(measurements[i], dataTypes[i], encodings[i], 
compressors[i]);
       schemaMap.put(measurements[i], schema);
+      measurementsInorder.add(measurements[i]);
     }
   }
 
@@ -153,6 +160,11 @@ public class Template implements Serializable {
       TSEncoding encoding,
       CompressionType compressionType) {
     schemaMap.put(measurement, constructSchema(measurement, dataType, 
encoding, compressionType));
+    measurementsInorder.add(measurement);
+  }
+
+  public List<String> getMeasurementsInorder() {
+    return measurementsInorder;
   }
 
   // endregion
@@ -166,6 +178,9 @@ public class Template implements Serializable {
       ReadWriteIOUtils.write(entry.getKey(), buffer);
       entry.getValue().partialSerializeTo(buffer);
     }
+    for (String measurement : measurementsInorder) {
+      ReadWriteIOUtils.write(measurement, buffer);
+    }
   }
 
   public void serialize(OutputStream outputStream) throws IOException {
@@ -177,6 +192,9 @@ public class Template implements Serializable {
       ReadWriteIOUtils.write(entry.getKey(), outputStream);
       entry.getValue().partialSerializeTo(outputStream);
     }
+    for (String measurement : measurementsInorder) {
+      ReadWriteIOUtils.write(measurement, outputStream);
+    }
   }
 
   public ByteBuffer serialize() {
@@ -206,6 +224,10 @@ public class Template implements Serializable {
       }
       schemaMap.put(schemaName, measurementSchema);
     }
+    measurementsInorder = Collections.synchronizedList(new ArrayList<>());
+    for (int i = 0; i < schemaSize; i++) {
+      measurementsInorder.add(ReadWriteIOUtils.readString(buffer));
+    }
   }
 
   @Override
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetcher.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetcher.java
index 18a860f2d4..57747c1829 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetcher.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetcher.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.commons.path.PathPatternTree;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.exception.sql.SemanticException;
 import org.apache.iotdb.db.metadata.cache.DataNodeSchemaCache;
 import org.apache.iotdb.db.metadata.cache.DataNodeTemplateSchemaCache;
 import org.apache.iotdb.db.metadata.template.ClusterTemplateManager;
@@ -361,6 +362,84 @@ public class ClusterSchemaFetcher implements 
ISchemaFetcher {
     }
   }
 
+  @Override
+  public void fetchAndComputeSchemaWithAutoCreateForFastWrite(
+      List<? extends ISchemaComputationWithAutoCreation> 
schemaComputationWithAutoCreationList) {
+    // The schema cache R/W and fetch operation must be locked together thus 
the cache clean
+    // operation executed by delete timeseries will be effective.
+    schemaCache.takeReadLock();
+    templateSchemaCache.takeReadLock();
+    try {
+      List<ISchemaComputationWithAutoCreation> templateTimeSeriesRequestList = 
new ArrayList<>();
+      List<Pair<Template, PartialPath>> templateSetInfoList = new 
ArrayList<>();
+      Pair<Template, PartialPath> templateSetInfo;
+      for (ISchemaComputationWithAutoCreation 
schemaComputationWithAutoCreation :
+          schemaComputationWithAutoCreationList) {
+        templateSetInfo =
+            
templateManager.checkTemplateSetInfo(schemaComputationWithAutoCreation.getDevicePath());
+        if (templateSetInfo == null) {
+          throw new SemanticException(
+              "There's no template on prefix path of"
+                  + schemaComputationWithAutoCreation.getDevicePath());
+        }
+        templateTimeSeriesRequestList.add(schemaComputationWithAutoCreation);
+        templateSetInfoList.add(templateSetInfo);
+
+        schemaComputationWithAutoCreation.initMeasurementSchemaContainer(
+            templateSetInfo.left.getMeasurementNumber(),
+            templateSetInfo.getLeft().getMeasurementsInorder().toArray(new 
String[0]));
+      }
+
+      if (!templateTimeSeriesRequestList.isEmpty()) {
+        templateSchemaFetcher.processTemplateTimeSeries(
+            templateSetInfoList, templateTimeSeriesRequestList);
+      }
+    } finally {
+      schemaCache.releaseReadLock();
+      templateSchemaCache.releaseReadLock();
+    }
+  }
+
+  @Override
+  public void fetchAndComputeSchemaWithAutoCreateForFastWrite(
+      ISchemaComputationWithAutoCreation schemaComputationWithAutoCreation) {
+    schemaCache.takeReadLock();
+    templateSchemaCache.takeReadLock();
+    try {
+
+      Pair<Template, PartialPath> templateSetInfo =
+          
templateManager.checkTemplateSetInfo(schemaComputationWithAutoCreation.getDevicePath());
+      List<Integer> indexOfMissingMeasurements;
+      if (templateSetInfo == null) {
+        throw new SemanticException(
+            "There's no template on prefix path of"
+                + schemaComputationWithAutoCreation.getDevicePath());
+      }
+
+      schemaComputationWithAutoCreation.initMeasurementSchemaContainer(
+          templateSetInfo.left.getMeasurementNumber(),
+          templateSetInfo.getLeft().getMeasurementsInorder().toArray(new 
String[0]));
+
+      // template timeseries
+      indexOfMissingMeasurements =
+          templateSchemaFetcher.processTemplateTimeSeries(
+              templateSetInfo, schemaComputationWithAutoCreation);
+
+      // all schema has been taken and processed
+      if (indexOfMissingMeasurements.isEmpty()) {
+        return;
+      }
+
+      // offer null for the rest missing schema processing
+      for (int index : indexOfMissingMeasurements) {
+        schemaComputationWithAutoCreation.computeMeasurement(index, null);
+      }
+    } finally {
+      schemaCache.releaseReadLock();
+      templateSchemaCache.releaseReadLock();
+    }
+  }
+
   @Override
   public Pair<Template, PartialPath> checkTemplateSetInfo(PartialPath 
devicePath) {
     return templateManager.checkTemplateSetInfo(devicePath);
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ISchemaComputation.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ISchemaComputation.java
index 8fdc9a9fcc..c57f35dc00 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ISchemaComputation.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ISchemaComputation.java
@@ -40,4 +40,6 @@ public interface ISchemaComputation {
    * @param measurementSchemaInfo the measurement schema of fetched measurement
    */
   void computeMeasurement(int index, IMeasurementSchemaInfo 
measurementSchemaInfo);
+
+  default void initMeasurementSchemaContainer(int size, String[] measurements) 
{}
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ISchemaFetcher.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ISchemaFetcher.java
index 450c4dc3b0..3a76e06e2d 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ISchemaFetcher.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ISchemaFetcher.java
@@ -76,6 +76,12 @@ public interface ISchemaFetcher {
   void fetchAndComputeSchemaWithAutoCreate(
       List<? extends ISchemaComputationWithAutoCreation> 
schemaComputationWithAutoCreationList);
 
+  default void fetchAndComputeSchemaWithAutoCreateForFastWrite(
+      ISchemaComputationWithAutoCreation schemaComputationWithAutoCreation) {}
+
+  default void fetchAndComputeSchemaWithAutoCreateForFastWrite(
+      List<? extends ISchemaComputationWithAutoCreation> 
schemaComputationWithAutoCreationList) {}
+
   ISchemaTree fetchSchemaListWithAutoCreate(
       List<PartialPath> devicePath,
       List<String[]> measurements,
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/SchemaValidator.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/SchemaValidator.java
index 6cbd587a00..2d6b10ec94 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/SchemaValidator.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/SchemaValidator.java
@@ -24,6 +24,8 @@ import 
org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.exception.sql.SemanticException;
 import org.apache.iotdb.db.mpp.common.schematree.ISchemaTree;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.BatchInsertNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.FastInsertRowNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.FastInsertRowsNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertNode;
 import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -38,10 +40,20 @@ public class SchemaValidator {
   public static void validate(InsertNode insertNode) {
     try {
       if (insertNode instanceof BatchInsertNode) {
-        SCHEMA_FETCHER.fetchAndComputeSchemaWithAutoCreate(
-            ((BatchInsertNode) insertNode).getSchemaValidationList());
+        if (insertNode instanceof FastInsertRowsNode) {
+          SCHEMA_FETCHER.fetchAndComputeSchemaWithAutoCreateForFastWrite(
+              ((BatchInsertNode) insertNode).getSchemaValidationList());
+        } else {
+          SCHEMA_FETCHER.fetchAndComputeSchemaWithAutoCreate(
+              ((BatchInsertNode) insertNode).getSchemaValidationList());
+        }
       } else {
-        
SCHEMA_FETCHER.fetchAndComputeSchemaWithAutoCreate(insertNode.getSchemaValidation());
+        if (insertNode instanceof FastInsertRowNode) {
+          SCHEMA_FETCHER.fetchAndComputeSchemaWithAutoCreateForFastWrite(
+              insertNode.getSchemaValidation());
+        } else {
+          
SCHEMA_FETCHER.fetchAndComputeSchemaWithAutoCreate(insertNode.getSchemaValidation());
+        }
       }
       insertNode.updateAfterSchemaValidation();
     } catch (QueryProcessException e) {
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/FastInsertRowNode.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/FastInsertRowNode.java
index 68bddffcac..80ea5de6a4 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/FastInsertRowNode.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/FastInsertRowNode.java
@@ -21,10 +21,14 @@ package 
org.apache.iotdb.db.mpp.plan.planner.plan.node.write;
 
 import org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.mpp.common.schematree.IMeasurementSchemaInfo;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanVisitor;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 
 import java.io.DataOutputStream;
 import java.io.IOException;
@@ -106,4 +110,25 @@ public class FastInsertRowNode extends InsertRowNode {
     byte[] bytes = ReadWriteIOUtils.readBytes(byteBuffer, length);
     this.rawValues = ByteBuffer.wrap(bytes);
   }
+
+  @Override
+  public void initMeasurementSchemaContainer(int size, String[] measurements) {
+    this.measurementSchemas = new MeasurementSchema[size];
+    this.measurements = measurements;
+    this.dataTypes = new TSDataType[size];
+  }
+
+  @Override
+  public void validateDeviceSchema(boolean isAligned) {
+    this.isAligned = isAligned;
+  }
+
+  @Override
+  public void updateAfterSchemaValidation() throws QueryProcessException {}
+
+  @Override
+  public void validateMeasurementSchema(int index, IMeasurementSchemaInfo 
measurementSchemaInfo) {
+    measurementSchemas[index] = measurementSchemaInfo.getSchema();
+    this.dataTypes[index] = measurementSchemaInfo.getSchema().getType();
+  }
 }

Reply via email to