This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/tsfile.git


The following commit(s) were added to refs/heads/develop by this push:
     new 43e81495 compaction adapting new type when table alter column type 
(#415)
43e81495 is described below

commit 43e8149512aa584c44a0c034496f291effddd2da
Author: jintao zhu <[email protected]>
AuthorDate: Fri Feb 28 13:52:06 2025 +0800

    compaction adapting new type when table alter column type (#415)
    
    * compact_alter_column_name
    
    * initial realization
    
    * rewrite Chunk to another type
    
    * delete debug output
    
    * throw exception for unsupported setNewType in AlignedChunkMetadata
    
    * spotless
    
    * code review
---
 .../metadata/AbstractAlignedChunkMetadata.java     |  12 +
 .../apache/tsfile/file/metadata/ChunkMetadata.java |  20 +
 .../tsfile/file/metadata/IChunkMetadata.java       |   4 +
 .../java/org/apache/tsfile/read/common/Chunk.java  | 183 +++++++++
 .../apache/tsfile/write/chunk/ChunkWriterImpl.java |  14 +
 .../apache/tsfile/write/chunk/TimeChunkWriter.java |  10 +
 .../tsfile/write/chunk/ValueChunkWriter.java       |  10 +
 .../org/apache/tsfile/write/ChunkRewriteTest.java  | 425 +++++++++++++++++++++
 8 files changed, 678 insertions(+)

diff --git 
a/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/AbstractAlignedChunkMetadata.java
 
b/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/AbstractAlignedChunkMetadata.java
index 9183d690..e195b114 100644
--- 
a/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/AbstractAlignedChunkMetadata.java
+++ 
b/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/AbstractAlignedChunkMetadata.java
@@ -160,6 +160,18 @@ public abstract class AbstractAlignedChunkMetadata 
implements IChunkMetadata {
     return timeChunkMetadata.getDataType();
   }
 
+  /**
+   * Unsupported on aligned chunk metadata; every call fails.
+   *
+   * @return never returns normally
+   * @throws UnsupportedOperationException always
+   */
+  @Override
+  public TSDataType getNewType() {
+    // The message names the method that was actually called (it used to say setNewType).
+    throw new UnsupportedOperationException(
+        "AlignedChunkMetadata doesn't support getNewType method");
+  }
+
+  /**
+   * Unsupported on aligned chunk metadata; every call fails.
+   *
+   * @param type ignored
+   * @throws UnsupportedOperationException always
+   */
+  @Override
+  public void setNewType(TSDataType type) {
+    final String reason = "AlignedChunkMetadata doesn't support setNewType method";
+    throw new UnsupportedOperationException(reason);
+  }
+
   @Override
   public String getMeasurementUid() {
     return timeChunkMetadata.getMeasurementUid();
diff --git 
a/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/ChunkMetadata.java 
b/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/ChunkMetadata.java
index 5ce097fe..653a8991 100644
--- 
a/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/ChunkMetadata.java
+++ 
b/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/ChunkMetadata.java
@@ -58,6 +58,8 @@ public class ChunkMetadata implements IChunkMetadata {
 
   private TSDataType tsDataType;
 
+  // Value stored by setNewType() and handed back by getNewType().
+  private TSDataType newTsDataType;
+
   // the following two are not serialized and only used during write
   private TSEncoding encoding;
   private CompressionType compressionType;
@@ -168,6 +170,14 @@ public class ChunkMetadata implements IChunkMetadata {
     return tsDataType;
   }
 
+  /**
+   * Replaces the data type held by this chunk metadata.
+   *
+   * @param tsDataType the data type to store
+   */
+  public void setTsDataType(TSDataType tsDataType) {
+    this.tsDataType = tsDataType;
+  }
+
+  /**
+   * Replaces the statistics object held by this chunk metadata.
+   *
+   * @param statistics the statistics to store
+   */
+  public void setStatistics(Statistics statistics) {
+    this.statistics = statistics;
+  }
+
   /**
    * serialize to outputStream.
    *
@@ -254,6 +264,16 @@ public class ChunkMetadata implements IChunkMetadata {
     this.version = version;
   }
 
+  /**
+   * Returns the type last passed to {@link #setNewType(TSDataType)}.
+   *
+   * @return the stored new type
+   */
+  @Override
+  public TSDataType getNewType() {
+    return this.newTsDataType;
+  }
+
+  /**
+   * Stores the type that {@link #getNewType()} will return.
+   *
+   * @param type the new type to remember
+   */
+  @Override
+  public void setNewType(TSDataType type) {
+    newTsDataType = type;
+  }
+
   public List<TimeRange> getDeleteIntervalList() {
     return deleteIntervalList;
   }
diff --git 
a/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/IChunkMetadata.java 
b/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/IChunkMetadata.java
index 79b96494..e29e7147 100644
--- 
a/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/IChunkMetadata.java
+++ 
b/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/IChunkMetadata.java
@@ -57,6 +57,10 @@ public interface IChunkMetadata extends IMetadata {
 
   TSDataType getDataType();
 
+  /**
+   * Returns the type recorded through {@link #setNewType(TSDataType)}. Implementations may reject
+   * the call with {@link UnsupportedOperationException}.
+   *
+   * <p>NOTE(review): presumably the type a chunk should be rewritten to after a column type
+   * change - confirm against the compaction caller.
+   */
+  TSDataType getNewType();
+
+  /**
+   * Records the type that {@link #getNewType()} should return. Implementations may reject the call
+   * with {@link UnsupportedOperationException}.
+   *
+   * @param newType the type to record
+   */
+  void setNewType(TSDataType newType);
+
   String getMeasurementUid();
 
   void insertIntoSortedDeletions(TimeRange timeRange);
diff --git a/java/tsfile/src/main/java/org/apache/tsfile/read/common/Chunk.java 
b/java/tsfile/src/main/java/org/apache/tsfile/read/common/Chunk.java
index b72c0085..9cffa9d0 100644
--- a/java/tsfile/src/main/java/org/apache/tsfile/read/common/Chunk.java
+++ b/java/tsfile/src/main/java/org/apache/tsfile/read/common/Chunk.java
@@ -21,15 +21,27 @@ package org.apache.tsfile.read.common;
 
 import org.apache.tsfile.encrypt.EncryptParameter;
 import org.apache.tsfile.encrypt.EncryptUtils;
+import org.apache.tsfile.enums.TSDataType;
 import org.apache.tsfile.file.MetaMarker;
 import org.apache.tsfile.file.header.ChunkHeader;
 import org.apache.tsfile.file.metadata.statistics.Statistics;
+import org.apache.tsfile.read.TimeValuePair;
+import org.apache.tsfile.read.reader.IPageReader;
+import org.apache.tsfile.read.reader.IPointReader;
+import org.apache.tsfile.read.reader.chunk.ChunkReader;
+import org.apache.tsfile.read.reader.chunk.TableChunkReader;
+import org.apache.tsfile.utils.Binary;
 import org.apache.tsfile.utils.PublicBAOS;
 import org.apache.tsfile.utils.RamUsageEstimator;
 import org.apache.tsfile.utils.ReadWriteForEncodingUtils;
+import org.apache.tsfile.write.chunk.ChunkWriterImpl;
+import org.apache.tsfile.write.chunk.ValueChunkWriter;
+import org.apache.tsfile.write.schema.IMeasurementSchema;
+import org.apache.tsfile.write.schema.MeasurementSchema;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.List;
 
 import static org.apache.tsfile.utils.RamUsageEstimator.sizeOfByteArray;
@@ -200,4 +212,175 @@ public class Chunk {
   public long getRetainedSizeInBytes() {
     return INSTANCE_SIZE + sizeOfByteArray(chunkData.capacity());
   }
+
+  public Chunk rewrite(TSDataType newType, Chunk timeChunk) throws IOException 
{
+    if (newType == null || newType == chunkHeader.getDataType()) {
+      return this;
+    }
+    IMeasurementSchema schema =
+        new MeasurementSchema(
+            chunkHeader.getMeasurementID(),
+            newType,
+            chunkHeader.getEncodingType(),
+            chunkHeader.getCompressionType());
+
+    ValueChunkWriter chunkWriter =
+        new ValueChunkWriter(
+            chunkHeader.getMeasurementID(),
+            chunkHeader.getCompressionType(),
+            newType,
+            chunkHeader.getEncodingType(),
+            schema.getValueEncoder(),
+            encryptParam);
+    List<Chunk> valueChunks = new ArrayList<>();
+    valueChunks.add(this);
+    TableChunkReader chunkReader = new TableChunkReader(timeChunk, 
valueChunks, null);
+    List<IPageReader> pages = chunkReader.loadPageReaderList();
+    for (IPageReader page : pages) {
+      IPointReader pointReader = 
page.getAllSatisfiedPageData().getBatchDataIterator();
+      while (pointReader.hasNextTimeValuePair()) {
+        TimeValuePair point = pointReader.nextTimeValuePair();
+        Object convertedValue = null;
+        if (point.getValue().getVector()[0] != null) {
+          convertedValue =
+              newType.castFromSingleValue(
+                  chunkHeader.getDataType(), 
point.getValue().getVector()[0].getValue());
+        }
+        long timestamp = point.getTimestamp();
+        switch (newType) {
+          case BOOLEAN:
+            chunkWriter.write(
+                timestamp,
+                convertedValue == null ? true : (boolean) convertedValue,
+                convertedValue == null);
+            break;
+          case DATE:
+          case INT32:
+            chunkWriter.write(
+                timestamp,
+                convertedValue == null ? Integer.MAX_VALUE : (int) 
convertedValue,
+                convertedValue == null);
+            break;
+          case TIMESTAMP:
+          case INT64:
+            chunkWriter.write(
+                timestamp,
+                convertedValue == null ? (long) Integer.MAX_VALUE : (long) 
convertedValue,
+                convertedValue == null);
+            break;
+          case FLOAT:
+            chunkWriter.write(
+                timestamp,
+                convertedValue == null ? (float) Integer.MAX_VALUE : (float) 
convertedValue,
+                convertedValue == null);
+            break;
+          case DOUBLE:
+            chunkWriter.write(
+                timestamp,
+                convertedValue == null ? (double) Integer.MAX_VALUE : (double) 
convertedValue,
+                convertedValue == null);
+            break;
+          case TEXT:
+          case STRING:
+          case BLOB:
+            chunkWriter.write(
+                timestamp,
+                convertedValue == null ? Binary.EMPTY_VALUE : (Binary) 
convertedValue,
+                convertedValue == null);
+            break;
+          default:
+            throw new IOException("Unsupported data type: " + newType);
+        }
+      }
+      chunkWriter.sealCurrentPage();
+    }
+    ByteBuffer newChunkData = chunkWriter.getByteBuffer();
+    ChunkHeader newChunkHeader =
+        new ChunkHeader(
+            chunkHeader.getChunkType(),
+            chunkHeader.getMeasurementID(),
+            newChunkData.capacity(),
+            newType,
+            chunkHeader.getCompressionType(),
+            chunkHeader.getEncodingType());
+    chunkData.flip();
+    timeChunk.chunkData.flip();
+    return new Chunk(
+        newChunkHeader,
+        newChunkData,
+        deleteIntervalList,
+        chunkWriter.getStatistics(),
+        encryptParam);
+  }
+
+  public Chunk rewrite(TSDataType newType) throws IOException {
+    if (newType == null || newType == chunkHeader.getDataType()) {
+      return this;
+    }
+    IMeasurementSchema schema =
+        new MeasurementSchema(
+            chunkHeader.getMeasurementID(),
+            newType,
+            chunkHeader.getEncodingType(),
+            chunkHeader.getCompressionType());
+    ChunkWriterImpl chunkWriter = new ChunkWriterImpl(schema, encryptParam);
+    ChunkReader chunkReader = new ChunkReader(this);
+    List<IPageReader> pages = chunkReader.loadPageReaderList();
+    for (IPageReader page : pages) {
+      BatchData batchData = page.getAllSatisfiedPageData();
+      IPointReader pointReader = batchData.getBatchDataIterator();
+      while (pointReader.hasNextTimeValuePair()) {
+        TimeValuePair point = pointReader.nextTimeValuePair();
+        Object convertedValue =
+            newType.castFromSingleValue(chunkHeader.getDataType(), 
point.getValue().getValue());
+        long timestamp = point.getTimestamp();
+        if (convertedValue == null) {
+          throw new IOException("NonAlignedChunk contains null, timestamp: " + 
timestamp);
+        }
+        switch (newType) {
+          case BOOLEAN:
+            chunkWriter.write(timestamp, (boolean) convertedValue);
+            break;
+          case DATE:
+          case INT32:
+            chunkWriter.write(timestamp, (int) convertedValue);
+            break;
+          case TIMESTAMP:
+          case INT64:
+            chunkWriter.write(timestamp, (long) convertedValue);
+            break;
+          case FLOAT:
+            chunkWriter.write(timestamp, (float) convertedValue);
+            break;
+          case DOUBLE:
+            chunkWriter.write(timestamp, (double) convertedValue);
+            break;
+          case TEXT:
+          case STRING:
+          case BLOB:
+            chunkWriter.write(timestamp, (Binary) convertedValue);
+            break;
+          default:
+            throw new IOException("Unsupported data type: " + newType);
+        }
+      }
+      chunkWriter.sealCurrentPage();
+    }
+    ByteBuffer newChunkData = chunkWriter.getByteBuffer();
+    ChunkHeader newChunkHeader =
+        new ChunkHeader(
+            chunkHeader.getChunkType(),
+            chunkHeader.getMeasurementID(),
+            newChunkData.capacity(),
+            newType,
+            chunkHeader.getCompressionType(),
+            chunkHeader.getEncodingType());
+    chunkData.flip();
+    return new Chunk(
+        newChunkHeader,
+        newChunkData,
+        deleteIntervalList,
+        chunkWriter.getStatistics(),
+        encryptParam);
+  }
 }
diff --git 
a/java/tsfile/src/main/java/org/apache/tsfile/write/chunk/ChunkWriterImpl.java 
b/java/tsfile/src/main/java/org/apache/tsfile/write/chunk/ChunkWriterImpl.java
index 2fbab416..e96bee49 100644
--- 
a/java/tsfile/src/main/java/org/apache/tsfile/write/chunk/ChunkWriterImpl.java
+++ 
b/java/tsfile/src/main/java/org/apache/tsfile/write/chunk/ChunkWriterImpl.java
@@ -527,4 +527,18 @@ public class ChunkWriterImpl implements IChunkWriter {
   public PageWriter getPageWriter() {
     return pageWriter;
   }
+
+  /**
+   * Returns the writer's current page counter.
+   *
+   * @return the value of {@code numOfPages}
+   */
+  public int getNumOfPages() {
+    return this.numOfPages;
+  }
+
+  public ByteBuffer getByteBuffer() {
+    return ByteBuffer.wrap(pageBuffer.toByteArray());
+  }
+
+  /**
+   * Returns a separate statistics object carrying the chunk statistics of this writer.
+   *
+   * <p>A new instance of the same type is created and the writer's statistics are merged into
+   * it, so the caller does not receive the writer's own instance.
+   *
+   * @return the merged statistics snapshot
+   */
+  public Statistics getStatistics() {
+    Statistics snapshot = Statistics.getStatsByType(statistics.getType());
+    snapshot.mergeStatistics(statistics);
+    return snapshot;
+  }
 }
diff --git 
a/java/tsfile/src/main/java/org/apache/tsfile/write/chunk/TimeChunkWriter.java 
b/java/tsfile/src/main/java/org/apache/tsfile/write/chunk/TimeChunkWriter.java
index c4181f51..7960a81a 100644
--- 
a/java/tsfile/src/main/java/org/apache/tsfile/write/chunk/TimeChunkWriter.java
+++ 
b/java/tsfile/src/main/java/org/apache/tsfile/write/chunk/TimeChunkWriter.java
@@ -356,4 +356,14 @@ public class TimeChunkWriter {
   public boolean checkIsUnsealedPageOverThreshold(long size, long pointNum) {
     return pageWriter.getPointNumber() >= pointNum || 
pageWriter.estimateMaxMemSize() >= size;
   }
+
+  public ByteBuffer getByteBuffer() {
+    return ByteBuffer.wrap(pageBuffer.toByteArray());
+  }
+
+  /**
+   * Returns a separate statistics object carrying the statistics of this time chunk writer.
+   *
+   * <p>A new instance of the same type is created and the writer's statistics are merged into
+   * it, so the caller does not receive the writer's own instance.
+   *
+   * @return the merged statistics snapshot
+   */
+  public Statistics getStatistics() {
+    Statistics snapshot = Statistics.getStatsByType(statistics.getType());
+    snapshot.mergeStatistics(statistics);
+    return snapshot;
+  }
 }
diff --git 
a/java/tsfile/src/main/java/org/apache/tsfile/write/chunk/ValueChunkWriter.java 
b/java/tsfile/src/main/java/org/apache/tsfile/write/chunk/ValueChunkWriter.java
index 3a3ccfa8..f97d3370 100644
--- 
a/java/tsfile/src/main/java/org/apache/tsfile/write/chunk/ValueChunkWriter.java
+++ 
b/java/tsfile/src/main/java/org/apache/tsfile/write/chunk/ValueChunkWriter.java
@@ -460,4 +460,14 @@ public class ValueChunkWriter {
   public ValuePageWriter getPageWriter() {
     return pageWriter;
   }
+
+  public ByteBuffer getByteBuffer() {
+    return ByteBuffer.wrap(pageBuffer.toByteArray());
+  }
+
+  /**
+   * Returns a separate statistics object carrying the statistics of this value chunk writer.
+   *
+   * <p>A new instance of the same type is created and the writer's statistics are merged into
+   * it, so the caller does not receive the writer's own instance.
+   *
+   * @return the merged statistics snapshot
+   */
+  public Statistics getStatistics() {
+    Statistics snapshot = Statistics.getStatsByType(statistics.getType());
+    snapshot.mergeStatistics(statistics);
+    return snapshot;
+  }
 }
diff --git 
a/java/tsfile/src/test/java/org/apache/tsfile/write/ChunkRewriteTest.java 
b/java/tsfile/src/test/java/org/apache/tsfile/write/ChunkRewriteTest.java
new file mode 100644
index 00000000..90215ccf
--- /dev/null
+++ b/java/tsfile/src/test/java/org/apache/tsfile/write/ChunkRewriteTest.java
@@ -0,0 +1,425 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tsfile.write;
+
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.file.header.ChunkHeader;
+import org.apache.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.tsfile.read.TimeValuePair;
+import org.apache.tsfile.read.common.BatchData;
+import org.apache.tsfile.read.common.Chunk;
+import org.apache.tsfile.read.reader.IPageReader;
+import org.apache.tsfile.read.reader.IPointReader;
+import org.apache.tsfile.read.reader.chunk.AlignedChunkReader;
+import org.apache.tsfile.read.reader.chunk.ChunkReader;
+import org.apache.tsfile.read.reader.chunk.TableChunkReader;
+import org.apache.tsfile.read.reader.page.AlignedPageReader;
+import org.apache.tsfile.write.chunk.AlignedChunkWriterImpl;
+import org.apache.tsfile.write.chunk.ChunkWriterImpl;
+import org.apache.tsfile.write.chunk.TimeChunkWriter;
+import org.apache.tsfile.write.chunk.ValueChunkWriter;
+import org.apache.tsfile.write.schema.IMeasurementSchema;
+import org.apache.tsfile.write.schema.MeasurementSchema;
+import org.apache.tsfile.write.schema.VectorMeasurementSchema;
+
+import org.junit.Test;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+public class ChunkRewriteTest {
+
+  /**
+   * Writes one aligned page of 20 rows (s1 FLOAT, s2 INT32, s3 DOUBLE), checks that it reads
+   * back unchanged, then rewrites the s2 value chunk to DOUBLE and checks that all 20 rows are
+   * still readable with s2 now returned as double.
+   */
+  @Test
+  public void AlignedChunkSinglePageTest() throws IOException {
+    String[] measurements = new String[] {"s1", "s2", "s3"};
+    TSDataType[] types = new TSDataType[] {TSDataType.FLOAT, TSDataType.INT32, 
TSDataType.DOUBLE};
+    VectorMeasurementSchema measurementSchema =
+        new VectorMeasurementSchema("root.sg.d1", measurements, types);
+    AlignedChunkWriterImpl chunkWriter = new 
AlignedChunkWriterImpl(measurementSchema);
+
+    // One row per timestamp: the three column values first, then the time itself.
+    for (int time = 1; time <= 20; time++) {
+      chunkWriter.write(time, (float) time, false);
+      chunkWriter.write(time, time, false);
+      chunkWriter.write(time, (double) time, false);
+      chunkWriter.write(time);
+    }
+    chunkWriter.sealCurrentPage();
+
+    TimeChunkWriter timeChunkWriter = chunkWriter.getTimeChunkWriter();
+    List<ValueChunkWriter> valueChunkWriters = 
chunkWriter.getValueChunkWriterList();
+
+    Chunk timeChunk = getTimeChunk(measurementSchema, timeChunkWriter);
+
+    List<Chunk> valueChunks = getValueChunks(valueChunkWriters);
+
+    // Baseline: the chunks read back exactly what was written.
+    AlignedChunkReader chunkReader = new AlignedChunkReader(timeChunk, 
valueChunks);
+    List<IPageReader> pageReaders = chunkReader.loadPageReaderList();
+    for (IPageReader page : pageReaders) {
+      IPointReader pointReader = ((AlignedPageReader) 
page).getLazyPointReader();
+      int i = 1;
+      while (pointReader.hasNextTimeValuePair()) {
+        TimeValuePair point = pointReader.nextTimeValuePair();
+        assertEquals((long) i, point.getTimestamp());
+        assertEquals((float) i, point.getValue().getVector()[0].getValue());
+        assertEquals(i, point.getValue().getVector()[1].getValue());
+        assertEquals((double) i, point.getValue().getVector()[2].getValue());
+        i++;
+      }
+    }
+    // NOTE(review): presumably resets the buffers consumed by the reader so that they can be
+    // read again below - confirm against the chunk reader implementation.
+    timeChunk.getData().flip();
+    valueChunks.get(0).getData().flip();
+    valueChunks.get(1).getData().flip();
+    valueChunks.get(2).getData().flip();
+    // rewrite INT32->DOUBLE
+    Chunk newValueChunk = valueChunks.get(1).rewrite(TSDataType.DOUBLE, 
timeChunk);
+    valueChunks.set(1, newValueChunk);
+    AlignedChunkReader newChunkReader = new AlignedChunkReader(timeChunk, 
valueChunks);
+    List<IPageReader> newPageReaders = newChunkReader.loadPageReaderList();
+    for (IPageReader page : newPageReaders) {
+      IPointReader pointReader = ((AlignedPageReader) 
page).getLazyPointReader();
+      int i = 1;
+      while (pointReader.hasNextTimeValuePair()) {
+        TimeValuePair point = pointReader.nextTimeValuePair();
+        assertEquals((long) i, point.getTimestamp());
+        assertEquals((float) i, point.getValue().getVector()[0].getValue());
+        assertEquals((double) i, point.getValue().getVector()[1].getValue());
+        assertEquals((double) i, point.getValue().getVector()[2].getValue());
+        i++;
+      }
+      assertEquals(20, i - 1);
+    }
+    timeChunk.getData().flip();
+    valueChunks.get(0).getData().flip();
+    valueChunks.get(1).getData().flip();
+    valueChunks.get(2).getData().flip();
+
+
+  }
+
+  /**
+   * Same scenario as the single-page aligned test, but with two sealed pages of 20 rows each
+   * (timestamps 1-20 and 21-40): after rewriting s2 from INT32 to DOUBLE all 40 rows must still
+   * be readable in order.
+   */
+  @Test
+  public void AlignedChunkMultiPagesTest() throws IOException {
+    String[] measurements = new String[] {"s1", "s2", "s3"};
+    TSDataType[] types = new TSDataType[] {TSDataType.FLOAT, TSDataType.INT32, 
TSDataType.DOUBLE};
+    VectorMeasurementSchema measurementSchema =
+        new VectorMeasurementSchema("root.sg.d1", measurements, types);
+    AlignedChunkWriterImpl chunkWriter = new 
AlignedChunkWriterImpl(measurementSchema);
+
+    // First page: timestamps 1..20.
+    for (int time = 1; time <= 20; time++) {
+      chunkWriter.write(time, (float) time, false);
+      chunkWriter.write(time, time, false);
+      chunkWriter.write(time, (double) time, false);
+      chunkWriter.write(time);
+    }
+    chunkWriter.sealCurrentPage();
+
+    // Second page: timestamps 21..40.
+    for (int time = 21; time <= 40; time++) {
+      chunkWriter.write(time, (float) time, false);
+      chunkWriter.write(time, time, false);
+      chunkWriter.write(time, (double) time, false);
+      chunkWriter.write(time);
+    }
+    chunkWriter.sealCurrentPage();
+
+    TimeChunkWriter timeChunkWriter = chunkWriter.getTimeChunkWriter();
+    List<ValueChunkWriter> valueChunkWriters = 
chunkWriter.getValueChunkWriterList();
+
+    Chunk timeChunk = getTimeChunk(measurementSchema, timeChunkWriter);
+    List<Chunk> valueChunks = getValueChunks(valueChunkWriters);
+
+    // Baseline: i runs across both pages, so the final count covers the whole chunk.
+    AlignedChunkReader chunkReader = new AlignedChunkReader(timeChunk, 
valueChunks);
+    List<IPageReader> pageReaders = chunkReader.loadPageReaderList();
+    int i = 1;
+    for (IPageReader page : pageReaders) {
+      IPointReader pointReader = ((AlignedPageReader) 
page).getLazyPointReader();
+      while (pointReader.hasNextTimeValuePair()) {
+        TimeValuePair point = pointReader.nextTimeValuePair();
+        assertEquals((long) i, point.getTimestamp());
+        assertEquals((float) i, point.getValue().getVector()[0].getValue());
+        assertEquals(i, point.getValue().getVector()[1].getValue());
+        assertEquals((double) i, point.getValue().getVector()[2].getValue());
+        i++;
+      }
+    }
+    assertEquals(40, i - 1);
+    // NOTE(review): presumably resets the buffers consumed by the reader so that they can be
+    // read again below - confirm against the chunk reader implementation.
+    timeChunk.getData().flip();
+    valueChunks.get(0).getData().flip();
+    valueChunks.get(1).getData().flip();
+    valueChunks.get(2).getData().flip();
+    // rewrite INT32->DOUBLE
+    Chunk newValueChunk = valueChunks.get(1).rewrite(TSDataType.DOUBLE, 
timeChunk);
+    valueChunks.set(1, newValueChunk);
+    AlignedChunkReader newChunkReader = new AlignedChunkReader(timeChunk, 
valueChunks);
+    i = 1;
+    List<IPageReader> newPageReaders = newChunkReader.loadPageReaderList();
+    for (IPageReader page : newPageReaders) {
+      IPointReader pointReader = ((AlignedPageReader) 
page).getLazyPointReader();
+      while (pointReader.hasNextTimeValuePair()) {
+        TimeValuePair point = pointReader.nextTimeValuePair();
+        assertEquals((long) i, point.getTimestamp());
+        assertEquals((float) i, point.getValue().getVector()[0].getValue());
+        assertEquals((double) i, point.getValue().getVector()[1].getValue());
+        assertEquals((double) i, point.getValue().getVector()[2].getValue());
+        i++;
+      }
+    }
+    assertEquals(40, i - 1);
+    timeChunk.getData().flip();
+    valueChunks.get(0).getData().flip();
+    valueChunks.get(1).getData().flip();
+    valueChunks.get(2).getData().flip();
+  }
+
+  /**
+   * Rewrites an aligned value chunk that contains nulls. Rows come in groups of three: all
+   * columns set, only s2 set, and all columns null. After rewriting s2 from INT32 to DOUBLE the
+   * same null pattern must be read back, with the non-null s2 values returned as double.
+   */
+  @Test
+  public void AlignedChunkWithNullTest() throws IOException {
+    String[] measurements = new String[] {"s1", "s2", "s3"};
+    TSDataType[] types = new TSDataType[] {TSDataType.FLOAT, TSDataType.INT32, 
TSDataType.DOUBLE};
+    VectorMeasurementSchema measurementSchema =
+        new VectorMeasurementSchema("root.sg.d1", measurements, types);
+    AlignedChunkWriterImpl chunkWriter = new 
AlignedChunkWriterImpl(measurementSchema);
+
+    // The third argument of write(time, value, isNull) marks the cell as null.
+    for (int time = 1; time <= 30; time = time + 3) {
+      chunkWriter.write(time, (float) time, false);
+      chunkWriter.write(time, time, false);
+      chunkWriter.write(time, (double) time, false);
+      chunkWriter.write(time);
+
+      chunkWriter.write(time + 1, (float) (time + 1), true);
+      chunkWriter.write(time + 1, time + 1, false);
+      chunkWriter.write(time + 1, (double) (time + 1), true);
+      chunkWriter.write(time + 1);
+
+      chunkWriter.write(time + 2, (float) (time + 1), true);
+      chunkWriter.write(time + 2, time + 1, true);
+      chunkWriter.write(time + 2, (double) (time + 1), true);
+      chunkWriter.write(time + 2);
+    }
+    chunkWriter.sealCurrentPage();
+
+    TimeChunkWriter timeChunkWriter = chunkWriter.getTimeChunkWriter();
+    List<ValueChunkWriter> valueChunkWriters = 
chunkWriter.getValueChunkWriterList();
+
+    Chunk timeChunk = getTimeChunk(measurementSchema, timeChunkWriter);
+    List<Chunk> valueChunks = getValueChunks(valueChunkWriters);
+
+    // Baseline: i % 3 selects which of the three row shapes is expected.
+    TableChunkReader chunkReader = new TableChunkReader(timeChunk, 
valueChunks, null);
+    List<IPageReader> pageReaders = chunkReader.loadPageReaderList();
+    int i = 1;
+    for (IPageReader page : pageReaders) {
+      IPointReader pointReader = 
page.getAllSatisfiedPageData().getBatchDataIterator();
+      while (pointReader.hasNextTimeValuePair()) {
+        TimeValuePair point = pointReader.nextTimeValuePair();
+        if (i % 3 == 1) {
+          assertEquals((long) i, point.getTimestamp());
+          assertEquals((float) i, point.getValue().getVector()[0].getValue());
+          assertEquals(i, point.getValue().getVector()[1].getValue());
+          assertEquals((double) i, point.getValue().getVector()[2].getValue());
+        } else if (i % 3 == 2) {
+          assertEquals((long) i, point.getTimestamp());
+          assertNull(point.getValue().getVector()[0]);
+          assertEquals(i, point.getValue().getVector()[1].getValue());
+          assertNull(point.getValue().getVector()[2]);
+        } else {
+          assertEquals((long) i, point.getTimestamp());
+          assertNull(point.getValue().getVector()[0]);
+          assertNull(point.getValue().getVector()[1]);
+          assertNull(point.getValue().getVector()[2]);
+        }
+        i++;
+      }
+    }
+    assertEquals(30, i - 1);
+    // NOTE(review): presumably resets the buffers consumed by the reader so that they can be
+    // read again below - confirm against the chunk reader implementation.
+    timeChunk.getData().flip();
+    valueChunks.get(0).getData().flip();
+    valueChunks.get(1).getData().flip();
+    valueChunks.get(2).getData().flip();
+    // rewrite INT32->DOUBLE
+    Chunk newValueChunk = valueChunks.get(1).rewrite(TSDataType.DOUBLE, 
timeChunk);
+    valueChunks.set(1, newValueChunk);
+    TableChunkReader newChunkReader = new TableChunkReader(timeChunk, 
valueChunks, null);
+    i = 1;
+    List<IPageReader> newPageReaders = newChunkReader.loadPageReaderList();
+    for (IPageReader page : newPageReaders) {
+      IPointReader pointReader = 
page.getAllSatisfiedPageData().getBatchDataIterator();
+      while (pointReader.hasNextTimeValuePair()) {
+        TimeValuePair point = pointReader.nextTimeValuePair();
+        if (i % 3 == 1) {
+          assertEquals((long) i, point.getTimestamp());
+          assertEquals((float) i, point.getValue().getVector()[0].getValue());
+          assertEquals((double) i, point.getValue().getVector()[1].getValue());
+          assertEquals((double) i, point.getValue().getVector()[2].getValue());
+        } else if (i % 3 == 2) {
+          assertEquals((long) i, point.getTimestamp());
+          assertNull(point.getValue().getVector()[0]);
+          assertEquals((double) i, point.getValue().getVector()[1].getValue());
+          assertNull(point.getValue().getVector()[2]);
+        } else {
+          assertEquals((long) i, point.getTimestamp());
+          assertNull(point.getValue().getVector()[0]);
+          assertNull(point.getValue().getVector()[1]);
+          assertNull(point.getValue().getVector()[2]);
+        }
+        i++;
+      }
+    }
+    assertEquals(30, i - 1);
+    timeChunk.getData().flip();
+    valueChunks.get(0).getData().flip();
+    valueChunks.get(1).getData().flip();
+    valueChunks.get(2).getData().flip();
+  }
+
+  /**
+   * Writes a non-aligned INT32 chunk with two sealed pages of 20 points each, checks that it
+   * reads back unchanged, then rewrites it to DOUBLE and checks that all 40 points are returned
+   * in order as doubles.
+   */
+  @Test
+  public void NonAlignedChunkMultiPagesTest() throws IOException {
+    IMeasurementSchema schema = new MeasurementSchema("s1", TSDataType.INT32, TSEncoding.PLAIN);
+    ChunkWriterImpl chunkWriter = new ChunkWriterImpl(schema);
+    // First page: timestamps 1..20.
+    for (int time = 1; time <= 20; time++) {
+      chunkWriter.write(time, time);
+    }
+    chunkWriter.sealCurrentPage();
+    // Second page: timestamps 21..40.
+    for (int time = 21; time <= 40; time++) {
+      chunkWriter.write(time, time);
+    }
+    chunkWriter.sealCurrentPage();
+    Chunk newChunk = getChunk(schema, chunkWriter);
+    ChunkReader chunkReader = new ChunkReader(newChunk);
+    List<IPageReader> pageReaders = chunkReader.loadPageReaderList();
+    int i = 1;
+    for (IPageReader page : pageReaders) {
+      BatchData data = page.getAllSatisfiedPageData(true);
+      IPointReader pointReader = data.getBatchDataIterator();
+      while (pointReader.hasNextTimeValuePair()) {
+        TimeValuePair point = pointReader.nextTimeValuePair();
+        assertEquals((long) i, point.getTimestamp());
+        assertEquals(i, point.getValue().getValue());
+        i++;
+      }
+    }
+    assertEquals(40, i - 1);
+    // The reader consumed the chunk buffer; reset it before the chunk is read again.
+    newChunk.getData().flip();
+    // rewrite INT32->DOUBLE
+    Chunk newChunk2 = newChunk.rewrite(TSDataType.DOUBLE);
+    ChunkReader chunkReader2 = new ChunkReader(newChunk2);
+    List<IPageReader> pageReaders2 = chunkReader2.loadPageReaderList();
+    i = 1;
+    for (IPageReader page : pageReaders2) {
+      BatchData data = page.getAllSatisfiedPageData(true);
+      IPointReader pointReader = data.getBatchDataIterator();
+      while (pointReader.hasNextTimeValuePair()) {
+        TimeValuePair point = pointReader.nextTimeValuePair();
+        assertEquals((long) i, point.getTimestamp());
+        assertEquals((double) i, point.getValue().getValue());
+        i++;
+      }
+    }
+    // Without this check a rewrite that produced no points at all would still pass.
+    assertEquals(40, i - 1);
+  }
+
+  /**
+   * Writes a non-aligned INT32 chunk with a single page of 20 points, checks that it reads back
+   * unchanged, then rewrites it to DOUBLE and checks that all 20 points are returned in order as
+   * doubles.
+   */
+  @Test
+  public void NonAlignedChunkSinglePageTest() throws IOException {
+    IMeasurementSchema schema = new MeasurementSchema("s1", TSDataType.INT32, TSEncoding.PLAIN);
+    ChunkWriterImpl chunkWriter = new ChunkWriterImpl(schema);
+    for (int time = 1; time <= 20; time++) {
+      chunkWriter.write(time, time);
+    }
+    chunkWriter.sealCurrentPage();
+
+    Chunk newChunk = getChunk(schema, chunkWriter);
+    ChunkReader chunkReader = new ChunkReader(newChunk);
+    List<IPageReader> pageReaders = chunkReader.loadPageReaderList();
+    for (IPageReader page : pageReaders) {
+      BatchData data = page.getAllSatisfiedPageData(true);
+      IPointReader pointReader = data.getBatchDataIterator();
+      int i = 1;
+      while (pointReader.hasNextTimeValuePair()) {
+        TimeValuePair point = pointReader.nextTimeValuePair();
+        assertEquals((long) i, point.getTimestamp());
+        assertEquals(i, point.getValue().getValue());
+        i++;
+      }
+      assertEquals(20, i - 1);
+    }
+    // The reader consumed the chunk buffer; reset it before the chunk is read again.
+    newChunk.getData().flip();
+    // rewrite INT32->DOUBLE (the schema above is INT32, not FLOAT)
+    Chunk newChunk2 = newChunk.rewrite(TSDataType.DOUBLE);
+    ChunkReader chunkReader2 = new ChunkReader(newChunk2);
+    List<IPageReader> pageReaders2 = chunkReader2.loadPageReaderList();
+    for (IPageReader page : pageReaders2) {
+      BatchData data = page.getAllSatisfiedPageData(true);
+      IPointReader pointReader = data.getBatchDataIterator();
+      int i = 1;
+      while (pointReader.hasNextTimeValuePair()) {
+        TimeValuePair point = pointReader.nextTimeValuePair();
+        assertEquals((long) i, point.getTimestamp());
+        assertEquals((double) i, point.getValue().getValue());
+        i++;
+      }
+      // Same count check as for the original chunk: no point may be lost by the rewrite.
+      assertEquals(20, i - 1);
+    }
+  }
+
+  /**
+   * Builds a time {@link Chunk} from what the given time chunk writer currently holds.
+   *
+   * @param measurementSchema schema supplying the name, compressor and time encoding
+   * @param timeChunkWriter writer supplying the bytes, page count and statistics
+   * @return a chunk with a VECTOR-typed header and no delete intervals
+   */
+  public Chunk getTimeChunk(
+      VectorMeasurementSchema measurementSchema, TimeChunkWriter timeChunkWriter) {
+    ByteBuffer timeData = timeChunkWriter.getByteBuffer();
+    ChunkHeader timeHeader =
+        new ChunkHeader(
+            measurementSchema.getMeasurementName(),
+            timeData.capacity(),
+            TSDataType.VECTOR,
+            measurementSchema.getCompressor(),
+            measurementSchema.getTimeTSEncoding(),
+            timeChunkWriter.getNumOfPages());
+    return new Chunk(timeHeader, timeData, null, timeChunkWriter.getStatistics());
+  }
+
+  /**
+   * Builds one value {@link Chunk} per writer, in the order of the given list.
+   *
+   * @param valueChunkWriters writers supplying bytes, header fields and statistics
+   * @return the chunks, each without delete intervals
+   */
+  public List<Chunk> getValueChunks(List<ValueChunkWriter> valueChunkWriters) {
+    List<Chunk> result = new ArrayList<>();
+    for (ValueChunkWriter writer : valueChunkWriters) {
+      ByteBuffer data = writer.getByteBuffer();
+      ChunkHeader header =
+          new ChunkHeader(
+              writer.getMeasurementId(),
+              data.capacity(),
+              writer.getDataType(),
+              writer.getCompressionType(),
+              writer.getEncodingType(),
+              writer.getNumOfPages());
+      result.add(new Chunk(header, data, null, writer.getStatistics()));
+    }
+    return result;
+  }
+
+  /**
+   * Builds a non-aligned {@link Chunk} from what the given chunk writer currently holds.
+   *
+   * @param schema schema supplying the name, type, compressor and encoding
+   * @param chunkWriter writer supplying the bytes, page count and statistics
+   * @return a chunk without delete intervals
+   */
+  public Chunk getChunk(IMeasurementSchema schema, ChunkWriterImpl chunkWriter) {
+    ByteBuffer data = chunkWriter.getByteBuffer();
+    ChunkHeader header =
+        new ChunkHeader(
+            schema.getMeasurementName(),
+            data.capacity(),
+            schema.getType(),
+            schema.getCompressor(),
+            schema.getEncodingType(),
+            chunkWriter.getNumOfPages());
+    return new Chunk(header, data, null, chunkWriter.getStatistics());
+  }
+}

Reply via email to