This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch tsFile_v4
in repository https://gitbox.apache.org/repos/asf/tsfile.git


The following commit(s) were added to refs/heads/tsFile_v4 by this push:
     new ab5f90e9 add aligned table write
ab5f90e9 is described below

commit ab5f90e9dc39c15b90088b3d8422d6ed12f2102d
Author: jt2594838 <[email protected]>
AuthorDate: Thu Apr 11 10:15:06 2024 +0800

    add aligned table write
---
 .../java/org/apache/tsfile/write/TsFileWriter.java | 23 +++++++++++++++-------
 .../write/chunk/AlignedChunkGroupWriterImpl.java   | 17 +++++++++++++---
 2 files changed, 30 insertions(+), 10 deletions(-)

diff --git a/tsfile/src/main/java/org/apache/tsfile/write/TsFileWriter.java 
b/tsfile/src/main/java/org/apache/tsfile/write/TsFileWriter.java
index df5d9dbd..5aacadfe 100644
--- a/tsfile/src/main/java/org/apache/tsfile/write/TsFileWriter.java
+++ b/tsfile/src/main/java/org/apache/tsfile/write/TsFileWriter.java
@@ -94,6 +94,8 @@ public class TsFileWriter implements AutoCloseable {
 
   private long chunkGroupSizeThreshold;
 
+  private boolean isTableWriteAligned = true;
+
   /**
    * init this TsFileWriter.
    *
@@ -485,8 +487,8 @@ public class TsFileWriter implements AutoCloseable {
   }
 
   private IChunkGroupWriter tryToInitialGroupWriter(IDeviceID deviceId, 
boolean isAligned) {
-    IChunkGroupWriter groupWriter;
-    if (!groupWriters.containsKey(deviceId)) {
+    IChunkGroupWriter groupWriter = groupWriters.get(deviceId);
+    if (groupWriter == null) {
       if (isAligned) {
         groupWriter = new AlignedChunkGroupWriterImpl(deviceId);
         if (!isUnseq) { // Sequence File
@@ -502,8 +504,6 @@ public class TsFileWriter implements AutoCloseable {
         }
       }
       groupWriters.put(deviceId, groupWriter);
-    } else {
-      groupWriter = groupWriters.get(deviceId);
     }
     return groupWriter;
   }
@@ -678,17 +678,26 @@ public class TsFileWriter implements AutoCloseable {
   }
 
   public boolean writeTable(Tablet tablet) throws IOException, 
WriteProcessException {
-    // make sure the ChunkGroupWriter for this Tablet exist
-    checkIsTableExist(tablet);
     // spilt the tablet by deviceId
     final List<Pair<IDeviceID, Integer>> deviceIdEndIndexPairs = 
WriteUtils.splitTabletByDevice(tablet);
+    // make sure the ChunkGroupWriter for this Tablet exist
+    checkIsTableExist(tablet);
+
     int startIndex = 0;
     for (Pair<IDeviceID, Integer> pair : deviceIdEndIndexPairs) {
       // get corresponding ChunkGroupWriter and write this Tablet
-      recordCount += groupWriters.get(pair.left).write(tablet, startIndex, 
pair.right,
+      recordCount += tryToInitialGroupWriter(pair.left, 
isTableWriteAligned).write(tablet, startIndex, pair.right,
           tablet.getIdColumnRange(), tablet.getSchemas().size());
       startIndex = pair.right;
     }
     return checkMemorySizeAndMayFlushChunks();
   }
+
+  public boolean isTableWriteAligned() {
+    return isTableWriteAligned;
+  }
+
+  public void setTableWriteAligned(boolean tableWriteAligned) {
+    isTableWriteAligned = tableWriteAligned;
+  }
 }
diff --git 
a/tsfile/src/main/java/org/apache/tsfile/write/chunk/AlignedChunkGroupWriterImpl.java
 
b/tsfile/src/main/java/org/apache/tsfile/write/chunk/AlignedChunkGroupWriterImpl.java
index f23dddbd..67552726 100644
--- 
a/tsfile/src/main/java/org/apache/tsfile/write/chunk/AlignedChunkGroupWriterImpl.java
+++ 
b/tsfile/src/main/java/org/apache/tsfile/write/chunk/AlignedChunkGroupWriterImpl.java
@@ -71,8 +71,13 @@ public class AlignedChunkGroupWriterImpl implements 
IChunkGroupWriter {
 
   @Override
   public void tryToAddSeriesWriter(MeasurementSchema measurementSchema) throws 
IOException {
-    if 
(!valueChunkWriterMap.containsKey(measurementSchema.getMeasurementId())) {
-      ValueChunkWriter valueChunkWriter =
+    tryToAddSeriesWriterInternal(measurementSchema);
+  }
+
+  public ValueChunkWriter tryToAddSeriesWriterInternal(MeasurementSchema 
measurementSchema) throws IOException {
+    ValueChunkWriter valueChunkWriter = 
valueChunkWriterMap.get(measurementSchema.getMeasurementId());
+    if (valueChunkWriter == null) {
+      valueChunkWriter =
           new ValueChunkWriter(
               measurementSchema.getMeasurementId(),
               measurementSchema.getCompressor(),
@@ -82,6 +87,7 @@ public class AlignedChunkGroupWriterImpl implements 
IChunkGroupWriter {
       valueChunkWriterMap.put(measurementSchema.getMeasurementId(), 
valueChunkWriter);
       tryToAddEmptyPageAndData(valueChunkWriter);
     }
+    return valueChunkWriter;
   }
 
   @Override
@@ -166,6 +172,7 @@ public class AlignedChunkGroupWriterImpl implements 
IChunkGroupWriter {
     int pointCount = 0;
     List<MeasurementSchema> measurementSchemas = tablet.getSchemas();
     List<ValueChunkWriter> emptyValueChunkWriters = new ArrayList<>();
+    //TODO: should we allow duplicated measurements in a Tablet?
     Set<String> existingMeasurements =
         measurementSchemas.stream()
             .map(MeasurementSchema::getMeasurementId)
@@ -175,6 +182,8 @@ public class AlignedChunkGroupWriterImpl implements 
IChunkGroupWriter {
         emptyValueChunkWriters.add(entry.getValue());
       }
     }
+    //TODO: changing to a column-first style by calculating the remaining page 
space of each
+    // column firsts
     for (int row = startRowIndex; row < endRowIndex; row++) {
       long time = tablet.timestamps[row];
       checkIsHistoryData(time);
@@ -185,7 +194,7 @@ public class AlignedChunkGroupWriterImpl implements 
IChunkGroupWriter {
                 && tablet.bitMaps[columnIndex].isMarked(row);
         // check isNull by bitMap in tablet
         ValueChunkWriter valueChunkWriter =
-            
valueChunkWriterMap.get(measurementSchemas.get(columnIndex).getMeasurementId());
+            tryToAddSeriesWriterInternal(measurementSchemas.get(columnIndex));
         switch (measurementSchemas.get(columnIndex).getType()) {
           case BOOLEAN:
             valueChunkWriter.write(time, ((boolean[]) 
tablet.values[columnIndex])[row], isNull);
@@ -212,6 +221,8 @@ public class AlignedChunkGroupWriterImpl implements 
IChunkGroupWriter {
                     measurementSchemas.get(columnIndex).getType()));
         }
       }
+      // TODO: we can write the null columns after whole insertion, according 
to the point number
+      //  in the time chunk before and after, no need to do it in a row-by-row 
manner
       if (!emptyValueChunkWriters.isEmpty()) {
         writeEmptyDataInOneRow(emptyValueChunkWriters);
       }

Reply via email to