This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/tsfile.git
The following commit(s) were added to refs/heads/develop by this push:
new 69d1fe22 Add some support for changing schema during write (#718)
69d1fe22 is described below
commit 69d1fe22e6dd640240bd78b2e0687d325c01b9b7
Author: Jiang Tian <[email protected]>
AuthorDate: Fri Jan 30 18:15:54 2026 +0800
Add some support for changing schema during write (#718)
* add void writeToFileWriter(TsFileIOWriter tsfileWriter, Function<String,
String> measurementNameRemapper) throws IOException;
* spotless
* fix test
* bump version
* add getter and setter
* Add support for schema evolution
* bump version
* bump version
* bump version
* revert
---
.../metadata/AbstractAlignedChunkMetadata.java | 5 +++++
.../apache/tsfile/file/metadata/ChunkMetadata.java | 5 +++++
.../tsfile/file/metadata/IChunkMetadata.java | 2 ++
.../apache/tsfile/file/metadata/TableSchema.java | 8 ++++++++
.../tsfile/write/chunk/AlignedChunkWriterImpl.java | 11 ++++++++++
.../apache/tsfile/write/chunk/ChunkWriterImpl.java | 20 +++++++++++++++---
.../apache/tsfile/write/chunk/IChunkWriter.java | 8 ++++++++
.../tsfile/write/chunk/ValueChunkWriter.java | 24 ++++++++++++++++++----
8 files changed, 76 insertions(+), 7 deletions(-)
diff --git
a/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/AbstractAlignedChunkMetadata.java
b/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/AbstractAlignedChunkMetadata.java
index e195b114..52a1913c 100644
---
a/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/AbstractAlignedChunkMetadata.java
+++
b/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/AbstractAlignedChunkMetadata.java
@@ -177,6 +177,11 @@ public abstract class AbstractAlignedChunkMetadata
implements IChunkMetadata {
return timeChunkMetadata.getMeasurementUid();
}
+ @Override
+ public void setMeasurementUid(String measurementUid) {
+ throw new UnsupportedOperationException();
+ }
+
@Override
public void insertIntoSortedDeletions(TimeRange timeRange) {
throw new UnsupportedOperationException();
diff --git
a/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/ChunkMetadata.java
b/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/ChunkMetadata.java
index 653a8991..a76ef65c 100644
---
a/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/ChunkMetadata.java
+++
b/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/ChunkMetadata.java
@@ -153,6 +153,11 @@ public class ChunkMetadata implements IChunkMetadata {
return measurementUid;
}
+ @Override
+ public void setMeasurementUid(String measurementUid) {
+ this.measurementUid = measurementUid;
+ }
+
@Override
public Statistics<? extends Serializable> getStatistics() {
return statistics;
diff --git
a/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/IChunkMetadata.java
b/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/IChunkMetadata.java
index e29e7147..f391ed7f 100644
---
a/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/IChunkMetadata.java
+++
b/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/IChunkMetadata.java
@@ -63,6 +63,8 @@ public interface IChunkMetadata extends IMetadata {
String getMeasurementUid();
+ void setMeasurementUid(String measurementUid);
+
void insertIntoSortedDeletions(TimeRange timeRange);
List<TimeRange> getDeleteIntervalList();
diff --git
a/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/TableSchema.java
b/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/TableSchema.java
index 6d630c38..9d1f7f2a 100644
--- a/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/TableSchema.java
+++ b/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/TableSchema.java
@@ -356,4 +356,12 @@ public class TableSchema {
tagColumnCnt = (int) columnCategories.stream().filter(c -> c ==
ColumnCategory.TAG).count();
return tagColumnCnt;
}
+
+ public boolean isUpdatable() {
+ return updatable;
+ }
+
+ public void setUpdatable(boolean updatable) {
+ this.updatable = updatable;
+ }
}
diff --git
a/java/tsfile/src/main/java/org/apache/tsfile/write/chunk/AlignedChunkWriterImpl.java
b/java/tsfile/src/main/java/org/apache/tsfile/write/chunk/AlignedChunkWriterImpl.java
index 27761e67..fa3cb348 100644
---
a/java/tsfile/src/main/java/org/apache/tsfile/write/chunk/AlignedChunkWriterImpl.java
+++
b/java/tsfile/src/main/java/org/apache/tsfile/write/chunk/AlignedChunkWriterImpl.java
@@ -41,6 +41,7 @@ import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
+import java.util.function.Function;
public class AlignedChunkWriterImpl implements IChunkWriter {
@@ -477,6 +478,16 @@ public class AlignedChunkWriterImpl implements
IChunkWriter {
}
}
+ @Override
+ public void writeToFileWriter(
+ TsFileIOWriter tsfileWriter, Function<String, String>
measurementNameRemapper)
+ throws IOException {
+ timeChunkWriter.writeToFileWriter(tsfileWriter);
+ for (ValueChunkWriter valueChunkWriter : valueChunkWriterList) {
+ valueChunkWriter.writeToFileWriter(tsfileWriter,
measurementNameRemapper);
+ }
+ }
+
@Override
public long estimateMaxSeriesMemSize() {
long estimateMaxSeriesMemSize = timeChunkWriter.estimateMaxSeriesMemSize();
diff --git
a/java/tsfile/src/main/java/org/apache/tsfile/write/chunk/ChunkWriterImpl.java
b/java/tsfile/src/main/java/org/apache/tsfile/write/chunk/ChunkWriterImpl.java
index 25e0410b..74205436 100644
---
a/java/tsfile/src/main/java/org/apache/tsfile/write/chunk/ChunkWriterImpl.java
+++
b/java/tsfile/src/main/java/org/apache/tsfile/write/chunk/ChunkWriterImpl.java
@@ -43,6 +43,7 @@ import java.io.Serializable;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;
+import java.util.function.Function;
public class ChunkWriterImpl implements IChunkWriter {
@@ -346,8 +347,15 @@ public class ChunkWriterImpl implements IChunkWriter {
@Override
public void writeToFileWriter(TsFileIOWriter tsfileWriter) throws
IOException {
+ writeToFileWriter(tsfileWriter, null);
+ }
+
+ @Override
+ public void writeToFileWriter(
+ TsFileIOWriter tsfileWriter, Function<String, String>
measurementNameRemapper)
+ throws IOException {
sealCurrentPage();
- writeAllPagesOfChunkToTsFile(tsfileWriter, statistics);
+ writeAllPagesOfChunkToTsFile(tsfileWriter, statistics,
measurementNameRemapper);
// reinit this chunk writer
pageBuffer.reset();
@@ -474,17 +482,23 @@ public class ChunkWriterImpl implements IChunkWriter {
*
* @param writer the specified IOWriter
* @param statistics the chunk statistics
+ * @param measurementNameRemapper optional name mapping; null keeps the original name
* @throws IOException exception in IO
*/
private void writeAllPagesOfChunkToTsFile(
- TsFileIOWriter writer, Statistics<? extends Serializable> statistics)
throws IOException {
+ TsFileIOWriter writer,
+ Statistics<? extends Serializable> statistics,
+ Function<String, String> measurementNameRemapper)
+ throws IOException {
if (statistics.getCount() == 0) {
return;
}
// start to write this column chunk
writer.startFlushChunk(
- measurementSchema.getMeasurementName(),
+ measurementNameRemapper == null
+ ? measurementSchema.getMeasurementName()
+ :
measurementNameRemapper.apply(measurementSchema.getMeasurementName()),
compressor.getType(),
measurementSchema.getType(),
measurementSchema.getEncodingType(),
diff --git
a/java/tsfile/src/main/java/org/apache/tsfile/write/chunk/IChunkWriter.java
b/java/tsfile/src/main/java/org/apache/tsfile/write/chunk/IChunkWriter.java
index 85b9828a..4a198841 100644
--- a/java/tsfile/src/main/java/org/apache/tsfile/write/chunk/IChunkWriter.java
+++ b/java/tsfile/src/main/java/org/apache/tsfile/write/chunk/IChunkWriter.java
@@ -21,6 +21,7 @@ package org.apache.tsfile.write.chunk;
import org.apache.tsfile.write.writer.TsFileIOWriter;
import java.io.IOException;
+import java.util.function.Function;
/** IChunkWriter provides a list of writing methods for different value types.
*/
public interface IChunkWriter {
@@ -28,6 +29,13 @@ public interface IChunkWriter {
/** flush data to TsFileIOWriter. */
void writeToFileWriter(TsFileIOWriter tsfileWriter) throws IOException;
+ /**
+ * flush data to TsFileIOWriter, may rename the measurement in the file
according to the remapper.
+ */
+ void writeToFileWriter(
+ TsFileIOWriter tsfileWriter, Function<String, String>
measurementNameRemapper)
+ throws IOException;
+
/** estimate memory usage of this series. */
long estimateMaxSeriesMemSize();
diff --git
a/java/tsfile/src/main/java/org/apache/tsfile/write/chunk/ValueChunkWriter.java
b/java/tsfile/src/main/java/org/apache/tsfile/write/chunk/ValueChunkWriter.java
index d011e125..a1c03ee5 100644
---
a/java/tsfile/src/main/java/org/apache/tsfile/write/chunk/ValueChunkWriter.java
+++
b/java/tsfile/src/main/java/org/apache/tsfile/write/chunk/ValueChunkWriter.java
@@ -45,6 +45,7 @@ import java.io.Serializable;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;
+import java.util.function.Function;
public class ValueChunkWriter {
@@ -295,8 +296,14 @@ public class ValueChunkWriter {
}
public void writeToFileWriter(TsFileIOWriter tsfileWriter) throws
IOException {
+ writeToFileWriter(tsfileWriter, null);
+ }
+
+ public void writeToFileWriter(
+ TsFileIOWriter tsfileWriter, Function<String, String>
measurementNameRemapper)
+ throws IOException {
sealCurrentPage();
- writeAllPagesOfChunkToTsFile(tsfileWriter);
+ writeAllPagesOfChunkToTsFile(tsfileWriter, measurementNameRemapper);
// reinit this chunk writer
pageBuffer.reset();
@@ -379,13 +386,22 @@ public class ValueChunkWriter {
return dataType;
}
+ public void writeAllPagesOfChunkToTsFile(TsFileIOWriter writer) throws
IOException {
+ writeAllPagesOfChunkToTsFile(writer, null);
+ }
+
/**
* write the page to specified IOWriter.
*
* @param writer the specified IOWriter
* @throws IOException exception in IO
*/
- public void writeAllPagesOfChunkToTsFile(TsFileIOWriter writer) throws
IOException {
+ public void writeAllPagesOfChunkToTsFile(
+ TsFileIOWriter writer, Function<String, String> measurementNameRemapper)
throws IOException {
+ String finalMeasurementId =
+ measurementNameRemapper == null
+ ? measurementId
+ : measurementNameRemapper.apply(measurementId);
if (statistics.getCount() == 0) {
if (pageBuffer.size() == 0) {
return;
@@ -395,7 +411,7 @@ public class ValueChunkWriter {
// chunkGroup during compaction. To save the disk space, we only
serialize chunkHeader for the
// empty valueChunk, whose dataSize is 0.
writer.startFlushChunk(
- measurementId,
+ finalMeasurementId,
compressionType,
dataType,
encodingType,
@@ -409,7 +425,7 @@ public class ValueChunkWriter {
// start to write this column chunk
writer.startFlushChunk(
- measurementId,
+ finalMeasurementId,
compressionType,
dataType,
encodingType,