This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/tsfile.git
The following commit(s) were added to refs/heads/develop by this push:
new 43e81495 compaction adapting new type when table alter column type
(#415)
43e81495 is described below
commit 43e8149512aa584c44a0c034496f291effddd2da
Author: jintao zhu <[email protected]>
AuthorDate: Fri Feb 28 13:52:06 2025 +0800
compaction adapting new type when table alter column type (#415)
* compact_alter_column_name
* initial realization
* rewrite Chunk to another type
* delete debug output
* throw exception for unsupported setNewType in AlignedChunkMetadata
* spotless
* code review
---
.../metadata/AbstractAlignedChunkMetadata.java | 12 +
.../apache/tsfile/file/metadata/ChunkMetadata.java | 20 +
.../tsfile/file/metadata/IChunkMetadata.java | 4 +
.../java/org/apache/tsfile/read/common/Chunk.java | 183 +++++++++
.../apache/tsfile/write/chunk/ChunkWriterImpl.java | 14 +
.../apache/tsfile/write/chunk/TimeChunkWriter.java | 10 +
.../tsfile/write/chunk/ValueChunkWriter.java | 10 +
.../org/apache/tsfile/write/ChunkRewriteTest.java | 425 +++++++++++++++++++++
8 files changed, 678 insertions(+)
diff --git
a/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/AbstractAlignedChunkMetadata.java
b/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/AbstractAlignedChunkMetadata.java
index 9183d690..e195b114 100644
---
a/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/AbstractAlignedChunkMetadata.java
+++
b/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/AbstractAlignedChunkMetadata.java
@@ -160,6 +160,18 @@ public abstract class AbstractAlignedChunkMetadata
implements IChunkMetadata {
return timeChunkMetadata.getDataType();
}
+  /**
+   * Not supported for aligned chunk metadata.
+   *
+   * @return never returns normally
+   * @throws UnsupportedOperationException always
+   */
+  @Override
+  public TSDataType getNewType() {
+    // The message names the method that was actually called (it used to say setNewType).
+    throw new UnsupportedOperationException(
+        "AlignedChunkMetadata doesn't support getNewType method");
+  }
+
+  /**
+   * Not supported for aligned chunk metadata.
+   *
+   * @param type ignored
+   * @throws UnsupportedOperationException always
+   */
+  @Override
+  public void setNewType(TSDataType type) {
+    final String reason = "AlignedChunkMetadata doesn't support setNewType method";
+    throw new UnsupportedOperationException(reason);
+  }
+
@Override
public String getMeasurementUid() {
return timeChunkMetadata.getMeasurementUid();
diff --git
a/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/ChunkMetadata.java
b/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/ChunkMetadata.java
index 5ce097fe..653a8991 100644
---
a/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/ChunkMetadata.java
+++
b/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/ChunkMetadata.java
@@ -58,6 +58,8 @@ public class ChunkMetadata implements IChunkMetadata {
private TSDataType tsDataType;
+ private TSDataType newTsDataType;
+
// the following two are not serialized and only used during write
private TSEncoding encoding;
private CompressionType compressionType;
@@ -168,6 +170,14 @@ public class ChunkMetadata implements IChunkMetadata {
return tsDataType;
}
+ /**
+  * Replaces the data type stored in this metadata (the value returned by getDataType()).
+  *
+  * @param tsDataType the data type to store
+  */
+ public void setTsDataType(TSDataType tsDataType) {
+ this.tsDataType = tsDataType;
+ }
+
+ /**
+  * Replaces the statistics object held by this metadata. The reference is stored as-is; no copy
+  * is made.
+  *
+  * @param statistics the statistics to store
+  */
+ public void setStatistics(Statistics statistics) {
+ this.statistics = statistics;
+ }
+
/**
* serialize to outputStream.
*
@@ -254,6 +264,16 @@ public class ChunkMetadata implements IChunkMetadata {
this.version = version;
}
+ /**
+  * Returns the type last passed to {@link #setNewType(TSDataType)}.
+  *
+  * @return the stored new type; {@code null} if it was never set (the field has no initializer)
+  */
+ @Override
+ public TSDataType getNewType() {
+ return newTsDataType;
+ }
+
+ /**
+  * Records a new data type on this metadata. Only the separate newTsDataType field is written;
+  * the value returned by getDataType() is not changed by this call.
+  *
+  * @param type the new type to remember
+  */
+ @Override
+ public void setNewType(TSDataType type) {
+ this.newTsDataType = type;
+ }
+
/** Returns the deleteIntervalList field as-is (the same list instance, not a copy). */
public List<TimeRange> getDeleteIntervalList() {
return deleteIntervalList;
}
diff --git
a/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/IChunkMetadata.java
b/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/IChunkMetadata.java
index 79b96494..e29e7147 100644
---
a/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/IChunkMetadata.java
+++
b/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/IChunkMetadata.java
@@ -57,6 +57,10 @@ public interface IChunkMetadata extends IMetadata {
TSDataType getDataType();
+ /**
+  * Returns the new data type recorded through {@link #setNewType(TSDataType)}.
+  *
+  * <p>Not every implementation supports this: AbstractAlignedChunkMetadata throws
+  * UnsupportedOperationException. NOTE(review): presumably this is the target type after a
+  * column type alteration, to be applied during compaction - confirm with callers.
+  */
+ TSDataType getNewType();
+
+ /**
+  * Records a new data type for the chunk described by this metadata.
+  *
+  * <p>Not every implementation supports this: AbstractAlignedChunkMetadata throws
+  * UnsupportedOperationException.
+  *
+  * @param newType the new type to record
+  */
+ void setNewType(TSDataType newType);
+
String getMeasurementUid();
void insertIntoSortedDeletions(TimeRange timeRange);
diff --git a/java/tsfile/src/main/java/org/apache/tsfile/read/common/Chunk.java
b/java/tsfile/src/main/java/org/apache/tsfile/read/common/Chunk.java
index b72c0085..9cffa9d0 100644
--- a/java/tsfile/src/main/java/org/apache/tsfile/read/common/Chunk.java
+++ b/java/tsfile/src/main/java/org/apache/tsfile/read/common/Chunk.java
@@ -21,15 +21,27 @@ package org.apache.tsfile.read.common;
import org.apache.tsfile.encrypt.EncryptParameter;
import org.apache.tsfile.encrypt.EncryptUtils;
+import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.file.MetaMarker;
import org.apache.tsfile.file.header.ChunkHeader;
import org.apache.tsfile.file.metadata.statistics.Statistics;
+import org.apache.tsfile.read.TimeValuePair;
+import org.apache.tsfile.read.reader.IPageReader;
+import org.apache.tsfile.read.reader.IPointReader;
+import org.apache.tsfile.read.reader.chunk.ChunkReader;
+import org.apache.tsfile.read.reader.chunk.TableChunkReader;
+import org.apache.tsfile.utils.Binary;
import org.apache.tsfile.utils.PublicBAOS;
import org.apache.tsfile.utils.RamUsageEstimator;
import org.apache.tsfile.utils.ReadWriteForEncodingUtils;
+import org.apache.tsfile.write.chunk.ChunkWriterImpl;
+import org.apache.tsfile.write.chunk.ValueChunkWriter;
+import org.apache.tsfile.write.schema.IMeasurementSchema;
+import org.apache.tsfile.write.schema.MeasurementSchema;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
import java.util.List;
import static org.apache.tsfile.utils.RamUsageEstimator.sizeOfByteArray;
@@ -200,4 +212,175 @@ public class Chunk {
/**
 * Estimates the memory retained by this chunk as INSTANCE_SIZE plus the byte-array size computed
 * from the capacity (not the remaining bytes) of the chunk data buffer.
 */
public long getRetainedSizeInBytes() {
return INSTANCE_SIZE + sizeOfByteArray(chunkData.capacity());
}
+
+  /**
+   * Re-encodes this value chunk so that its values are stored as {@code newType}, reading it
+   * together with {@code timeChunk} through a {@link TableChunkReader}.
+   *
+   * <p>Every point is converted with {@code newType.castFromSingleValue}. A missing value is
+   * written with a placeholder and the writer's third argument set to {@code true}. One page is
+   * sealed in the new chunk for every page read. The measurement id, chunk type, compression and
+   * encoding are taken from the current header, and the delete-interval list and encryption
+   * parameter are passed on unchanged.
+   *
+   * <p>After a conversion, the data buffers of both this chunk and {@code timeChunk} are flipped
+   * (presumably so they can be read again after the reader consumed them).
+   *
+   * <p>NOTE(review): the original encoding is reused for the new type; confirm that it is valid
+   * for every supported target type.
+   *
+   * @param newType target data type; {@code null} or the current type means "no change"
+   * @param timeChunk the time chunk aligned with this value chunk
+   * @return this chunk when no conversion is needed, otherwise a new chunk holding the converted
+   *     data and freshly computed statistics
+   * @throws IOException if {@code newType} is not one of the handled types, or reading/writing
+   *     fails
+   */
+  public Chunk rewrite(TSDataType newType, Chunk timeChunk) throws IOException {
+    // Nothing to do when no target type is given or the chunk already has that type.
+    if (newType == null || newType == chunkHeader.getDataType()) {
+      return this;
+    }
+    final TSDataType sourceType = chunkHeader.getDataType();
+
+    // The schema is only needed to obtain a value encoder matching the new type.
+    IMeasurementSchema targetSchema =
+        new MeasurementSchema(
+            chunkHeader.getMeasurementID(),
+            newType,
+            chunkHeader.getEncodingType(),
+            chunkHeader.getCompressionType());
+    ValueChunkWriter writer =
+        new ValueChunkWriter(
+            chunkHeader.getMeasurementID(),
+            chunkHeader.getCompressionType(),
+            newType,
+            chunkHeader.getEncodingType(),
+            targetSchema.getValueEncoder(),
+            encryptParam);
+
+    // Read this chunk as the single value column next to the given time column.
+    List<Chunk> singleColumn = new ArrayList<>();
+    singleColumn.add(this);
+    TableChunkReader reader = new TableChunkReader(timeChunk, singleColumn, null);
+
+    for (IPageReader pageReader : reader.loadPageReaderList()) {
+      IPointReader points = pageReader.getAllSatisfiedPageData().getBatchDataIterator();
+      while (points.hasNextTimeValuePair()) {
+        TimeValuePair pair = points.nextTimeValuePair();
+        // Column 0 is this chunk; a null entry means the row has no value for it.
+        Object converted =
+            pair.getValue().getVector()[0] == null
+                ? null
+                : newType.castFromSingleValue(
+                    sourceType, pair.getValue().getVector()[0].getValue());
+        long time = pair.getTimestamp();
+        boolean isNull = converted == null;
+        switch (newType) {
+          case BOOLEAN:
+            writer.write(time, isNull || (boolean) converted, isNull);
+            break;
+          case DATE:
+          case INT32:
+            writer.write(time, isNull ? Integer.MAX_VALUE : (int) converted, isNull);
+            break;
+          case TIMESTAMP:
+          case INT64:
+            writer.write(time, isNull ? (long) Integer.MAX_VALUE : (long) converted, isNull);
+            break;
+          case FLOAT:
+            writer.write(time, isNull ? (float) Integer.MAX_VALUE : (float) converted, isNull);
+            break;
+          case DOUBLE:
+            writer.write(time, isNull ? (double) Integer.MAX_VALUE : (double) converted, isNull);
+            break;
+          case TEXT:
+          case STRING:
+          case BLOB:
+            writer.write(time, isNull ? Binary.EMPTY_VALUE : (Binary) converted, isNull);
+            break;
+          default:
+            throw new IOException("Unsupported data type: " + newType);
+        }
+      }
+      // Keep the page layout: one sealed page per page read.
+      writer.sealCurrentPage();
+    }
+
+    ByteBuffer rewrittenData = writer.getByteBuffer();
+    ChunkHeader rewrittenHeader =
+        new ChunkHeader(
+            chunkHeader.getChunkType(),
+            chunkHeader.getMeasurementID(),
+            rewrittenData.capacity(),
+            newType,
+            chunkHeader.getCompressionType(),
+            chunkHeader.getEncodingType());
+    // Flip both source buffers now that they have been read.
+    chunkData.flip();
+    timeChunk.chunkData.flip();
+    return new Chunk(
+        rewrittenHeader, rewrittenData, deleteIntervalList, writer.getStatistics(), encryptParam);
+  }
+
+ public Chunk rewrite(TSDataType newType) throws IOException {
+ if (newType == null || newType == chunkHeader.getDataType()) {
+ return this;
+ }
+ IMeasurementSchema schema =
+ new MeasurementSchema(
+ chunkHeader.getMeasurementID(),
+ newType,
+ chunkHeader.getEncodingType(),
+ chunkHeader.getCompressionType());
+ ChunkWriterImpl chunkWriter = new ChunkWriterImpl(schema, encryptParam);
+ ChunkReader chunkReader = new ChunkReader(this);
+ List<IPageReader> pages = chunkReader.loadPageReaderList();
+ for (IPageReader page : pages) {
+ BatchData batchData = page.getAllSatisfiedPageData();
+ IPointReader pointReader = batchData.getBatchDataIterator();
+ while (pointReader.hasNextTimeValuePair()) {
+ TimeValuePair point = pointReader.nextTimeValuePair();
+ Object convertedValue =
+ newType.castFromSingleValue(chunkHeader.getDataType(),
point.getValue().getValue());
+ long timestamp = point.getTimestamp();
+ if (convertedValue == null) {
+ throw new IOException("NonAlignedChunk contains null, timestamp: " +
timestamp);
+ }
+ switch (newType) {
+ case BOOLEAN:
+ chunkWriter.write(timestamp, (boolean) convertedValue);
+ break;
+ case DATE:
+ case INT32:
+ chunkWriter.write(timestamp, (int) convertedValue);
+ break;
+ case TIMESTAMP:
+ case INT64:
+ chunkWriter.write(timestamp, (long) convertedValue);
+ break;
+ case FLOAT:
+ chunkWriter.write(timestamp, (float) convertedValue);
+ break;
+ case DOUBLE:
+ chunkWriter.write(timestamp, (double) convertedValue);
+ break;
+ case TEXT:
+ case STRING:
+ case BLOB:
+ chunkWriter.write(timestamp, (Binary) convertedValue);
+ break;
+ default:
+ throw new IOException("Unsupported data type: " + newType);
+ }
+ }
+ chunkWriter.sealCurrentPage();
+ }
+ ByteBuffer newChunkData = chunkWriter.getByteBuffer();
+ ChunkHeader newChunkHeader =
+ new ChunkHeader(
+ chunkHeader.getChunkType(),
+ chunkHeader.getMeasurementID(),
+ newChunkData.capacity(),
+ newType,
+ chunkHeader.getCompressionType(),
+ chunkHeader.getEncodingType());
+ chunkData.flip();
+ return new Chunk(
+ newChunkHeader,
+ newChunkData,
+ deleteIntervalList,
+ chunkWriter.getStatistics(),
+ encryptParam);
+ }
}
diff --git
a/java/tsfile/src/main/java/org/apache/tsfile/write/chunk/ChunkWriterImpl.java
b/java/tsfile/src/main/java/org/apache/tsfile/write/chunk/ChunkWriterImpl.java
index 2fbab416..e96bee49 100644
---
a/java/tsfile/src/main/java/org/apache/tsfile/write/chunk/ChunkWriterImpl.java
+++
b/java/tsfile/src/main/java/org/apache/tsfile/write/chunk/ChunkWriterImpl.java
@@ -527,4 +527,18 @@ public class ChunkWriterImpl implements IChunkWriter {
/** Returns the page writer held by this chunk writer (the same instance, not a copy). */
public PageWriter getPageWriter() {
return pageWriter;
}
+
+ /** Returns the current value of this writer's numOfPages counter. */
+ public int getNumOfPages() {
+ return numOfPages;
+ }
+
+ public ByteBuffer getByteBuffer() {
+ return ByteBuffer.wrap(pageBuffer.toByteArray());
+ }
+
+  /**
+   * Returns a separate statistics object carrying this writer's current statistics: a fresh
+   * instance of the same type is created and the writer's statistics are merged into it.
+   *
+   * @return the merged copy
+   */
+  public Statistics getStatistics() {
+    Statistics snapshot = Statistics.getStatsByType(statistics.getType());
+    snapshot.mergeStatistics(statistics);
+    return snapshot;
+  }
}
diff --git
a/java/tsfile/src/main/java/org/apache/tsfile/write/chunk/TimeChunkWriter.java
b/java/tsfile/src/main/java/org/apache/tsfile/write/chunk/TimeChunkWriter.java
index c4181f51..7960a81a 100644
---
a/java/tsfile/src/main/java/org/apache/tsfile/write/chunk/TimeChunkWriter.java
+++
b/java/tsfile/src/main/java/org/apache/tsfile/write/chunk/TimeChunkWriter.java
@@ -356,4 +356,14 @@ public class TimeChunkWriter {
/**
 * Tells whether the current page writer has reached either limit: its point number is at least
 * {@code pointNum}, or its estimated maximum memory size is at least {@code size}.
 */
public boolean checkIsUnsealedPageOverThreshold(long size, long pointNum) {
return pageWriter.getPointNumber() >= pointNum ||
pageWriter.estimateMaxMemSize() >= size;
}
+
+ public ByteBuffer getByteBuffer() {
+ return ByteBuffer.wrap(pageBuffer.toByteArray());
+ }
+
+  /**
+   * Returns a separate statistics object carrying this writer's current statistics: a fresh
+   * instance of the same type is created and the writer's statistics are merged into it.
+   *
+   * @return the merged copy
+   */
+  public Statistics getStatistics() {
+    Statistics snapshot = Statistics.getStatsByType(statistics.getType());
+    snapshot.mergeStatistics(statistics);
+    return snapshot;
+  }
}
diff --git
a/java/tsfile/src/main/java/org/apache/tsfile/write/chunk/ValueChunkWriter.java
b/java/tsfile/src/main/java/org/apache/tsfile/write/chunk/ValueChunkWriter.java
index 3a3ccfa8..f97d3370 100644
---
a/java/tsfile/src/main/java/org/apache/tsfile/write/chunk/ValueChunkWriter.java
+++
b/java/tsfile/src/main/java/org/apache/tsfile/write/chunk/ValueChunkWriter.java
@@ -460,4 +460,14 @@ public class ValueChunkWriter {
/** Returns the value page writer held by this chunk writer (the same instance, not a copy). */
public ValuePageWriter getPageWriter() {
return pageWriter;
}
+
+ public ByteBuffer getByteBuffer() {
+ return ByteBuffer.wrap(pageBuffer.toByteArray());
+ }
+
+  /**
+   * Returns a separate statistics object carrying this writer's current statistics: a fresh
+   * instance of the same type is created and the writer's statistics are merged into it.
+   *
+   * @return the merged copy
+   */
+  public Statistics getStatistics() {
+    Statistics snapshot = Statistics.getStatsByType(statistics.getType());
+    snapshot.mergeStatistics(statistics);
+    return snapshot;
+  }
}
diff --git
a/java/tsfile/src/test/java/org/apache/tsfile/write/ChunkRewriteTest.java
b/java/tsfile/src/test/java/org/apache/tsfile/write/ChunkRewriteTest.java
new file mode 100644
index 00000000..90215ccf
--- /dev/null
+++ b/java/tsfile/src/test/java/org/apache/tsfile/write/ChunkRewriteTest.java
@@ -0,0 +1,425 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tsfile.write;
+
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.file.header.ChunkHeader;
+import org.apache.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.tsfile.read.TimeValuePair;
+import org.apache.tsfile.read.common.BatchData;
+import org.apache.tsfile.read.common.Chunk;
+import org.apache.tsfile.read.reader.IPageReader;
+import org.apache.tsfile.read.reader.IPointReader;
+import org.apache.tsfile.read.reader.chunk.AlignedChunkReader;
+import org.apache.tsfile.read.reader.chunk.ChunkReader;
+import org.apache.tsfile.read.reader.chunk.TableChunkReader;
+import org.apache.tsfile.read.reader.page.AlignedPageReader;
+import org.apache.tsfile.write.chunk.AlignedChunkWriterImpl;
+import org.apache.tsfile.write.chunk.ChunkWriterImpl;
+import org.apache.tsfile.write.chunk.TimeChunkWriter;
+import org.apache.tsfile.write.chunk.ValueChunkWriter;
+import org.apache.tsfile.write.schema.IMeasurementSchema;
+import org.apache.tsfile.write.schema.MeasurementSchema;
+import org.apache.tsfile.write.schema.VectorMeasurementSchema;
+
+import org.junit.Test;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+public class ChunkRewriteTest {
+
+ /**
+  * Writes one page of 20 aligned rows (FLOAT, INT32, DOUBLE), checks it reads back unchanged,
+  * then rewrites the INT32 column to DOUBLE and checks all three columns again.
+  */
+ @Test
+ public void AlignedChunkSinglePageTest() throws IOException {
+ String[] measurements = new String[] {"s1", "s2", "s3"};
+ TSDataType[] types = new TSDataType[] {TSDataType.FLOAT, TSDataType.INT32,
TSDataType.DOUBLE};
+ VectorMeasurementSchema measurementSchema =
+ new VectorMeasurementSchema("root.sg.d1", measurements, types);
+ AlignedChunkWriterImpl chunkWriter = new
AlignedChunkWriterImpl(measurementSchema);
+
+ // Rows 1..20: every column stores the timestamp value in its own type, none is null.
+ for (int time = 1; time <= 20; time++) {
+ chunkWriter.write(time, (float) time, false);
+ chunkWriter.write(time, time, false);
+ chunkWriter.write(time, (double) time, false);
+ chunkWriter.write(time);
+ }
+ chunkWriter.sealCurrentPage();
+
+ TimeChunkWriter timeChunkWriter = chunkWriter.getTimeChunkWriter();
+ List<ValueChunkWriter> valueChunkWriters =
chunkWriter.getValueChunkWriterList();
+
+ Chunk timeChunk = getTimeChunk(measurementSchema, timeChunkWriter);
+
+ List<Chunk> valueChunks = getValueChunks(valueChunkWriters);
+
+ // Check the chunks as written, before any rewrite.
+ AlignedChunkReader chunkReader = new AlignedChunkReader(timeChunk,
valueChunks);
+ List<IPageReader> pageReaders = chunkReader.loadPageReaderList();
+ for (IPageReader page : pageReaders) {
+ IPointReader pointReader = ((AlignedPageReader)
page).getLazyPointReader();
+ int i = 1;
+ while (pointReader.hasNextTimeValuePair()) {
+ TimeValuePair point = pointReader.nextTimeValuePair();
+ assertEquals((long) i, point.getTimestamp());
+ assertEquals((float) i, point.getValue().getVector()[0].getValue());
+ assertEquals(i, point.getValue().getVector()[1].getValue());
+ assertEquals((double) i, point.getValue().getVector()[2].getValue());
+ i++;
+ }
+ }
+ // Flip the buffers before they are read again (presumably the reader advanced them).
+ timeChunk.getData().flip();
+ valueChunks.get(0).getData().flip();
+ valueChunks.get(1).getData().flip();
+ valueChunks.get(2).getData().flip();
+ // rewrite INT32->DOUBLE
+ Chunk newValueChunk = valueChunks.get(1).rewrite(TSDataType.DOUBLE,
timeChunk);
+ valueChunks.set(1, newValueChunk);
+ // Column 1 must now come back as DOUBLE; the other columns are unchanged.
+ AlignedChunkReader newChunkReader = new AlignedChunkReader(timeChunk,
valueChunks);
+ List<IPageReader> newPageReaders = newChunkReader.loadPageReaderList();
+ for (IPageReader page : newPageReaders) {
+ IPointReader pointReader = ((AlignedPageReader)
page).getLazyPointReader();
+ int i = 1;
+ while (pointReader.hasNextTimeValuePair()) {
+ TimeValuePair point = pointReader.nextTimeValuePair();
+ assertEquals((long) i, point.getTimestamp());
+ assertEquals((float) i, point.getValue().getVector()[0].getValue());
+ assertEquals((double) i, point.getValue().getVector()[1].getValue());
+ assertEquals((double) i, point.getValue().getVector()[2].getValue());
+ i++;
+ }
+ assertEquals(20, i - 1);
+ }
+ timeChunk.getData().flip();
+ valueChunks.get(0).getData().flip();
+ valueChunks.get(1).getData().flip();
+ valueChunks.get(2).getData().flip();
+ }
+
+ /**
+  * Writes two pages of 20 aligned rows each (FLOAT, INT32, DOUBLE), checks all 40 rows read
+  * back unchanged, then rewrites the INT32 column to DOUBLE and checks the 40 rows again.
+  */
+ @Test
+ public void AlignedChunkMultiPagesTest() throws IOException {
+ String[] measurements = new String[] {"s1", "s2", "s3"};
+ TSDataType[] types = new TSDataType[] {TSDataType.FLOAT, TSDataType.INT32,
TSDataType.DOUBLE};
+ VectorMeasurementSchema measurementSchema =
+ new VectorMeasurementSchema("root.sg.d1", measurements, types);
+ AlignedChunkWriterImpl chunkWriter = new
AlignedChunkWriterImpl(measurementSchema);
+
+ // First page: rows 1..20.
+ for (int time = 1; time <= 20; time++) {
+ chunkWriter.write(time, (float) time, false);
+ chunkWriter.write(time, time, false);
+ chunkWriter.write(time, (double) time, false);
+ chunkWriter.write(time);
+ }
+ chunkWriter.sealCurrentPage();
+
+ // Second page: rows 21..40.
+ for (int time = 21; time <= 40; time++) {
+ chunkWriter.write(time, (float) time, false);
+ chunkWriter.write(time, time, false);
+ chunkWriter.write(time, (double) time, false);
+ chunkWriter.write(time);
+ }
+ chunkWriter.sealCurrentPage();
+
+ TimeChunkWriter timeChunkWriter = chunkWriter.getTimeChunkWriter();
+ List<ValueChunkWriter> valueChunkWriters =
chunkWriter.getValueChunkWriterList();
+
+ Chunk timeChunk = getTimeChunk(measurementSchema, timeChunkWriter);
+ List<Chunk> valueChunks = getValueChunks(valueChunkWriters);
+
+ // Check the chunks as written; i counts rows across both pages.
+ AlignedChunkReader chunkReader = new AlignedChunkReader(timeChunk,
valueChunks);
+ List<IPageReader> pageReaders = chunkReader.loadPageReaderList();
+ int i = 1;
+ for (IPageReader page : pageReaders) {
+ IPointReader pointReader = ((AlignedPageReader)
page).getLazyPointReader();
+ while (pointReader.hasNextTimeValuePair()) {
+ TimeValuePair point = pointReader.nextTimeValuePair();
+ assertEquals((long) i, point.getTimestamp());
+ assertEquals((float) i, point.getValue().getVector()[0].getValue());
+ assertEquals(i, point.getValue().getVector()[1].getValue());
+ assertEquals((double) i, point.getValue().getVector()[2].getValue());
+ i++;
+ }
+ }
+ assertEquals(40, i - 1);
+ // Flip the buffers before they are read again (presumably the reader advanced them).
+ timeChunk.getData().flip();
+ valueChunks.get(0).getData().flip();
+ valueChunks.get(1).getData().flip();
+ valueChunks.get(2).getData().flip();
+ // rewrite INT32->DOUBLE
+ Chunk newValueChunk = valueChunks.get(1).rewrite(TSDataType.DOUBLE,
timeChunk);
+ valueChunks.set(1, newValueChunk);
+ // Column 1 must now come back as DOUBLE; the other columns are unchanged.
+ AlignedChunkReader newChunkReader = new AlignedChunkReader(timeChunk,
valueChunks);
+ i = 1;
+ List<IPageReader> newPageReaders = newChunkReader.loadPageReaderList();
+ for (IPageReader page : newPageReaders) {
+ IPointReader pointReader = ((AlignedPageReader)
page).getLazyPointReader();
+ while (pointReader.hasNextTimeValuePair()) {
+ TimeValuePair point = pointReader.nextTimeValuePair();
+ assertEquals((long) i, point.getTimestamp());
+ assertEquals((float) i, point.getValue().getVector()[0].getValue());
+ assertEquals((double) i, point.getValue().getVector()[1].getValue());
+ assertEquals((double) i, point.getValue().getVector()[2].getValue());
+ i++;
+ }
+ }
+ assertEquals(40, i - 1);
+ timeChunk.getData().flip();
+ valueChunks.get(0).getData().flip();
+ valueChunks.get(1).getData().flip();
+ valueChunks.get(2).getData().flip();
+ }
+
+ /**
+  * Writes 30 aligned rows (FLOAT, INT32, DOUBLE) in a repeating three-row null pattern, checks
+  * the pattern through a TableChunkReader, then rewrites the INT32 column to DOUBLE and checks
+  * that values are converted and nulls stay null.
+  */
+ @Test
+ public void AlignedChunkWithNullTest() throws IOException {
+ String[] measurements = new String[] {"s1", "s2", "s3"};
+ TSDataType[] types = new TSDataType[] {TSDataType.FLOAT, TSDataType.INT32,
TSDataType.DOUBLE};
+ VectorMeasurementSchema measurementSchema =
+ new VectorMeasurementSchema("root.sg.d1", measurements, types);
+ AlignedChunkWriterImpl chunkWriter = new
AlignedChunkWriterImpl(measurementSchema);
+
+ // Each iteration writes three rows; the boolean argument marks a value as null.
+ for (int time = 1; time <= 30; time = time + 3) {
+ // Row time: all three columns present.
+ chunkWriter.write(time, (float) time, false);
+ chunkWriter.write(time, time, false);
+ chunkWriter.write(time, (double) time, false);
+ chunkWriter.write(time);
+
+ // Row time + 1: only the INT32 column is present.
+ chunkWriter.write(time + 1, (float) (time + 1), true);
+ chunkWriter.write(time + 1, time + 1, false);
+ chunkWriter.write(time + 1, (double) (time + 1), true);
+ chunkWriter.write(time + 1);
+
+ // Row time + 2: all columns are null (the value arguments are placeholders).
+ chunkWriter.write(time + 2, (float) (time + 1), true);
+ chunkWriter.write(time + 2, time + 1, true);
+ chunkWriter.write(time + 2, (double) (time + 1), true);
+ chunkWriter.write(time + 2);
+ }
+ chunkWriter.sealCurrentPage();
+
+ TimeChunkWriter timeChunkWriter = chunkWriter.getTimeChunkWriter();
+ List<ValueChunkWriter> valueChunkWriters =
chunkWriter.getValueChunkWriterList();
+
+ Chunk timeChunk = getTimeChunk(measurementSchema, timeChunkWriter);
+ List<Chunk> valueChunks = getValueChunks(valueChunkWriters);
+
+ // Check the null pattern as written; i is the expected timestamp.
+ TableChunkReader chunkReader = new TableChunkReader(timeChunk,
valueChunks, null);
+ List<IPageReader> pageReaders = chunkReader.loadPageReaderList();
+ int i = 1;
+ for (IPageReader page : pageReaders) {
+ IPointReader pointReader =
page.getAllSatisfiedPageData().getBatchDataIterator();
+ while (pointReader.hasNextTimeValuePair()) {
+ TimeValuePair point = pointReader.nextTimeValuePair();
+ if (i % 3 == 1) {
+ assertEquals((long) i, point.getTimestamp());
+ assertEquals((float) i, point.getValue().getVector()[0].getValue());
+ assertEquals(i, point.getValue().getVector()[1].getValue());
+ assertEquals((double) i, point.getValue().getVector()[2].getValue());
+ } else if (i % 3 == 2) {
+ assertEquals((long) i, point.getTimestamp());
+ assertNull(point.getValue().getVector()[0]);
+ assertEquals(i, point.getValue().getVector()[1].getValue());
+ assertNull(point.getValue().getVector()[2]);
+ } else {
+ assertEquals((long) i, point.getTimestamp());
+ assertNull(point.getValue().getVector()[0]);
+ assertNull(point.getValue().getVector()[1]);
+ assertNull(point.getValue().getVector()[2]);
+ }
+ i++;
+ }
+ }
+ assertEquals(30, i - 1);
+ // Flip the buffers before they are read again (presumably the reader advanced them).
+ timeChunk.getData().flip();
+ valueChunks.get(0).getData().flip();
+ valueChunks.get(1).getData().flip();
+ valueChunks.get(2).getData().flip();
+ // rewrite INT32->DOUBLE
+ Chunk newValueChunk = valueChunks.get(1).rewrite(TSDataType.DOUBLE,
timeChunk);
+ valueChunks.set(1, newValueChunk);
+ // Same pattern expected, except that present values of column 1 are now DOUBLE.
+ TableChunkReader newChunkReader = new TableChunkReader(timeChunk,
valueChunks, null);
+ i = 1;
+ List<IPageReader> newPageReaders = newChunkReader.loadPageReaderList();
+ for (IPageReader page : newPageReaders) {
+ IPointReader pointReader =
page.getAllSatisfiedPageData().getBatchDataIterator();
+ while (pointReader.hasNextTimeValuePair()) {
+ TimeValuePair point = pointReader.nextTimeValuePair();
+ if (i % 3 == 1) {
+ assertEquals((long) i, point.getTimestamp());
+ assertEquals((float) i, point.getValue().getVector()[0].getValue());
+ assertEquals((double) i, point.getValue().getVector()[1].getValue());
+ assertEquals((double) i, point.getValue().getVector()[2].getValue());
+ } else if (i % 3 == 2) {
+ assertEquals((long) i, point.getTimestamp());
+ assertNull(point.getValue().getVector()[0]);
+ assertEquals((double) i, point.getValue().getVector()[1].getValue());
+ assertNull(point.getValue().getVector()[2]);
+ } else {
+ assertEquals((long) i, point.getTimestamp());
+ assertNull(point.getValue().getVector()[0]);
+ assertNull(point.getValue().getVector()[1]);
+ assertNull(point.getValue().getVector()[2]);
+ }
+ i++;
+ }
+ }
+ assertEquals(30, i - 1);
+ timeChunk.getData().flip();
+ valueChunks.get(0).getData().flip();
+ valueChunks.get(1).getData().flip();
+ valueChunks.get(2).getData().flip();
+ }
+
+  /**
+   * Writes two pages of 20 INT32 points each, checks all 40 points read back unchanged, then
+   * rewrites the chunk to DOUBLE and checks that all 40 points come back converted.
+   */
+  @Test
+  public void NonAlignedChunkMultiPagesTest() throws IOException {
+    IMeasurementSchema schema = new MeasurementSchema("s1", TSDataType.INT32, TSEncoding.PLAIN);
+    ChunkWriterImpl chunkWriter = new ChunkWriterImpl(schema);
+    // First page: points 1..20, value == timestamp.
+    for (int time = 1; time <= 20; time++) {
+      chunkWriter.write(time, time);
+    }
+    chunkWriter.sealCurrentPage();
+    // Second page: points 21..40.
+    for (int time = 21; time <= 40; time++) {
+      chunkWriter.write(time, time);
+    }
+    chunkWriter.sealCurrentPage();
+
+    // Check the chunk as written; i counts points across both pages.
+    Chunk newChunk = getChunk(schema, chunkWriter);
+    ChunkReader chunkReader = new ChunkReader(newChunk);
+    List<IPageReader> pageReaders = chunkReader.loadPageReaderList();
+    int i = 1;
+    for (IPageReader page : pageReaders) {
+      BatchData data = page.getAllSatisfiedPageData(true);
+      IPointReader pointReader = data.getBatchDataIterator();
+      while (pointReader.hasNextTimeValuePair()) {
+        TimeValuePair point = pointReader.nextTimeValuePair();
+        assertEquals((long) i, point.getTimestamp());
+        assertEquals(i, point.getValue().getValue());
+        i++;
+      }
+    }
+    assertEquals(40, i - 1);
+    // Flip the buffer before it is read again (presumably the reader advanced it).
+    newChunk.getData().flip();
+
+    // rewrite INT32->DOUBLE
+    Chunk newChunk2 = newChunk.rewrite(TSDataType.DOUBLE);
+    ChunkReader chunkReader2 = new ChunkReader(newChunk2);
+    List<IPageReader> pageReaders2 = chunkReader2.loadPageReaderList();
+    i = 1;
+    for (IPageReader page : pageReaders2) {
+      BatchData data = page.getAllSatisfiedPageData(true);
+      IPointReader pointReader = data.getBatchDataIterator();
+      while (pointReader.hasNextTimeValuePair()) {
+        TimeValuePair point = pointReader.nextTimeValuePair();
+        assertEquals((long) i, point.getTimestamp());
+        assertEquals((double) i, point.getValue().getValue());
+        i++;
+      }
+    }
+    // Without this check the test would pass even if the rewritten chunk were empty.
+    assertEquals(40, i - 1);
+  }
+
+  /**
+   * Writes one page of 20 INT32 points, checks it reads back unchanged, then rewrites the chunk
+   * to DOUBLE and checks that all 20 points come back converted.
+   */
+  @Test
+  public void NonAlignedChunkSinglePageTest() throws IOException {
+    IMeasurementSchema schema = new MeasurementSchema("s1", TSDataType.INT32, TSEncoding.PLAIN);
+    ChunkWriterImpl chunkWriter = new ChunkWriterImpl(schema);
+    // Points 1..20, value == timestamp.
+    for (int time = 1; time <= 20; time++) {
+      chunkWriter.write(time, time);
+    }
+    chunkWriter.sealCurrentPage();
+
+    // Check the chunk as written. The counter lives outside the page loop so that an empty
+    // page list is detected as well.
+    Chunk newChunk = getChunk(schema, chunkWriter);
+    ChunkReader chunkReader = new ChunkReader(newChunk);
+    List<IPageReader> pageReaders = chunkReader.loadPageReaderList();
+    int i = 1;
+    for (IPageReader page : pageReaders) {
+      BatchData data = page.getAllSatisfiedPageData(true);
+      IPointReader pointReader = data.getBatchDataIterator();
+      while (pointReader.hasNextTimeValuePair()) {
+        TimeValuePair point = pointReader.nextTimeValuePair();
+        assertEquals((long) i, point.getTimestamp());
+        assertEquals(i, point.getValue().getValue());
+        i++;
+      }
+    }
+    assertEquals(20, i - 1);
+    // Flip the buffer before it is read again (presumably the reader advanced it).
+    newChunk.getData().flip();
+
+    // rewrite INT32->DOUBLE
+    Chunk newChunk2 = newChunk.rewrite(TSDataType.DOUBLE);
+    ChunkReader chunkReader2 = new ChunkReader(newChunk2);
+    List<IPageReader> pageReaders2 = chunkReader2.loadPageReaderList();
+    i = 1;
+    for (IPageReader page : pageReaders2) {
+      BatchData data = page.getAllSatisfiedPageData(true);
+      IPointReader pointReader = data.getBatchDataIterator();
+      while (pointReader.hasNextTimeValuePair()) {
+        TimeValuePair point = pointReader.nextTimeValuePair();
+        assertEquals((long) i, point.getTimestamp());
+        assertEquals((double) i, point.getValue().getValue());
+        i++;
+      }
+    }
+    // Without this check the test would pass even if the rewritten chunk were empty.
+    assertEquals(20, i - 1);
+  }
+
+  /**
+   * Builds an in-memory time {@link Chunk} from what the given writer has buffered so far.
+   *
+   * @param measurementSchema supplies the measurement name, compressor and time encoding for the
+   *     header
+   * @param timeChunkWriter supplies the page bytes, page count and statistics
+   * @return a VECTOR-typed chunk without a delete-interval list
+   */
+  public Chunk getTimeChunk(
+      VectorMeasurementSchema measurementSchema, TimeChunkWriter timeChunkWriter) {
+    ByteBuffer timeData = timeChunkWriter.getByteBuffer();
+    ChunkHeader timeHeader =
+        new ChunkHeader(
+            measurementSchema.getMeasurementName(),
+            timeData.capacity(),
+            TSDataType.VECTOR,
+            measurementSchema.getCompressor(),
+            measurementSchema.getTimeTSEncoding(),
+            timeChunkWriter.getNumOfPages());
+    return new Chunk(timeHeader, timeData, null, timeChunkWriter.getStatistics());
+  }
+
+  /**
+   * Builds one in-memory value {@link Chunk} per writer, in the order of the given list.
+   *
+   * @param valueChunkWriters writers supplying page bytes, header fields and statistics
+   * @return the chunks, each without a delete-interval list
+   */
+  public List<Chunk> getValueChunks(List<ValueChunkWriter> valueChunkWriters) {
+    List<Chunk> chunks = new ArrayList<>();
+    for (ValueChunkWriter writer : valueChunkWriters) {
+      ByteBuffer data = writer.getByteBuffer();
+      ChunkHeader header =
+          new ChunkHeader(
+              writer.getMeasurementId(),
+              data.capacity(),
+              writer.getDataType(),
+              writer.getCompressionType(),
+              writer.getEncodingType(),
+              writer.getNumOfPages());
+      chunks.add(new Chunk(header, data, null, writer.getStatistics()));
+    }
+    return chunks;
+  }
+
+  /**
+   * Builds an in-memory {@link Chunk} from what the given non-aligned writer has buffered so far.
+   *
+   * @param schema supplies the measurement name, type, compressor and encoding for the header
+   * @param chunkWriter supplies the page bytes, page count and statistics
+   * @return the chunk, without a delete-interval list
+   */
+  public Chunk getChunk(IMeasurementSchema schema, ChunkWriterImpl chunkWriter) {
+    ByteBuffer data = chunkWriter.getByteBuffer();
+    ChunkHeader header =
+        new ChunkHeader(
+            schema.getMeasurementName(),
+            data.capacity(),
+            schema.getType(),
+            schema.getCompressor(),
+            schema.getEncodingType(),
+            chunkWriter.getNumOfPages());
+    return new Chunk(header, data, null, chunkWriter.getStatistics());
+  }
+}