This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/tsfile.git
The following commit(s) were added to refs/heads/develop by this push:
new 5905c4f9 support separated aligned chunk (#151)
5905c4f9 is described below
commit 5905c4f98d75b018ac2b4af32efece8518a0f08e
Author: shuwenwei <[email protected]>
AuthorDate: Fri Jul 5 17:51:21 2024 +0800
support separated aligned chunk (#151)
---
.../java/org/apache/tsfile/TsFileSequenceRead.java | 76 ++++++++++++++++------
.../apache/tsfile/read/TsFileSequenceReader.java | 30 +++++++--
.../tsfile/read/reader/page/AlignedPageReader.java | 8 +++
.../tsfile/write/chunk/AlignedChunkWriterImpl.java | 14 ++--
.../apache/tsfile/write/chunk/TimeChunkWriter.java | 14 ++--
5 files changed, 104 insertions(+), 38 deletions(-)
diff --git a/java/examples/src/main/java/org/apache/tsfile/TsFileSequenceRead.java b/java/examples/src/main/java/org/apache/tsfile/TsFileSequenceRead.java
index 5e731479..5e708f1f 100644
--- a/java/examples/src/main/java/org/apache/tsfile/TsFileSequenceRead.java
+++ b/java/examples/src/main/java/org/apache/tsfile/TsFileSequenceRead.java
@@ -35,9 +35,8 @@ import org.apache.tsfile.fileSystem.FSFactoryProducer;
import org.apache.tsfile.read.TsFileSequenceReader;
import org.apache.tsfile.read.common.BatchData;
import org.apache.tsfile.read.reader.page.PageReader;
-import org.apache.tsfile.read.reader.page.TimePageReader;
-import org.apache.tsfile.read.reader.page.ValuePageReader;
-import org.apache.tsfile.utils.TsPrimitiveType;
+import org.apache.tsfile.utils.ReadWriteIOUtils;
+import org.apache.tsfile.write.UnSupportedDataTypeException;
import java.io.IOException;
import java.nio.ByteBuffer;
@@ -50,6 +49,7 @@ public class TsFileSequenceRead {
// if you wanna print detailed datas in pages, then turn it true.
private static boolean printDetail = false;
public static final String POINT_IN_PAGE = "\t\tpoints in the page: ";
+ private static int MASK = 0x80;
@SuppressWarnings({
"squid:S3776",
@@ -123,30 +123,64 @@ public class TsFileSequenceRead {
"\t\tCompressed page data size: " +
pageHeader.getCompressedSize());
if ((header.getChunkType() & TsFileConstant.TIME_COLUMN_MASK)
== TsFileConstant.TIME_COLUMN_MASK) { // Time Chunk
- TimePageReader timePageReader =
- new TimePageReader(pageHeader, pageData,
defaultTimeDecoder);
- timeBatch.add(timePageReader.getNextTimeBatch());
- System.out.println(POINT_IN_PAGE +
timeBatch.get(pageIndex).length);
- if (printDetail) {
- for (int i = 0; i < timeBatch.get(pageIndex).length; i++) {
- System.out.println("\t\t\ttime: " +
timeBatch.get(pageIndex)[i]);
+ Decoder decoder =
+ Decoder.getDecoderByType(header.getEncodingType(),
header.getDataType());
+ while (decoder.hasNext(pageData)) {
+ long currentTime = decoder.readLong(pageData);
+ if (printDetail) {
+ System.out.println("\t\t\ttime: " + currentTime);
}
}
} else if ((header.getChunkType() &
TsFileConstant.VALUE_COLUMN_MASK)
== TsFileConstant.VALUE_COLUMN_MASK) { // Value Chunk
- ValuePageReader valuePageReader =
- new ValuePageReader(pageHeader, pageData,
header.getDataType(), valueDecoder);
- TsPrimitiveType[] valueBatch =
- valuePageReader.nextValueBatch(timeBatch.get(pageIndex));
- if (valueBatch.length == 0) {
- System.out.println("\t\t-- Empty Page ");
- } else {
- System.out.println(POINT_IN_PAGE + valueBatch.length);
+ int pointNum = 0;
+ byte[] bitmap = null;
+ if (pageData.hasRemaining()) {
+ int size = ReadWriteIOUtils.readInt(pageData);
+ bitmap = new byte[(size + 7) / 8];
+ pageData.get(bitmap);
}
- if (printDetail) {
- for (TsPrimitiveType batch : valueBatch) {
- System.out.println("\t\t\tvalue: " + batch);
+ while (valueDecoder.hasNext(pageData)) {
+ pointNum++;
+ int idx = pointNum - 1;
+ if (((bitmap[idx / 8] & 0xFF) & (MASK >>> (idx % 8))) == 0) {
+ if (printDetail) {
+ System.out.println("\t\t\tvalue: " + null);
+ }
+ continue;
+ }
+ Object value;
+ switch (header.getDataType()) {
+ case BOOLEAN:
+ value = valueDecoder.readBoolean(pageData);
+ break;
+ case INT32:
+ value = valueDecoder.readInt(pageData);
+ break;
+ case INT64:
+ value = valueDecoder.readLong(pageData);
+ break;
+ case FLOAT:
+ value = valueDecoder.readFloat(pageData);
+ break;
+ case DOUBLE:
+ value = valueDecoder.readDouble(pageData);
+ break;
+ case TEXT:
+ value = valueDecoder.readBinary(pageData);
+ break;
+ default:
+ throw new
UnSupportedDataTypeException(String.valueOf(header.getDataType()));
}
+ if (printDetail) {
+ System.out.println("\t\t\tvalue: " + value);
+ }
+ }
+ pageData.flip();
+ if (pointNum == 0) {
+ System.out.println("\t\t-- Empty Page ");
+ } else {
+ System.out.println(POINT_IN_PAGE + pointNum);
}
} else { // NonAligned Chunk
PageReader pageReader =
diff --git a/java/tsfile/src/main/java/org/apache/tsfile/read/TsFileSequenceReader.java b/java/tsfile/src/main/java/org/apache/tsfile/read/TsFileSequenceReader.java
index 7107242e..fa9b8ab3 100644
--- a/java/tsfile/src/main/java/org/apache/tsfile/read/TsFileSequenceReader.java
+++ b/java/tsfile/src/main/java/org/apache/tsfile/read/TsFileSequenceReader.java
@@ -1755,6 +1755,7 @@ public class TsFileSequenceReader implements
AutoCloseable {
List<long[]> timeBatch = new ArrayList<>();
IDeviceID lastDeviceId = null;
List<IMeasurementSchema> measurementSchemaList = new ArrayList<>();
+ Map<String, Integer> valueColumn2TimeBatchIndex = new HashMap<>();
try {
while ((marker = this.readMarker()) != MetaMarker.SEPARATOR) {
switch (marker) {
@@ -1778,17 +1779,24 @@ public class TsFileSequenceReader implements
AutoCloseable {
chunkHeader.getCompressionType());
measurementSchemaList.add(measurementSchema);
dataType = chunkHeader.getDataType();
- if (chunkHeader.getDataType() == TSDataType.VECTOR) {
- timeBatch.clear();
- }
+
Statistics<? extends Serializable> chunkStatistics =
Statistics.getStatsByType(dataType);
int dataSize = chunkHeader.getDataSize();
if (dataSize > 0) {
+ if (marker == MetaMarker.TIME_CHUNK_HEADER) {
+ timeBatch.add(null);
+ }
if (((byte) (chunkHeader.getChunkType() & 0x3F))
== MetaMarker
.CHUNK_HEADER) { // more than one page, we could use
page statistics to
+ if (marker == MetaMarker.VALUE_CHUNK_HEADER) {
+ int timeBatchIndex =
+
valueColumn2TimeBatchIndex.getOrDefault(chunkHeader.getMeasurementID(), 0);
+ valueColumn2TimeBatchIndex.put(
+ chunkHeader.getMeasurementID(), timeBatchIndex + 1);
+ }
// generate chunk statistic
while (dataSize > 0) {
// a new Page
@@ -1830,7 +1838,12 @@ public class TsFileSequenceReader implements
AutoCloseable {
ValuePageReader valuePageReader =
new ValuePageReader(
pageHeader, pageData, chunkHeader.getDataType(),
valueDecoder);
- TsPrimitiveType[] valueBatch =
valuePageReader.nextValueBatch(timeBatch.get(0));
+ int timeBatchIndex =
+
valueColumn2TimeBatchIndex.getOrDefault(chunkHeader.getMeasurementID(), 0);
+ valueColumn2TimeBatchIndex.put(
+ chunkHeader.getMeasurementID(), timeBatchIndex + 1);
+ TsPrimitiveType[] valueBatch =
+
valuePageReader.nextValueBatch(timeBatch.get(timeBatchIndex));
if (valueBatch != null && valueBatch.length != 0) {
for (int i = 0; i < valueBatch.length; i++) {
@@ -1838,7 +1851,7 @@ public class TsFileSequenceReader implements
AutoCloseable {
if (value == null) {
continue;
}
- long timeStamp = timeBatch.get(0)[i];
+ long timeStamp = timeBatch.get(timeBatchIndex)[i];
switch (dataType) {
case INT32:
case DATE:
@@ -1909,6 +1922,11 @@ public class TsFileSequenceReader implements
AutoCloseable {
}
chunkHeader.increasePageNums(1);
}
+ } else if (marker == MetaMarker.ONLY_ONE_PAGE_VALUE_CHUNK_HEADER
+ || marker == MetaMarker.VALUE_CHUNK_HEADER) {
+ int timeBatchIndex =
+
valueColumn2TimeBatchIndex.getOrDefault(chunkHeader.getMeasurementID(), 0);
+ valueColumn2TimeBatchIndex.put(chunkHeader.getMeasurementID(),
timeBatchIndex + 1);
}
currentChunk =
new ChunkMetadata(measurementID, dataType, fileOffsetOfChunk,
chunkStatistics);
@@ -1934,6 +1952,8 @@ public class TsFileSequenceReader implements
AutoCloseable {
chunkMetadataList = new ArrayList<>();
ChunkGroupHeader chunkGroupHeader = this.readChunkGroupHeader();
lastDeviceId = chunkGroupHeader.getDeviceID();
+ timeBatch.clear();
+ valueColumn2TimeBatchIndex.clear();
break;
case MetaMarker.OPERATION_INDEX_RANGE:
truncatedSize = this.position() - 1;
diff --git a/java/tsfile/src/main/java/org/apache/tsfile/read/reader/page/AlignedPageReader.java b/java/tsfile/src/main/java/org/apache/tsfile/read/reader/page/AlignedPageReader.java
index 75f664f5..5307ed35 100644
--- a/java/tsfile/src/main/java/org/apache/tsfile/read/reader/page/AlignedPageReader.java
+++ b/java/tsfile/src/main/java/org/apache/tsfile/read/reader/page/AlignedPageReader.java
@@ -469,4 +469,12 @@ public class AlignedPageReader implements IPageReader {
builder = new TsBlockBuilder((int)
timePageReader.getStatistics().getCount(), dataTypes);
}
}
+
+ public TimePageReader getTimePageReader() {
+ return timePageReader;
+ }
+
+ public List<ValuePageReader> getValuePageReaderList() {
+ return valuePageReaderList;
+ }
}
diff --git a/java/tsfile/src/main/java/org/apache/tsfile/write/chunk/AlignedChunkWriterImpl.java b/java/tsfile/src/main/java/org/apache/tsfile/write/chunk/AlignedChunkWriterImpl.java
index d5fddb83..22d310c2 100644
--- a/java/tsfile/src/main/java/org/apache/tsfile/write/chunk/AlignedChunkWriterImpl.java
+++ b/java/tsfile/src/main/java/org/apache/tsfile/write/chunk/AlignedChunkWriterImpl.java
@@ -42,12 +42,14 @@ import java.util.List;
public class AlignedChunkWriterImpl implements IChunkWriter {
- private final TimeChunkWriter timeChunkWriter;
- private final List<ValueChunkWriter> valueChunkWriterList;
- private int valueIndex;
+ protected TimeChunkWriter timeChunkWriter;
+ protected List<ValueChunkWriter> valueChunkWriterList;
+ protected int valueIndex;
// Used for batch writing
- private long remainingPointsNumber;
+ protected long remainingPointsNumber;
+
+ protected AlignedChunkWriterImpl() {}
// TestOnly
public AlignedChunkWriterImpl(VectorMeasurementSchema schema) {
@@ -330,7 +332,7 @@ public class AlignedChunkWriterImpl implements IChunkWriter
{
* check occupied memory size, if it exceeds the PageSize threshold,
construct a page and put it
* to pageBuffer
*/
- private boolean checkPageSizeAndMayOpenANewPage() {
+ protected boolean checkPageSizeAndMayOpenANewPage() {
if (timeChunkWriter.checkPageSizeAndMayOpenANewPage()) {
return true;
}
@@ -342,7 +344,7 @@ public class AlignedChunkWriterImpl implements IChunkWriter
{
return false;
}
- private void writePageToPageBuffer() {
+ protected void writePageToPageBuffer() {
timeChunkWriter.writePageToPageBuffer();
for (ValueChunkWriter valueChunkWriter : valueChunkWriterList) {
valueChunkWriter.writePageToPageBuffer();
diff --git a/java/tsfile/src/main/java/org/apache/tsfile/write/chunk/TimeChunkWriter.java b/java/tsfile/src/main/java/org/apache/tsfile/write/chunk/TimeChunkWriter.java
index b12e2e53..756031fc 100644
--- a/java/tsfile/src/main/java/org/apache/tsfile/write/chunk/TimeChunkWriter.java
+++ b/java/tsfile/src/main/java/org/apache/tsfile/write/chunk/TimeChunkWriter.java
@@ -47,14 +47,14 @@ public class TimeChunkWriter {
private static final Logger logger =
LoggerFactory.getLogger(TimeChunkWriter.class);
- private final String measurementId;
+ private String measurementId;
- private final TSEncoding encodingType;
+ private TSEncoding encodingType;
- private final CompressionType compressionType;
+ private CompressionType compressionType;
/** all pages of this chunk. */
- private final PublicBAOS pageBuffer;
+ private PublicBAOS pageBuffer;
private int numOfPages;
@@ -62,9 +62,9 @@ public class TimeChunkWriter {
private TimePageWriter pageWriter;
/** page size threshold. */
- private final long pageSizeThreshold;
+ private long pageSizeThreshold;
- private final int maxNumberOfPointsInPage;
+ private int maxNumberOfPointsInPage;
/** value count in current page. */
private int valueCountInOnePageForNextCheck;
@@ -80,6 +80,8 @@ public class TimeChunkWriter {
private Statistics<?> firstPageStatistics;
+ protected TimeChunkWriter() {}
+
public TimeChunkWriter(
String measurementId,
CompressionType compressionType,