This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch new_java_interfaces
in repository https://gitbox.apache.org/repos/asf/tsfile.git

commit 5839f52788a7e14d3d048a2e199467bd60fbe7b1
Author: Tian Jiang <[email protected]>
AuthorDate: Wed Apr 23 16:04:46 2025 +0800

    add table model insert record
---
 .../main/java/org/apache/tsfile/utils/Binary.java  |  3 ++
 .../apache/tsfile/file/metadata/TableSchema.java   |  9 +++++
 .../write/chunk/AlignedChunkGroupWriterImpl.java   |  3 ++
 .../org/apache/tsfile/write/record/TSRecord.java   | 39 ++++++++++++++++++++--
 .../tsfile/write/record/datapoint/DataPoint.java   | 11 ++++++
 .../tsfile/write/v4/DeviceTableModelWriter.java    | 38 +++++++++++++++++++++
 .../org/apache/tsfile/write/v4/ITsFileWriter.java  |  4 +++
 .../apache/tsfile/write/TsFileWriteApiTest.java    | 31 +++++++++++++++++
 8 files changed, 135 insertions(+), 3 deletions(-)

diff --git a/java/common/src/main/java/org/apache/tsfile/utils/Binary.java 
b/java/common/src/main/java/org/apache/tsfile/utils/Binary.java
index f65d19c3..07337174 100644
--- a/java/common/src/main/java/org/apache/tsfile/utils/Binary.java
+++ b/java/common/src/main/java/org/apache/tsfile/utils/Binary.java
@@ -109,6 +109,9 @@ public class Binary implements Comparable<Binary>, 
Serializable, Accountable {
 
   @Override
   public String toString() {
+    if (values == null) {
+      return null;
+    }
     // use UTF_8 by default since toString do not provide parameter
     return getStringValue(StandardCharsets.UTF_8);
   }
diff --git 
a/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/TableSchema.java 
b/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/TableSchema.java
index 81964473..70ddc690 100644
--- a/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/TableSchema.java
+++ b/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/TableSchema.java
@@ -53,6 +53,7 @@ public class TableSchema {
   private Map<String, Integer> columnPosIndex;
   // columnName -> pos in all id columns
   private Map<String, Integer> idColumnOrder;
+  private int tagColumnCnt = -1;
 
   public TableSchema(String tableName) {
     this.tableName = tableName.toLowerCase();
@@ -331,4 +332,12 @@ public class TableSchema {
   public int hashCode() {
     return Objects.hash(tableName, measurementSchemas, columnCategories);
   }
+
+  public int getTagColumnCnt() {
+    if (tagColumnCnt != -1) {
+      return tagColumnCnt;
+    }
+    tagColumnCnt = (int) columnCategories.stream().filter(c -> c == 
ColumnCategory.TAG).count();
+    return tagColumnCnt;
+  }
 }
diff --git 
a/java/tsfile/src/main/java/org/apache/tsfile/write/chunk/AlignedChunkGroupWriterImpl.java
 
b/java/tsfile/src/main/java/org/apache/tsfile/write/chunk/AlignedChunkGroupWriterImpl.java
index a47df975..77858786 100644
--- 
a/java/tsfile/src/main/java/org/apache/tsfile/write/chunk/AlignedChunkGroupWriterImpl.java
+++ 
b/java/tsfile/src/main/java/org/apache/tsfile/write/chunk/AlignedChunkGroupWriterImpl.java
@@ -165,6 +165,9 @@ public class AlignedChunkGroupWriterImpl implements 
IChunkGroupWriter {
               ? point.getMeasurementId().toLowerCase()
               : point.getMeasurementId();
       ValueChunkWriter valueChunkWriter = 
valueChunkWriterMap.get(measurementId);
+      if (valueChunkWriter == null) {
+        valueChunkWriter = 
tryToAddSeriesWriterInternal(point.getMeasurementSchema());
+      }
       switch (point.getType()) {
         case BOOLEAN:
           valueChunkWriter.write(time, (boolean) point.getValue(), isNull);
diff --git 
a/java/tsfile/src/main/java/org/apache/tsfile/write/record/TSRecord.java 
b/java/tsfile/src/main/java/org/apache/tsfile/write/record/TSRecord.java
index b7c24831..ed778d56 100644
--- a/java/tsfile/src/main/java/org/apache/tsfile/write/record/TSRecord.java
+++ b/java/tsfile/src/main/java/org/apache/tsfile/write/record/TSRecord.java
@@ -18,10 +18,15 @@
  */
 package org.apache.tsfile.write.record;
 
+import org.apache.tsfile.annotations.TableModel;
 import org.apache.tsfile.annotations.TsFileApi;
 import org.apache.tsfile.common.conf.TSFileConfig;
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.exception.write.ConflictDataTypeException;
+import org.apache.tsfile.exception.write.WriteProcessException;
 import org.apache.tsfile.file.metadata.IDeviceID;
 import org.apache.tsfile.file.metadata.IDeviceID.Factory;
+import org.apache.tsfile.file.metadata.TableSchema;
 import org.apache.tsfile.utils.Binary;
 import org.apache.tsfile.utils.StringContainer;
 import org.apache.tsfile.write.record.datapoint.BooleanDataPoint;
@@ -49,19 +54,22 @@ public class TSRecord {
   /** deviceId of this TSRecord. */
   public IDeviceID deviceId;
 
+  private String tableName;
+
   /** all value of this TSRecord. */
   public List<DataPoint> dataPointList = new ArrayList<>();
 
   /**
    * constructor of TSRecord.
    *
-   * @param deviceId deviceId of this TSRecord
+   * @param deviceIdOrTableName deviceId of this TSRecord
    * @param timestamp timestamp of this TSRecord
    */
   @TsFileApi
-  public TSRecord(String deviceId, long timestamp) {
+  public TSRecord(String deviceIdOrTableName, long timestamp) {
     this.time = timestamp;
-    this.deviceId = Factory.DEFAULT_FACTORY.create(deviceId);
+    this.deviceId = Factory.DEFAULT_FACTORY.create(deviceIdOrTableName);
+    this.tableName = deviceIdOrTableName;
   }
 
   @TsFileApi
@@ -142,4 +150,29 @@ public class TSRecord {
     sc.addTail("]}");
     return sc.toString();
   }
+
+  @TableModel
+  public IDeviceID getDeviceId(TableSchema schema) throws 
WriteProcessException {
+    int tagCnt = schema.getTagColumnCnt();
+    String[] idSegments = new String[tagCnt + 1];
+    idSegments[0] = schema.getTableName();
+
+    for (DataPoint dataPoint : dataPointList) {
+      String columnName = dataPoint.getMeasurementId();
+      int idColumnOrder = schema.findIdColumnOrder(columnName);
+      if (idColumnOrder != -1) {
+        if (!(dataPoint instanceof StringDataPoint)) {
+          throw new ConflictDataTypeException(dataPoint.getType(), 
TSDataType.STRING);
+        }
+        Object value = dataPoint.getValue();
+        idSegments[idColumnOrder + 1] = value != null ? value.toString() : 
null;
+      }
+    }
+
+    return Factory.DEFAULT_FACTORY.create(idSegments);
+  }
+
+  public String getTableName() {
+    return tableName.toLowerCase();
+  }
 }
diff --git 
a/java/tsfile/src/main/java/org/apache/tsfile/write/record/datapoint/DataPoint.java
 
b/java/tsfile/src/main/java/org/apache/tsfile/write/record/datapoint/DataPoint.java
index 80e617ca..05352c5e 100644
--- 
a/java/tsfile/src/main/java/org/apache/tsfile/write/record/datapoint/DataPoint.java
+++ 
b/java/tsfile/src/main/java/org/apache/tsfile/write/record/datapoint/DataPoint.java
@@ -29,6 +29,7 @@ import org.apache.tsfile.write.chunk.ChunkWriterImpl;
 
 import java.io.IOException;
 import java.time.LocalDate;
+import org.apache.tsfile.write.schema.IMeasurementSchema;
 
 /**
  * This is a abstract class representing a data point. DataPoint consists of a 
measurement id and a
@@ -43,6 +44,8 @@ public abstract class DataPoint {
   /** measurementId of this DataPoint. */
   protected final String measurementId;
 
+  protected IMeasurementSchema measurementSchema;
+
   /**
    * constructor of DataPoint.
    *
@@ -157,4 +160,12 @@ public abstract class DataPoint {
   public void setDate(LocalDate value) {
     throw new UnsupportedOperationException("set Date not support in 
DataPoint");
   }
+
+  public IMeasurementSchema getMeasurementSchema() {
+    return measurementSchema;
+  }
+
+  public void setMeasurementSchema(IMeasurementSchema measurementSchema) {
+    this.measurementSchema = measurementSchema;
+  }
 }
diff --git 
a/java/tsfile/src/main/java/org/apache/tsfile/write/v4/DeviceTableModelWriter.java
 
b/java/tsfile/src/main/java/org/apache/tsfile/write/v4/DeviceTableModelWriter.java
index 66fca2cf..1752d699 100644
--- 
a/java/tsfile/src/main/java/org/apache/tsfile/write/v4/DeviceTableModelWriter.java
+++ 
b/java/tsfile/src/main/java/org/apache/tsfile/write/v4/DeviceTableModelWriter.java
@@ -29,7 +29,9 @@ import org.apache.tsfile.file.metadata.IDeviceID;
 import org.apache.tsfile.file.metadata.TableSchema;
 import org.apache.tsfile.utils.Pair;
 import org.apache.tsfile.utils.WriteUtils;
+import org.apache.tsfile.write.record.TSRecord;
 import org.apache.tsfile.write.record.Tablet;
+import org.apache.tsfile.write.record.datapoint.DataPoint;
 import org.apache.tsfile.write.schema.IMeasurementSchema;
 
 import java.io.File;
@@ -74,6 +76,39 @@ public class DeviceTableModelWriter extends 
AbstractTableModelTsFileWriter {
     checkMemorySizeAndMayFlushChunks();
   }
 
+  @TsFileApi
+  @Override
+  public void write(TSRecord record) throws IOException, WriteProcessException 
{
+    String tableName = record.getTableName();
+    if (tableName == null) {
+      tableName = this.tableName;
+    }
+    if (tableName == null) {
+      throw new WriteProcessException("Table name is null");
+    }
+
+    final TableSchema tableSchema = 
getSchema().getTableSchemaMap().get(tableName);
+    if (tableSchema == null) {
+      throw new NoTableException(tableName);
+    }
+
+    IDeviceID deviceId = record.getDeviceId(tableSchema);
+    List<DataPoint> fieldDataPoints = new 
ArrayList<>(record.dataPointList.size());
+    for (DataPoint dataPoint : record.dataPointList) {
+      int columnIndex = 
tableSchema.findColumnIndex(dataPoint.getMeasurementId());
+      if (columnIndex < 0) {
+        throw new NoMeasurementException(dataPoint.getMeasurementId());
+      }
+      ColumnCategory columnCategory = 
tableSchema.getColumnTypes().get(columnIndex);
+      if (columnCategory == ColumnCategory.FIELD) {
+        fieldDataPoints.add(dataPoint);
+        
dataPoint.setMeasurementSchema(tableSchema.getColumnSchemas().get(columnIndex));
+      }
+    }
+    recordCount += tryToInitialGroupWriter(deviceId, isTableWriteAligned, 
true).write(record.time, fieldDataPoints);
+    checkMemorySizeAndMayFlushChunks();
+  }
+
   private void checkIsTableExistAndSetColumnCategoryList(Tablet tablet)
       throws WriteProcessException {
     String tabletTableName = tablet.getTableName();
@@ -82,6 +117,9 @@ public class DeviceTableModelWriter extends 
AbstractTableModelTsFileWriter {
     }
     tablet.setTableName(this.tableName);
     final TableSchema tableSchema = 
getSchema().getTableSchemaMap().get(tableName);
+    if (tableSchema == null) {
+      throw new NoTableException(tabletTableName);
+    }
 
     List<ColumnCategory> columnCategoryListForTablet = new 
ArrayList<>(tablet.getSchemas().size());
     for (IMeasurementSchema writingColumnSchema : tablet.getSchemas()) {
diff --git 
a/java/tsfile/src/main/java/org/apache/tsfile/write/v4/ITsFileWriter.java 
b/java/tsfile/src/main/java/org/apache/tsfile/write/v4/ITsFileWriter.java
index 8610ee1f..25ccb031 100644
--- a/java/tsfile/src/main/java/org/apache/tsfile/write/v4/ITsFileWriter.java
+++ b/java/tsfile/src/main/java/org/apache/tsfile/write/v4/ITsFileWriter.java
@@ -21,6 +21,7 @@ package org.apache.tsfile.write.v4;
 
 import org.apache.tsfile.annotations.TsFileApi;
 import org.apache.tsfile.exception.write.WriteProcessException;
+import org.apache.tsfile.write.record.TSRecord;
 import org.apache.tsfile.write.record.Tablet;
 
 import java.io.IOException;
@@ -32,4 +33,7 @@ public interface ITsFileWriter extends AutoCloseable {
 
   @TsFileApi
   void close();
+
+  @TsFileApi
+  void write(TSRecord record) throws IOException, WriteProcessException;
 }
diff --git 
a/java/tsfile/src/test/java/org/apache/tsfile/write/TsFileWriteApiTest.java 
b/java/tsfile/src/test/java/org/apache/tsfile/write/TsFileWriteApiTest.java
index 17a0d0f1..11f1b779 100644
--- a/java/tsfile/src/test/java/org/apache/tsfile/write/TsFileWriteApiTest.java
+++ b/java/tsfile/src/test/java/org/apache/tsfile/write/TsFileWriteApiTest.java
@@ -971,4 +971,35 @@ public class TsFileWriteApiTest {
       Assert.assertTrue(resultSet.isNull(3));
     }
   }
+
+  @Test
+  public void writeRecord()
+      throws IOException, WriteProcessException, ReadProcessException {
+    setEnv(100 * 1024 * 1024, 10 * 1024);
+
+    TableSchema tableSchema =
+        new TableSchema(
+            "Table1",
+            Arrays.asList(
+                new ColumnSchema("tag1", TSDataType.STRING, 
ColumnCategory.TAG),
+                new ColumnSchema("field1", TSDataType.BOOLEAN, 
ColumnCategory.FIELD)));
+    try (ITsFileWriter writer =
+        new TsFileWriterBuilder().file(f).tableSchema(tableSchema).build()) {
+      writer.write(new TSRecord("Table1", 0).addPoint("tag1", 
"d1").addPoint("field1", true));
+      writer.write(new TSRecord("Table1", 1).addPoint("tag1", 
"d2").addPoint("field1", false));
+    }
+    try (ITsFileReader reader = new TsFileReaderBuilder().file(f).build();
+        ResultSet resultSet =
+            reader.query(
+                "table1", Arrays.asList("tag1", "field1"), Long.MIN_VALUE, 
Long.MAX_VALUE)) {
+      Assert.assertTrue(resultSet.next());
+      Assert.assertEquals(0, resultSet.getLong(1));
+      Assert.assertEquals("d1", resultSet.getString(2));
+      Assert.assertTrue(resultSet.getBoolean(3));
+      Assert.assertTrue(resultSet.next());
+      Assert.assertEquals(1, resultSet.getLong(1));
+      Assert.assertEquals("d2", resultSet.getString(2));
+      Assert.assertFalse(resultSet.getBoolean(3));
+    }
+  }
 }

Reply via email to