This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/tsfile.git


The following commit(s) were added to refs/heads/develop by this push:
     new 69d1fe22 Add some support for changing schema during write (#718)
69d1fe22 is described below

commit 69d1fe22e6dd640240bd78b2e0687d325c01b9b7
Author: Jiang Tian <[email protected]>
AuthorDate: Fri Jan 30 18:15:54 2026 +0800

    Add some support for changing schema during write (#718)
    
    * add void writeToFileWriter(TsFileIOWriter tsfileWriter, Function<String, 
String> measurementNameRemapper) throws IOException;
    
    * spotless
    
    * fix test
    
    * bump version
    
    * add getter and setter
    
    * Add support for schema evolution
    
    * bump version
    
    * bump version
    
    * bump version
    
    * revert
---
 .../metadata/AbstractAlignedChunkMetadata.java     |  5 +++++
 .../apache/tsfile/file/metadata/ChunkMetadata.java |  5 +++++
 .../tsfile/file/metadata/IChunkMetadata.java       |  2 ++
 .../apache/tsfile/file/metadata/TableSchema.java   |  8 ++++++++
 .../tsfile/write/chunk/AlignedChunkWriterImpl.java | 11 ++++++++++
 .../apache/tsfile/write/chunk/ChunkWriterImpl.java | 20 +++++++++++++++---
 .../apache/tsfile/write/chunk/IChunkWriter.java    |  8 ++++++++
 .../tsfile/write/chunk/ValueChunkWriter.java       | 24 ++++++++++++++++++----
 8 files changed, 76 insertions(+), 7 deletions(-)

diff --git 
a/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/AbstractAlignedChunkMetadata.java
 
b/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/AbstractAlignedChunkMetadata.java
index e195b114..52a1913c 100644
--- 
a/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/AbstractAlignedChunkMetadata.java
+++ 
b/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/AbstractAlignedChunkMetadata.java
@@ -177,6 +177,11 @@ public abstract class AbstractAlignedChunkMetadata 
implements IChunkMetadata {
     return timeChunkMetadata.getMeasurementUid();
   }
 
+  @Override
+  public void setMeasurementUid(String measurementUid) {
+    throw new UnsupportedOperationException();
+  }
+
   @Override
   public void insertIntoSortedDeletions(TimeRange timeRange) {
     throw new UnsupportedOperationException();
diff --git 
a/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/ChunkMetadata.java 
b/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/ChunkMetadata.java
index 653a8991..a76ef65c 100644
--- 
a/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/ChunkMetadata.java
+++ 
b/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/ChunkMetadata.java
@@ -153,6 +153,11 @@ public class ChunkMetadata implements IChunkMetadata {
     return measurementUid;
   }
 
+  @Override
+  public void setMeasurementUid(String measurementUid) {
+    this.measurementUid = measurementUid;
+  }
+
   @Override
   public Statistics<? extends Serializable> getStatistics() {
     return statistics;
diff --git 
a/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/IChunkMetadata.java 
b/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/IChunkMetadata.java
index e29e7147..f391ed7f 100644
--- 
a/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/IChunkMetadata.java
+++ 
b/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/IChunkMetadata.java
@@ -63,6 +63,8 @@ public interface IChunkMetadata extends IMetadata {
 
   String getMeasurementUid();
 
+  void setMeasurementUid(String measurementUid);
+
   void insertIntoSortedDeletions(TimeRange timeRange);
 
   List<TimeRange> getDeleteIntervalList();
diff --git 
a/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/TableSchema.java 
b/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/TableSchema.java
index 6d630c38..9d1f7f2a 100644
--- a/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/TableSchema.java
+++ b/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/TableSchema.java
@@ -356,4 +356,12 @@ public class TableSchema {
     tagColumnCnt = (int) columnCategories.stream().filter(c -> c == 
ColumnCategory.TAG).count();
     return tagColumnCnt;
   }
+
+  public boolean isUpdatable() {
+    return updatable;
+  }
+
+  public void setUpdatable(boolean updatable) {
+    this.updatable = updatable;
+  }
 }
diff --git 
a/java/tsfile/src/main/java/org/apache/tsfile/write/chunk/AlignedChunkWriterImpl.java
 
b/java/tsfile/src/main/java/org/apache/tsfile/write/chunk/AlignedChunkWriterImpl.java
index 27761e67..fa3cb348 100644
--- 
a/java/tsfile/src/main/java/org/apache/tsfile/write/chunk/AlignedChunkWriterImpl.java
+++ 
b/java/tsfile/src/main/java/org/apache/tsfile/write/chunk/AlignedChunkWriterImpl.java
@@ -41,6 +41,7 @@ import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.function.Function;
 
 public class AlignedChunkWriterImpl implements IChunkWriter {
 
@@ -477,6 +478,16 @@ public class AlignedChunkWriterImpl implements 
IChunkWriter {
     }
   }
 
+  @Override
+  public void writeToFileWriter(
+      TsFileIOWriter tsfileWriter, Function<String, String> 
measurementNameRemapper)
+      throws IOException {
+    timeChunkWriter.writeToFileWriter(tsfileWriter);
+    for (ValueChunkWriter valueChunkWriter : valueChunkWriterList) {
+      valueChunkWriter.writeToFileWriter(tsfileWriter, 
measurementNameRemapper);
+    }
+  }
+
   @Override
   public long estimateMaxSeriesMemSize() {
     long estimateMaxSeriesMemSize = timeChunkWriter.estimateMaxSeriesMemSize();
diff --git 
a/java/tsfile/src/main/java/org/apache/tsfile/write/chunk/ChunkWriterImpl.java 
b/java/tsfile/src/main/java/org/apache/tsfile/write/chunk/ChunkWriterImpl.java
index 25e0410b..74205436 100644
--- 
a/java/tsfile/src/main/java/org/apache/tsfile/write/chunk/ChunkWriterImpl.java
+++ 
b/java/tsfile/src/main/java/org/apache/tsfile/write/chunk/ChunkWriterImpl.java
@@ -43,6 +43,7 @@ import java.io.Serializable;
 import java.nio.ByteBuffer;
 import java.nio.channels.Channels;
 import java.nio.channels.WritableByteChannel;
+import java.util.function.Function;
 
 public class ChunkWriterImpl implements IChunkWriter {
 
@@ -346,8 +347,15 @@ public class ChunkWriterImpl implements IChunkWriter {
 
   @Override
   public void writeToFileWriter(TsFileIOWriter tsfileWriter) throws 
IOException {
+    writeToFileWriter(tsfileWriter, null);
+  }
+
+  @Override
+  public void writeToFileWriter(
+      TsFileIOWriter tsfileWriter, Function<String, String> 
measurementNameRemapper)
+      throws IOException {
     sealCurrentPage();
-    writeAllPagesOfChunkToTsFile(tsfileWriter, statistics);
+    writeAllPagesOfChunkToTsFile(tsfileWriter, statistics, 
measurementNameRemapper);
 
     // reinit this chunk writer
     pageBuffer.reset();
@@ -474,17 +482,23 @@ public class ChunkWriterImpl implements IChunkWriter {
    *
    * @param writer the specified IOWriter
    * @param statistics the chunk statistics
+   * @param measurementNameRemapper
    * @throws IOException exception in IO
    */
   private void writeAllPagesOfChunkToTsFile(
-      TsFileIOWriter writer, Statistics<? extends Serializable> statistics) 
throws IOException {
+      TsFileIOWriter writer,
+      Statistics<? extends Serializable> statistics,
+      Function<String, String> measurementNameRemapper)
+      throws IOException {
     if (statistics.getCount() == 0) {
       return;
     }
 
     // start to write this column chunk
     writer.startFlushChunk(
-        measurementSchema.getMeasurementName(),
+        measurementNameRemapper == null
+            ? measurementSchema.getMeasurementName()
+            : 
measurementNameRemapper.apply(measurementSchema.getMeasurementName()),
         compressor.getType(),
         measurementSchema.getType(),
         measurementSchema.getEncodingType(),
diff --git 
a/java/tsfile/src/main/java/org/apache/tsfile/write/chunk/IChunkWriter.java 
b/java/tsfile/src/main/java/org/apache/tsfile/write/chunk/IChunkWriter.java
index 85b9828a..4a198841 100644
--- a/java/tsfile/src/main/java/org/apache/tsfile/write/chunk/IChunkWriter.java
+++ b/java/tsfile/src/main/java/org/apache/tsfile/write/chunk/IChunkWriter.java
@@ -21,6 +21,7 @@ package org.apache.tsfile.write.chunk;
 import org.apache.tsfile.write.writer.TsFileIOWriter;
 
 import java.io.IOException;
+import java.util.function.Function;
 
 /** IChunkWriter provides a list of writing methods for different value types. 
*/
 public interface IChunkWriter {
@@ -28,6 +29,13 @@ public interface IChunkWriter {
   /** flush data to TsFileIOWriter. */
   void writeToFileWriter(TsFileIOWriter tsfileWriter) throws IOException;
 
+  /**
+   * flush data to TsFileIOWriter, may rename the measurement in the file 
according to the remapper.
+   */
+  void writeToFileWriter(
+      TsFileIOWriter tsfileWriter, Function<String, String> 
measurementNameRemapper)
+      throws IOException;
+
   /** estimate memory usage of this series. */
   long estimateMaxSeriesMemSize();
 
diff --git 
a/java/tsfile/src/main/java/org/apache/tsfile/write/chunk/ValueChunkWriter.java 
b/java/tsfile/src/main/java/org/apache/tsfile/write/chunk/ValueChunkWriter.java
index d011e125..a1c03ee5 100644
--- 
a/java/tsfile/src/main/java/org/apache/tsfile/write/chunk/ValueChunkWriter.java
+++ 
b/java/tsfile/src/main/java/org/apache/tsfile/write/chunk/ValueChunkWriter.java
@@ -45,6 +45,7 @@ import java.io.Serializable;
 import java.nio.ByteBuffer;
 import java.nio.channels.Channels;
 import java.nio.channels.WritableByteChannel;
+import java.util.function.Function;
 
 public class ValueChunkWriter {
 
@@ -295,8 +296,14 @@ public class ValueChunkWriter {
   }
 
   public void writeToFileWriter(TsFileIOWriter tsfileWriter) throws 
IOException {
+    writeToFileWriter(tsfileWriter, null);
+  }
+
+  public void writeToFileWriter(
+      TsFileIOWriter tsfileWriter, Function<String, String> 
measurementNameRemapper)
+      throws IOException {
     sealCurrentPage();
-    writeAllPagesOfChunkToTsFile(tsfileWriter);
+    writeAllPagesOfChunkToTsFile(tsfileWriter, measurementNameRemapper);
 
     // reinit this chunk writer
     pageBuffer.reset();
@@ -379,13 +386,22 @@ public class ValueChunkWriter {
     return dataType;
   }
 
+  public void writeAllPagesOfChunkToTsFile(TsFileIOWriter writer) throws 
IOException {
+    writeAllPagesOfChunkToTsFile(writer, null);
+  }
+
   /**
    * write the page to specified IOWriter.
    *
    * @param writer the specified IOWriter
    * @throws IOException exception in IO
    */
-  public void writeAllPagesOfChunkToTsFile(TsFileIOWriter writer) throws 
IOException {
+  public void writeAllPagesOfChunkToTsFile(
+      TsFileIOWriter writer, Function<String, String> measurementNameRemapper) 
throws IOException {
+    String finalMeasurementId =
+        measurementNameRemapper == null
+            ? measurementId
+            : measurementNameRemapper.apply(measurementId);
     if (statistics.getCount() == 0) {
       if (pageBuffer.size() == 0) {
         return;
@@ -395,7 +411,7 @@ public class ValueChunkWriter {
       // chunkGroup during compaction. To save the disk space, we only 
serialize chunkHeader for the
       // empty valueChunk, whose dataSize is 0.
       writer.startFlushChunk(
-          measurementId,
+          finalMeasurementId,
           compressionType,
           dataType,
           encodingType,
@@ -409,7 +425,7 @@ public class ValueChunkWriter {
 
     // start to write this column chunk
     writer.startFlushChunk(
-        measurementId,
+        finalMeasurementId,
         compressionType,
         dataType,
         encodingType,

Reply via email to