This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch tsFile_v4
in repository https://gitbox.apache.org/repos/asf/tsfile.git
The following commit(s) were added to refs/heads/tsFile_v4 by this push:
new ab5f90e9 add aligned table write
ab5f90e9 is described below
commit ab5f90e9dc39c15b90088b3d8422d6ed12f2102d
Author: jt2594838 <[email protected]>
AuthorDate: Thu Apr 11 10:15:06 2024 +0800
add aligned table write
---
.../java/org/apache/tsfile/write/TsFileWriter.java | 23 +++++++++++++++-------
.../write/chunk/AlignedChunkGroupWriterImpl.java | 17 +++++++++++++---
2 files changed, 30 insertions(+), 10 deletions(-)
diff --git a/tsfile/src/main/java/org/apache/tsfile/write/TsFileWriter.java b/tsfile/src/main/java/org/apache/tsfile/write/TsFileWriter.java
index df5d9dbd..5aacadfe 100644
--- a/tsfile/src/main/java/org/apache/tsfile/write/TsFileWriter.java
+++ b/tsfile/src/main/java/org/apache/tsfile/write/TsFileWriter.java
@@ -94,6 +94,8 @@ public class TsFileWriter implements AutoCloseable {
private long chunkGroupSizeThreshold;
+ private boolean isTableWriteAligned = true;
+
/**
* init this TsFileWriter.
*
@@ -485,8 +487,8 @@ public class TsFileWriter implements AutoCloseable {
}
private IChunkGroupWriter tryToInitialGroupWriter(IDeviceID deviceId, boolean isAligned) {
- IChunkGroupWriter groupWriter;
- if (!groupWriters.containsKey(deviceId)) {
+ IChunkGroupWriter groupWriter = groupWriters.get(deviceId);
+ if (groupWriter == null) {
if (isAligned) {
groupWriter = new AlignedChunkGroupWriterImpl(deviceId);
if (!isUnseq) { // Sequence File
@@ -502,8 +504,6 @@ public class TsFileWriter implements AutoCloseable {
}
}
groupWriters.put(deviceId, groupWriter);
- } else {
- groupWriter = groupWriters.get(deviceId);
}
return groupWriter;
}
@@ -678,17 +678,26 @@ public class TsFileWriter implements AutoCloseable {
}
public boolean writeTable(Tablet tablet) throws IOException, WriteProcessException {
- // make sure the ChunkGroupWriter for this Tablet exist
- checkIsTableExist(tablet);
// spilt the tablet by deviceId
final List<Pair<IDeviceID, Integer>> deviceIdEndIndexPairs =
WriteUtils.splitTabletByDevice(tablet);
+ // make sure the ChunkGroupWriter for this Tablet exist
+ checkIsTableExist(tablet);
+
int startIndex = 0;
for (Pair<IDeviceID, Integer> pair : deviceIdEndIndexPairs) {
// get corresponding ChunkGroupWriter and write this Tablet
- recordCount += groupWriters.get(pair.left).write(tablet, startIndex,
pair.right,
+ recordCount += tryToInitialGroupWriter(pair.left,
isTableWriteAligned).write(tablet, startIndex, pair.right,
tablet.getIdColumnRange(), tablet.getSchemas().size());
startIndex = pair.right;
}
return checkMemorySizeAndMayFlushChunks();
}
+
+ public boolean isTableWriteAligned() {
+ return isTableWriteAligned;
+ }
+
+ public void setTableWriteAligned(boolean tableWriteAligned) {
+ isTableWriteAligned = tableWriteAligned;
+ }
}
diff --git a/tsfile/src/main/java/org/apache/tsfile/write/chunk/AlignedChunkGroupWriterImpl.java b/tsfile/src/main/java/org/apache/tsfile/write/chunk/AlignedChunkGroupWriterImpl.java
index f23dddbd..67552726 100644
--- a/tsfile/src/main/java/org/apache/tsfile/write/chunk/AlignedChunkGroupWriterImpl.java
+++ b/tsfile/src/main/java/org/apache/tsfile/write/chunk/AlignedChunkGroupWriterImpl.java
@@ -71,8 +71,13 @@ public class AlignedChunkGroupWriterImpl implements IChunkGroupWriter {
@Override
public void tryToAddSeriesWriter(MeasurementSchema measurementSchema) throws IOException {
- if
(!valueChunkWriterMap.containsKey(measurementSchema.getMeasurementId())) {
- ValueChunkWriter valueChunkWriter =
+ tryToAddSeriesWriterInternal(measurementSchema);
+ }
+
+ public ValueChunkWriter tryToAddSeriesWriterInternal(MeasurementSchema
measurementSchema) throws IOException {
+ ValueChunkWriter valueChunkWriter =
valueChunkWriterMap.get(measurementSchema.getMeasurementId());
+ if (valueChunkWriter == null) {
+ valueChunkWriter =
new ValueChunkWriter(
measurementSchema.getMeasurementId(),
measurementSchema.getCompressor(),
@@ -82,6 +87,7 @@ public class AlignedChunkGroupWriterImpl implements IChunkGroupWriter {
valueChunkWriterMap.put(measurementSchema.getMeasurementId(),
valueChunkWriter);
tryToAddEmptyPageAndData(valueChunkWriter);
}
+ return valueChunkWriter;
}
@Override
@@ -166,6 +172,7 @@ public class AlignedChunkGroupWriterImpl implements IChunkGroupWriter {
int pointCount = 0;
List<MeasurementSchema> measurementSchemas = tablet.getSchemas();
List<ValueChunkWriter> emptyValueChunkWriters = new ArrayList<>();
+ //TODO: should we allow duplicated measurements in a Tablet?
Set<String> existingMeasurements =
measurementSchemas.stream()
.map(MeasurementSchema::getMeasurementId)
@@ -175,6 +182,8 @@ public class AlignedChunkGroupWriterImpl implements IChunkGroupWriter {
emptyValueChunkWriters.add(entry.getValue());
}
}
+ //TODO: changing to a column-first style by calculating the remaining page space of each
+ // column firsts
for (int row = startRowIndex; row < endRowIndex; row++) {
long time = tablet.timestamps[row];
checkIsHistoryData(time);
@@ -185,7 +194,7 @@ public class AlignedChunkGroupWriterImpl implements IChunkGroupWriter {
&& tablet.bitMaps[columnIndex].isMarked(row);
// check isNull by bitMap in tablet
ValueChunkWriter valueChunkWriter =
-
valueChunkWriterMap.get(measurementSchemas.get(columnIndex).getMeasurementId());
+ tryToAddSeriesWriterInternal(measurementSchemas.get(columnIndex));
switch (measurementSchemas.get(columnIndex).getType()) {
case BOOLEAN:
valueChunkWriter.write(time, ((boolean[])
tablet.values[columnIndex])[row], isNull);
@@ -212,6 +221,8 @@ public class AlignedChunkGroupWriterImpl implements IChunkGroupWriter {
measurementSchemas.get(columnIndex).getType()));
}
}
+ // TODO: we can write the null columns after whole insertion, according to the point number
+ // in the time chunk before and after, no need to do it in a row-by-row manner
if (!emptyValueChunkWriters.isEmpty()) {
writeEmptyDataInOneRow(emptyValueChunkWriters);
}