This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/tsfile.git


The following commit(s) were added to refs/heads/develop by this push:
     new d3eb61a4 Init all series writer for AlignedChunkGroupWriter
d3eb61a4 is described below

commit d3eb61a43ece95f52240903a2f8d4b8fd4edbf42
Author: shuwenwei <[email protected]>
AuthorDate: Fri Jul 18 11:12:12 2025 +0800

    Init all series writer for AlignedChunkGroupWriter
---
 .../java/org/apache/tsfile/write/TsFileWriter.java |  23 ++-
 .../write/v4/AbstractTableModelTsFileWriter.java   |   6 +-
 .../tsfile/write/v4/DeviceTableModelWriter.java    |   9 ++
 .../read/TsFileV4ReadWriteInterfacesTest.java      | 102 ++++++++++++
 .../apache/tsfile/write/TsFileWriteApiTest.java    | 179 +++++++++++++++++++++
 5 files changed, 317 insertions(+), 2 deletions(-)

diff --git a/java/tsfile/src/main/java/org/apache/tsfile/write/TsFileWriter.java b/java/tsfile/src/main/java/org/apache/tsfile/write/TsFileWriter.java
index d1a13e7c..ff37612e 100644
--- a/java/tsfile/src/main/java/org/apache/tsfile/write/TsFileWriter.java
+++ b/java/tsfile/src/main/java/org/apache/tsfile/write/TsFileWriter.java
@@ -233,6 +233,10 @@ public class TsFileWriter implements AutoCloseable {
     }
   }
 
