This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch dev/1.1
in repository https://gitbox.apache.org/repos/asf/tsfile.git
The following commit(s) were added to refs/heads/dev/1.1 by this push:
new c5339fd2 Init all series writer for AlignedChunkGroupWriter (#637)
c5339fd2 is described below
commit c5339fd22f583dc19d212a3890a3edf6d6958219
Author: shuwenwei <[email protected]>
AuthorDate: Thu Nov 13 11:13:29 2025 +0800
Init all series writer for AlignedChunkGroupWriter (#637)
---
.../java/org/apache/tsfile/write/TsFileWriter.java | 12 +++-
.../apache/tsfile/write/TsFileWriteApiTest.java | 84 ++++++++++++++++++++++
2 files changed, 95 insertions(+), 1 deletion(-)
diff --git
a/java/tsfile/src/main/java/org/apache/tsfile/write/TsFileWriter.java
b/java/tsfile/src/main/java/org/apache/tsfile/write/TsFileWriter.java
index d5ac5ded..981a043b 100644
--- a/java/tsfile/src/main/java/org/apache/tsfile/write/TsFileWriter.java
+++ b/java/tsfile/src/main/java/org/apache/tsfile/write/TsFileWriter.java
@@ -462,11 +462,13 @@ public class TsFileWriter implements AutoCloseable {
return schemas;
}
- private IChunkGroupWriter tryToInitialGroupWriter(IDeviceID deviceId,
boolean isAligned) {
+ private IChunkGroupWriter tryToInitialGroupWriter(IDeviceID deviceId,
boolean isAligned)
+ throws IOException {
IChunkGroupWriter groupWriter;
if (!groupWriters.containsKey(deviceId)) {
if (isAligned) {
groupWriter = new AlignedChunkGroupWriterImpl(deviceId);
+ initAllSeriesWriterForAlignedSeries((AlignedChunkGroupWriterImpl)
groupWriter, deviceId);
if (!isUnseq) { // Sequence File
((AlignedChunkGroupWriterImpl) groupWriter)
.setLastTime(alignedDeviceLastTimeMap.get(deviceId));
@@ -486,6 +488,14 @@ public class TsFileWriter implements AutoCloseable {
return groupWriter;
}
+ private void initAllSeriesWriterForAlignedSeries(
+ AlignedChunkGroupWriterImpl alignedChunkGroupWriter, IDeviceID deviceID)
throws IOException {
+ MeasurementGroup deviceSchema = schema.getSeriesSchema(new Path(deviceID));
+ for (MeasurementSchema measurementSchema :
deviceSchema.getMeasurementSchemaMap().values()) {
+ alignedChunkGroupWriter.tryToAddSeriesWriter(measurementSchema);
+ }
+ }
+
/**
* write a record in type of T.
*
diff --git
a/java/tsfile/src/test/java/org/apache/tsfile/write/TsFileWriteApiTest.java
b/java/tsfile/src/test/java/org/apache/tsfile/write/TsFileWriteApiTest.java
index 5586fb22..ad6bd79d 100644
--- a/java/tsfile/src/test/java/org/apache/tsfile/write/TsFileWriteApiTest.java
+++ b/java/tsfile/src/test/java/org/apache/tsfile/write/TsFileWriteApiTest.java
@@ -25,10 +25,13 @@ import
org.apache.tsfile.exception.write.WriteProcessException;
import org.apache.tsfile.file.MetaMarker;
import org.apache.tsfile.file.header.ChunkHeader;
import org.apache.tsfile.file.header.PageHeader;
+import org.apache.tsfile.file.metadata.AlignedChunkMetadata;
import org.apache.tsfile.file.metadata.ChunkMetadata;
+import org.apache.tsfile.file.metadata.IDeviceID;
import org.apache.tsfile.file.metadata.PlainDeviceID;
import org.apache.tsfile.file.metadata.enums.TSEncoding;
import org.apache.tsfile.fileSystem.FSFactoryProducer;
+import org.apache.tsfile.read.TsFileDeviceIterator;
import org.apache.tsfile.read.TsFileReader;
import org.apache.tsfile.read.TsFileSequenceReader;
import org.apache.tsfile.read.common.Chunk;
@@ -36,6 +39,7 @@ import org.apache.tsfile.read.common.Path;
import org.apache.tsfile.read.expression.QueryExpression;
import org.apache.tsfile.read.query.dataset.QueryDataSet;
import org.apache.tsfile.utils.Binary;
+import org.apache.tsfile.utils.Pair;
import org.apache.tsfile.utils.TsFileGeneratorUtils;
import org.apache.tsfile.write.chunk.AlignedChunkWriterImpl;
import org.apache.tsfile.write.chunk.ChunkWriterImpl;
@@ -54,6 +58,7 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.time.LocalDate;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.List;
@@ -810,4 +815,83 @@ public class TsFileWriteApiTest {
throw throwable;
}
}
+
+ @Test
+ public void testWriteSomeColumnsOfTree() throws IOException,
WriteProcessException {
+ List<MeasurementSchema> fullMeasurementSchemas =
+ Arrays.asList(
+ new MeasurementSchema("s1", TSDataType.INT32),
+ new MeasurementSchema("s2", TSDataType.INT32),
+ new MeasurementSchema("s3", TSDataType.INT32));
+ List<MeasurementSchema> measurementSchemas1 =
+ Arrays.asList(new MeasurementSchema("s1", TSDataType.INT32));
+ String device = "root.test.d1";
+ Tablet tablet1 = new Tablet(device, fullMeasurementSchemas);
+ Tablet tablet2 = new Tablet(device, measurementSchemas1);
+ for (int i = 0; i < 1000; i++) {
+ tablet1.addTimestamp(i, i);
+ tablet1.addValue("s1", i, 1);
+ tablet1.addValue("s2", i, 1);
+ tablet1.addValue("s3", i, 1);
+ }
+ tablet1.rowSize = 1000;
+ for (int i = 0; i < 1000; i++) {
+ tablet2.addTimestamp(i, i + 1005);
+ tablet2.addValue("s1", i, 0);
+ }
+ tablet2.rowSize = 1000;
+ try (TsFileWriter writer = new TsFileWriter(f)) {
+ writer.registerAlignedTimeseries(new Path(device),
fullMeasurementSchemas);
+ writer.writeAligned(tablet1);
+ writer.flushAllChunkGroups();
+ writer.writeAligned(tablet2);
+ writer.flushAllChunkGroups();
+ }
+ try (TsFileSequenceReader reader = new TsFileSequenceReader(f.getPath())) {
+ TsFileDeviceIterator deviceIterator =
reader.getAllDevicesIteratorWithIsAligned();
+ while (deviceIterator.hasNext()) {
+ Pair<IDeviceID, Boolean> pair = deviceIterator.next();
+ List<AlignedChunkMetadata> alignedChunkMetadataList =
+ reader.getAlignedChunkMetadataByMetadataIndexNode(
+ pair.getLeft(),
deviceIterator.getFirstMeasurementNodeOfCurrentDevice());
+ Assert.assertFalse(alignedChunkMetadataList.isEmpty());
+ Assert.assertEquals(3,
alignedChunkMetadataList.get(0).getValueChunkMetadataList().size());
+ Assert.assertEquals(
+ 1000,
+ alignedChunkMetadataList
+ .get(0)
+ .getValueChunkMetadataList()
+ .get(0)
+ .getStatistics()
+ .getCount());
+ Assert.assertEquals(
+ 1000,
+ alignedChunkMetadataList
+ .get(0)
+ .getValueChunkMetadataList()
+ .get(1)
+ .getStatistics()
+ .getCount());
+ Assert.assertEquals(
+ 1000,
+ alignedChunkMetadataList
+ .get(0)
+ .getValueChunkMetadataList()
+ .get(2)
+ .getStatistics()
+ .getCount());
+ Assert.assertEquals(3,
alignedChunkMetadataList.get(1).getValueChunkMetadataList().size());
+ Assert.assertEquals(
+ 1000,
+ alignedChunkMetadataList
+ .get(1)
+ .getValueChunkMetadataList()
+ .get(0)
+ .getStatistics()
+ .getCount());
+
Assert.assertNull(alignedChunkMetadataList.get(1).getValueChunkMetadataList().get(1));
+
Assert.assertNull(alignedChunkMetadataList.get(1).getValueChunkMetadataList().get(2));
+ }
+ }
+ }
}