+  public void setChunkGroupSizeThreshold(long chunkGroupSizeThreshold) {
+    this.chunkGroupSizeThreshold = chunkGroupSizeThreshold;
+  }
+
   public void registerSchemaTemplate(
       String templateName, Map<String, IMeasurementSchema> template, boolean isAligned) {
     getSchema().registerSchemaTemplate(templateName, new MeasurementGroup(isAligned, template));
@@ -501,7 +505,7 @@ public class TsFileWriter implements AutoCloseable {
   }
 
   private IChunkGroupWriter tryToInitialGroupWriter(
-      IDeviceID deviceId, boolean isAligned, boolean isTableModel) {
+      IDeviceID deviceId, boolean isAligned, boolean isTableModel) throws IOException {
     IChunkGroupWriter groupWriter = groupWriters.get(deviceId);
     if (groupWriter == null) {
       if (isAligned) {
@@ -509,6 +513,8 @@ public class TsFileWriter implements AutoCloseable {
             isTableModel
                 ? new TableChunkGroupWriterImpl(deviceId, encryptParam)
                 : new AlignedChunkGroupWriterImpl(deviceId, encryptParam);
+        initAllSeriesWriterForAlignedSeries(
+            (AlignedChunkGroupWriterImpl) groupWriter, deviceId, isTableModel);
         if (!isUnseq) { // Sequence File
           ((AlignedChunkGroupWriterImpl) groupWriter)
               .setLastTime(alignedDeviceLastTimeMap.get(deviceId));
@@ -526,6 +532,21 @@ public class TsFileWriter implements AutoCloseable {
     return groupWriter;
   }
 
+  private void initAllSeriesWriterForAlignedSeries(
+      AlignedChunkGroupWriterImpl alignedChunkGroupWriter, IDeviceID deviceID, boolean isTableModel)
+      throws IOException {
+    Schema schema = getSchema();
+    if (isTableModel) {
+      alignedChunkGroupWriter.tryToAddSeriesWriter(
+          schema.getTableSchemaMap().get(deviceID.getTableName()).getColumnSchemas());
+    } else {
+      MeasurementGroup deviceSchema = schema.getSeriesSchema(deviceID);
+      for (IMeasurementSchema measurementSchema : deviceSchema.getMeasurementSchemaMap().values()) {
+        alignedChunkGroupWriter.tryToAddSeriesWriterInternal(measurementSchema);
+      }
+    }
+  }
+
   /**
    * write a record in type of T.
    *
diff --git a/java/tsfile/src/main/java/org/apache/tsfile/write/v4/AbstractTableModelTsFileWriter.java b/java/tsfile/src/main/java/org/apache/tsfile/write/v4/AbstractTableModelTsFileWriter.java
index 92f4c102..3120bb40 100644
--- a/java/tsfile/src/main/java/org/apache/tsfile/write/v4/AbstractTableModelTsFileWriter.java
+++ b/java/tsfile/src/main/java/org/apache/tsfile/write/v4/AbstractTableModelTsFileWriter.java
@@ -146,7 +146,7 @@ abstract class AbstractTableModelTsFileWriter implements ITsFileWriter {
   }
 
   protected IChunkGroupWriter tryToInitialGroupWriter(
-      IDeviceID deviceId, boolean isAligned, boolean isTableModel) {
+      IDeviceID deviceId, boolean isAligned, boolean isTableModel) throws IOException {
     IChunkGroupWriter groupWriter = groupWriters.get(deviceId);
     if (groupWriter == null) {
       if (isAligned) {
@@ -156,6 +156,7 @@ abstract class AbstractTableModelTsFileWriter implements ITsFileWriter {
                 : new AlignedChunkGroupWriterImpl(deviceId, encryptParam);
         ((AlignedChunkGroupWriterImpl) groupWriter)
             .setLastTime(alignedDeviceLastTimeMap.get(deviceId));
+        initAllSeriesWriterForAlignedSeries((AlignedChunkGroupWriterImpl) groupWriter);
       } else {
         groupWriter = new NonAlignedChunkGroupWriterImpl(deviceId, encryptParam);
         ((NonAlignedChunkGroupWriterImpl) groupWriter)
@@ -167,6 +168,9 @@ abstract class AbstractTableModelTsFileWriter implements ITsFileWriter {
     return groupWriter;
   }
 
+  protected abstract void initAllSeriesWriterForAlignedSeries(
+      AlignedChunkGroupWriterImpl alignedChunkGroupWriter) throws IOException;
+
   /**
    * calculate total memory size occupied by all ChunkGroupWriter instances currently.
    *
diff --git a/java/tsfile/src/main/java/org/apache/tsfile/write/v4/DeviceTableModelWriter.java b/java/tsfile/src/main/java/org/apache/tsfile/write/v4/DeviceTableModelWriter.java
index 66fca2cf..f64f285f 100644
--- a/java/tsfile/src/main/java/org/apache/tsfile/write/v4/DeviceTableModelWriter.java
+++ b/java/tsfile/src/main/java/org/apache/tsfile/write/v4/DeviceTableModelWriter.java
@@ -29,6 +29,7 @@ import org.apache.tsfile.file.metadata.IDeviceID;
 import org.apache.tsfile.file.metadata.TableSchema;
 import org.apache.tsfile.utils.Pair;
 import org.apache.tsfile.utils.WriteUtils;
+import org.apache.tsfile.write.chunk.AlignedChunkGroupWriterImpl;
 import org.apache.tsfile.write.record.Tablet;
 import org.apache.tsfile.write.schema.IMeasurementSchema;
 
@@ -40,6 +41,7 @@ import java.util.List;
 public class DeviceTableModelWriter extends AbstractTableModelTsFileWriter {
 
   private String tableName;
+  private TableSchema tableSchema;
   private boolean isTableWriteAligned = true;
 
   public DeviceTableModelWriter(File file, TableSchema tableSchema, long memoryThreshold)
@@ -74,6 +76,12 @@ public class DeviceTableModelWriter extends AbstractTableModelTsFileWriter {
     checkMemorySizeAndMayFlushChunks();
   }
 
+  @Override
+  protected void initAllSeriesWriterForAlignedSeries(
+      AlignedChunkGroupWriterImpl alignedChunkGroupWriter) throws IOException {
+    alignedChunkGroupWriter.tryToAddSeriesWriter(tableSchema.getColumnSchemas());
+  }
+
   private void checkIsTableExistAndSetColumnCategoryList(Tablet tablet)
       throws WriteProcessException {
     String tabletTableName = tablet.getTableName();
@@ -102,6 +110,7 @@ public class DeviceTableModelWriter extends AbstractTableModelTsFileWriter {
 
   private void registerTableSchema(TableSchema tableSchema) {
     this.tableName = tableSchema.getTableName();
+    this.tableSchema = tableSchema;
     getSchema().registerTableSchema(tableSchema);
   }
 }
diff --git a/java/tsfile/src/test/java/org/apache/tsfile/read/TsFileV4ReadWriteInterfacesTest.java b/java/tsfile/src/test/java/org/apache/tsfile/read/TsFileV4ReadWriteInterfacesTest.java
index d9b7147f..51aa64ce 100644
--- a/java/tsfile/src/test/java/org/apache/tsfile/read/TsFileV4ReadWriteInterfacesTest.java
+++ b/java/tsfile/src/test/java/org/apache/tsfile/read/TsFileV4ReadWriteInterfacesTest.java
@@ -21,10 +21,13 @@ package org.apache.tsfile.read;
 
 import org.apache.tsfile.enums.ColumnCategory;
 import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.exception.write.WriteProcessException;
+import org.apache.tsfile.file.metadata.AbstractAlignedChunkMetadata;
 import org.apache.tsfile.file.metadata.IDeviceID;
 import org.apache.tsfile.file.metadata.StringArrayDeviceID;
 import org.apache.tsfile.file.metadata.TableSchema;
 import org.apache.tsfile.read.v4.DeviceTableModelReader;
+import org.apache.tsfile.utils.Pair;
 import org.apache.tsfile.utils.TsFileGeneratorForTest;
 import org.apache.tsfile.utils.TsFileGeneratorUtils;
 import org.apache.tsfile.write.record.Tablet;
@@ -37,6 +40,7 @@ import org.junit.Assert;
 import org.junit.Test;
 
 import java.io.File;
+import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Paths;
 import java.util.ArrayList;
@@ -45,6 +49,104 @@ import java.util.List;
 
 public class TsFileV4ReadWriteInterfacesTest {
 
+  @Test
+  public void testWriteSomeColumns() throws IOException, WriteProcessException {
+    String filePath = TsFileGeneratorForTest.getTestTsFilePath("db", 0, 0, 0);
+
+    TableSchema tableSchema =
+        new TableSchema(
+            "t1",
+            Arrays.asList(
+                new MeasurementSchema("device", TSDataType.STRING),
+                new MeasurementSchema("s1", TSDataType.INT32),
+                new MeasurementSchema("s2", TSDataType.INT32),
+                new MeasurementSchema("s3", TSDataType.INT32)),
+            Arrays.asList(
+                ColumnCategory.TAG,
+                ColumnCategory.FIELD,
+                ColumnCategory.FIELD,
+                ColumnCategory.FIELD));
+    Tablet tablet1 =
+        new Tablet(
+            tableSchema.getTableName(),
+            Arrays.asList("device", "s1"),
+            Arrays.asList(TSDataType.STRING, TSDataType.INT32),
+            Arrays.asList(ColumnCategory.TAG, ColumnCategory.FIELD));
+    for (int i = 0; i < 1000; i++) {
+      tablet1.addTimestamp(i, i);
+      tablet1.addValue("device", i, "d1");
+      tablet1.addValue("s1", i, 0);
+    }
+    Tablet tablet2 =
+        new Tablet(
+            tableSchema.getTableName(),
+            IMeasurementSchema.getMeasurementNameList(tableSchema.getColumnSchemas()),
+            IMeasurementSchema.getDataTypeList(tableSchema.getColumnSchemas()),
+            tableSchema.getColumnTypes());
+    for (int i = 0; i < 1000; i++) {
+      tablet2.addTimestamp(i, 1005 + i);
+      tablet2.addValue("device", i, "d1");
+      tablet2.addValue("s1", i, 1);
+      tablet2.addValue("s2", i, 1);
+      tablet2.addValue("s3", i, 1);
+    }
+    try (ITsFileWriter writer =
+        new TsFileWriterBuilder()
+            .file(new File(filePath))
+            .tableSchema(tableSchema)
+            .memoryThreshold(1)
+            .build()) {
+      writer.write(tablet1);
+      writer.write(tablet2);
+    }
+    try (TsFileSequenceReader reader = new TsFileSequenceReader(filePath)) {
+      TsFileDeviceIterator deviceIterator = reader.getAllDevicesIteratorWithIsAligned();
+      while (deviceIterator.hasNext()) {
+        Pair<IDeviceID, Boolean> pair = deviceIterator.next();
+        List<AbstractAlignedChunkMetadata> alignedChunkMetadataList =
+            reader.getAlignedChunkMetadataByMetadataIndexNode(
+                pair.getLeft(), deviceIterator.getFirstMeasurementNodeOfCurrentDevice(), false);
+        Assert.assertFalse(alignedChunkMetadataList.isEmpty());
+        Assert.assertEquals(3, alignedChunkMetadataList.get(0).getValueChunkMetadataList().size());
+        Assert.assertEquals(
+            1000,
+            alignedChunkMetadataList
+                .get(0)
+                .getValueChunkMetadataList()
+                .get(0)
+                .getStatistics()
+                .getCount());
+        Assert.assertNull(alignedChunkMetadataList.get(0).getValueChunkMetadataList().get(1));
+        Assert.assertNull(alignedChunkMetadataList.get(0).getValueChunkMetadataList().get(2));
+        Assert.assertEquals(3, alignedChunkMetadataList.get(1).getValueChunkMetadataList().size());
+        Assert.assertEquals(
+            1000,
+            alignedChunkMetadataList
+                .get(1)
+                .getValueChunkMetadataList()
+                .get(0)
+                .getStatistics()
+                .getCount());
+        Assert.assertEquals(
+            1000,
+            alignedChunkMetadataList
+                .get(1)
+                .getValueChunkMetadataList()
+                .get(1)
+                .getStatistics()
+                .getCount());
+        Assert.assertEquals(
+            1000,
+            alignedChunkMetadataList
+                .get(1)
+                .getValueChunkMetadataList()
+                .get(2)
+                .getStatistics()
+                .getCount());
+      }
+    }
+  }
+
   @Test
   public void testGetTableDeviceMethods() throws Exception {
     String filePath = TsFileGeneratorForTest.getTestTsFilePath("root.testsg", 0, 0, 0);
diff --git a/java/tsfile/src/test/java/org/apache/tsfile/write/TsFileWriteApiTest.java b/java/tsfile/src/test/java/org/apache/tsfile/write/TsFileWriteApiTest.java
index 17a0d0f1..264bf9e5 100644
--- a/java/tsfile/src/test/java/org/apache/tsfile/write/TsFileWriteApiTest.java
+++ b/java/tsfile/src/test/java/org/apache/tsfile/write/TsFileWriteApiTest.java
@@ -26,12 +26,15 @@ import org.apache.tsfile.exception.write.WriteProcessException;
 import org.apache.tsfile.file.MetaMarker;
 import org.apache.tsfile.file.header.ChunkHeader;
 import org.apache.tsfile.file.header.PageHeader;
+import org.apache.tsfile.file.metadata.AbstractAlignedChunkMetadata;
 import org.apache.tsfile.file.metadata.ChunkMetadata;
 import org.apache.tsfile.file.metadata.ColumnSchema;
 import org.apache.tsfile.file.metadata.IDeviceID;
+import org.apache.tsfile.file.metadata.StringArrayDeviceID;
 import org.apache.tsfile.file.metadata.TableSchema;
 import org.apache.tsfile.file.metadata.enums.TSEncoding;
 import org.apache.tsfile.fileSystem.FSFactoryProducer;
+import org.apache.tsfile.read.TsFileDeviceIterator;
 import org.apache.tsfile.read.TsFileReader;
 import org.apache.tsfile.read.TsFileSequenceReader;
 import org.apache.tsfile.read.common.Chunk;
@@ -41,6 +44,7 @@ import org.apache.tsfile.read.query.dataset.QueryDataSet;
 import org.apache.tsfile.read.query.dataset.ResultSet;
 import org.apache.tsfile.read.v4.ITsFileReader;
 import org.apache.tsfile.read.v4.TsFileReaderBuilder;
+import org.apache.tsfile.utils.Pair;
 import org.apache.tsfile.utils.TsFileGeneratorUtils;
 import org.apache.tsfile.write.chunk.AlignedChunkWriterImpl;
 import org.apache.tsfile.write.chunk.ChunkWriterImpl;
@@ -899,6 +903,181 @@ public class TsFileWriteApiTest {
     }
   }
 
+  @Test
+  public void testWriteSomeColumnsOfTree() throws IOException, WriteProcessException {
+    List<IMeasurementSchema> fullMeasurementSchemas =
+        Arrays.asList(
+            new MeasurementSchema("s1", TSDataType.INT32),
+            new MeasurementSchema("s2", TSDataType.INT32),
+            new MeasurementSchema("s3", TSDataType.INT32));
+    List<IMeasurementSchema> measurementSchemas1 =
+        Arrays.asList(new MeasurementSchema("s1", TSDataType.INT32));
+    IDeviceID device = new StringArrayDeviceID("root.test.d1");
+    Tablet tablet1 =
+        new Tablet(
+            device,
+            IMeasurementSchema.getMeasurementNameList(fullMeasurementSchemas),
+            IMeasurementSchema.getDataTypeList(fullMeasurementSchemas));
+    Tablet tablet2 =
+        new Tablet(
+            device,
+            IMeasurementSchema.getMeasurementNameList(measurementSchemas1),
+            IMeasurementSchema.getDataTypeList(measurementSchemas1));
+    for (int i = 0; i < 1000; i++) {
+      tablet1.addTimestamp(i, i);
+      tablet1.addValue("s1", i, 1);
+      tablet1.addValue("s2", i, 1);
+      tablet1.addValue("s3", i, 1);
+    }
+    for (int i = 0; i < 1000; i++) {
+      tablet2.addTimestamp(i, i + 1005);
+      tablet2.addValue("s1", i, 0);
+    }
+    try (TsFileWriter writer = new TsFileWriter(f)) {
+      writer.registerAlignedTimeseries(device, fullMeasurementSchemas);
+      writer.setChunkGroupSizeThreshold(1);
+      writer.writeTree(tablet1);
+      writer.writeTree(tablet2);
+    }
+    try (TsFileSequenceReader reader = new TsFileSequenceReader(f.getPath())) {
+      TsFileDeviceIterator deviceIterator = reader.getAllDevicesIteratorWithIsAligned();
+      while (deviceIterator.hasNext()) {
+        Pair<IDeviceID, Boolean> pair = deviceIterator.next();
+        List<AbstractAlignedChunkMetadata> alignedChunkMetadataList =
+            reader.getAlignedChunkMetadataByMetadataIndexNode(
+                pair.getLeft(), deviceIterator.getFirstMeasurementNodeOfCurrentDevice(), false);
+        Assert.assertFalse(alignedChunkMetadataList.isEmpty());
+        Assert.assertEquals(3, alignedChunkMetadataList.get(0).getValueChunkMetadataList().size());
+        Assert.assertEquals(
+            1000,
+            alignedChunkMetadataList
+                .get(0)
+                .getValueChunkMetadataList()
+                .get(0)
+                .getStatistics()
+                .getCount());
+        Assert.assertEquals(
+            1000,
+            alignedChunkMetadataList
+                .get(0)
+                .getValueChunkMetadataList()
+                .get(1)
+                .getStatistics()
+                .getCount());
+        Assert.assertEquals(
+            1000,
+            alignedChunkMetadataList
+                .get(0)
+                .getValueChunkMetadataList()
+                .get(2)
+                .getStatistics()
+                .getCount());
+        Assert.assertEquals(3, alignedChunkMetadataList.get(1).getValueChunkMetadataList().size());
+        Assert.assertEquals(
+            1000,
+            alignedChunkMetadataList
+                .get(1)
+                .getValueChunkMetadataList()
+                .get(0)
+                .getStatistics()
+                .getCount());
+        Assert.assertNull(alignedChunkMetadataList.get(1).getValueChunkMetadataList().get(1));
+        Assert.assertNull(alignedChunkMetadataList.get(1).getValueChunkMetadataList().get(2));
+      }
+    }
+  }
+
+  @Test
+  public void testWriteSomeColumnsOfTable() throws IOException, WriteProcessException {
+    TableSchema tableSchema =
+        new TableSchema(
+            "t1",
+            Arrays.asList(
+                new MeasurementSchema("device", TSDataType.STRING),
+                new MeasurementSchema("s1", TSDataType.INT32),
+                new MeasurementSchema("s2", TSDataType.INT32),
+                new MeasurementSchema("s3", TSDataType.INT32)),
+            Arrays.asList(
+                ColumnCategory.TAG,
+                ColumnCategory.FIELD,
+                ColumnCategory.FIELD,
+                ColumnCategory.FIELD));
+    Tablet tablet1 =
+        new Tablet(
+            tableSchema.getTableName(),
+            Arrays.asList("device", "s1"),
+            Arrays.asList(TSDataType.STRING, TSDataType.INT32),
+            Arrays.asList(ColumnCategory.TAG, ColumnCategory.FIELD));
+    for (int i = 0; i < 1000; i++) {
+      tablet1.addTimestamp(i, i);
+      tablet1.addValue("s1", i, 0);
+    }
+    Tablet tablet2 =
+        new Tablet(
+            tableSchema.getTableName(),
+            IMeasurementSchema.getMeasurementNameList(tableSchema.getColumnSchemas()),
+            IMeasurementSchema.getDataTypeList(tableSchema.getColumnSchemas()),
+            tableSchema.getColumnTypes());
+    for (int i = 0; i < 1000; i++) {
+      tablet2.addTimestamp(i, 1005 + i);
+      tablet2.addValue("s1", i, 1);
+      tablet2.addValue("s2", i, 1);
+      tablet2.addValue("s3", i, 1);
+    }
+    try (TsFileWriter writer = new TsFileWriter(f)) {
+      writer.registerTableSchema(tableSchema);
+      writer.setChunkGroupSizeThreshold(1);
+      writer.writeTable(tablet1);
+      writer.writeTable(tablet2);
+    }
+    try (TsFileSequenceReader reader = new TsFileSequenceReader(f.getPath())) {
+      TsFileDeviceIterator deviceIterator = reader.getAllDevicesIteratorWithIsAligned();
+      while (deviceIterator.hasNext()) {
+        Pair<IDeviceID, Boolean> pair = deviceIterator.next();
+        List<AbstractAlignedChunkMetadata> alignedChunkMetadataList =
+            reader.getAlignedChunkMetadataByMetadataIndexNode(
+                pair.getLeft(), deviceIterator.getFirstMeasurementNodeOfCurrentDevice(), false);
+        Assert.assertFalse(alignedChunkMetadataList.isEmpty());
+        Assert.assertEquals(3, alignedChunkMetadataList.get(0).getValueChunkMetadataList().size());
+        Assert.assertEquals(
+            1000,
+            alignedChunkMetadataList
+                .get(0)
+                .getValueChunkMetadataList()
+                .get(0)
+                .getStatistics()
+                .getCount());
+        Assert.assertNull(alignedChunkMetadataList.get(0).getValueChunkMetadataList().get(1));
+        Assert.assertNull(alignedChunkMetadataList.get(0).getValueChunkMetadataList().get(2));
+        Assert.assertEquals(3, alignedChunkMetadataList.get(1).getValueChunkMetadataList().size());
+        Assert.assertEquals(
+            1000,
+            alignedChunkMetadataList
+                .get(1)
+                .getValueChunkMetadataList()
+                .get(0)
+                .getStatistics()
+                .getCount());
+        Assert.assertEquals(
+            1000,
+            alignedChunkMetadataList
+                .get(1)
+                .getValueChunkMetadataList()
+                .get(1)
+                .getStatistics()
+                .getCount());
+        Assert.assertEquals(
+            1000,
+            alignedChunkMetadataList
+                .get(1)
+                .getValueChunkMetadataList()
+                .get(2)
+                .getStatistics()
+                .getCount());
+      }
+    }
+  }
+
   @Test
   public void writeTableTsFileWithUpperCaseColumns() throws IOException, WriteProcessException {
     setEnv(100 * 1024 * 1024, 10 * 1024);

Reply via email to